
Payment Result Notification System Design

Published 2025-11-01

1. Core Challenges and Design Goals

Payment result notification is the "last mile" between the payment platform and the merchant. A lost notification means the merchant doesn't know whether the money arrived, which means user complaints and merchant churn. The system must meet:

Metric        Target     Notes
Latency       p99 < 3s   Merchant receives the notification within 3s of payment completion
Reliability   99.99%     No message loss; at-least-once delivery
Availability  99.99%     A notification outage must not affect the core payment path
Idempotency   100%       Duplicate notifications must not cause duplicate merchant processing

2. Overall Architecture

┌──────────────────┐
│ Payment core     │
└────────┬─────────┘
         │ ① Payment completes: write DB + publish to MQ (transactional message)
         ▼
┌──────────────┐     ┌──────────────────┐
│ Message queue│────▶│ Notify scheduler │
│ (Kafka/      │     │ (Dispatcher)     │
│  RocketMQ)   │     └────────┬─────────┘
└──────────────┘              │
                   ┌──────────┼──────────┐
                   ▼          ▼          ▼
              ┌────────┐ ┌────────┐ ┌────────┐
              │Worker 1│ │Worker 2│ │Worker N│
              └───┬────┘ └───┬────┘ └───┬────┘
                  │          │          │
                  ▼          ▼          ▼
             ┌──────────────────────────────┐
             │ Merchant callback (Webhook)  │
             └──────────────────────────────┘
                  │
                  ▼ ② Merchant did not ACK?
             ┌──────────────────────┐
             │ Retry scheduler      │
             │ (exponential backoff)│
             │ 5s→30s→5m→…→24h      │
             └──────────────────────┘
                  │
                  ▼ ③ All retries exhausted?
             ┌─────────────────────────────┐
             │ Dead letter queue + alerting│
             │ Manual ops / merchant query │
             └─────────────────────────────┘

3. Key Design Decisions

3.1 No message loss: transactional messages + local message table

The payment system's worst nightmare is "the money was charged but the notification never went out". The local message table is the backstop:

Payment transaction {
    1. Update payment order status = SUCCESS
    2. Insert into notify_message (same DB transaction)
    3. Commit
}

Async scanner {
    Scan notify_message for status=PENDING rows
    Publish them to the MQ
    Update status=SENT
}

Even if the MQ goes down, the rows in the local message table survive, and the scanner keeps retrying.

3.2 No duplicate processing: idempotency

The merchant side must deduplicate by notify_id; the notification protocol states this explicitly:

  • Every notification carries a globally unique notify_id
  • The merchant returns SUCCESS once it has processed the notification
  • We may deliver the same notification more than once; merchants must tolerate duplicates
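On the merchant side, deduplication by notify_id can be sketched as follows. This is a minimal in-memory illustration; a real handler would rely on a unique-key insert in the merchant's own database, and the IdempotentProcessor type here is hypothetical, not part of the platform's API:

```go
package main

import "sync"

// IdempotentProcessor remembers which notify_ids have already been handled.
// In production this would be a unique-key insert in the merchant's DB,
// not an in-memory map.
type IdempotentProcessor struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewIdempotentProcessor() *IdempotentProcessor {
	return &IdempotentProcessor{seen: make(map[string]bool)}
}

// ProcessOnce runs handle exactly once per notify_id and reports whether
// this call actually did the work (false for a duplicate delivery).
func (p *IdempotentProcessor) ProcessOnce(notifyID string, handle func()) bool {
	p.mu.Lock()
	if p.seen[notifyID] {
		p.mu.Unlock()
		return false // duplicate: still ACK with SUCCESS, but do nothing
	}
	p.seen[notifyID] = true
	p.mu.Unlock()
	handle()
	return true
}
```

Either way — first delivery or duplicate — the merchant should still respond SUCCESS, so the platform stops retrying.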

3.3 Fast enough: tiered delivery

Not all notifications deserve equal treatment; prioritize by amount and merchant tier:

  • P0: large transactions / VIP merchants → dedicated queue, dedicated worker pool, dedicated delivery path
  • P1: normal transactions → standard queue
  • P2: small transactions → batch queue, slight delay acceptable
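The tiering rule can be sketched as a pure function. The amount thresholds and the vip flag below are illustrative assumptions, not values from the design; real cutoffs would come from configuration:

```go
package main

// Priority mirrors the P0/P1/P2 tiers from the design
// (redeclared here so the snippet is self-contained).
type Priority int

const (
	PriorityP0 Priority = 0 // large amount / VIP merchant
	PriorityP1 Priority = 1 // normal
	PriorityP2 Priority = 2 // small amount, may be batched
)

// Classify picks a delivery tier from the amount (in cents) and merchant level.
// The thresholds are placeholders for illustration.
func Classify(amountCents int64, vipMerchant bool) Priority {
	switch {
	case vipMerchant || amountCents >= 1_000_000: // >= 10,000.00
		return PriorityP0
	case amountCents < 1_000: // < 10.00
		return PriorityP2
	default:
		return PriorityP1
	}
}
```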

3.4 Stay up: multi-site deployment + degradation

  • Deploy the notification service across multiple data centers with automatic failover
  • If the MQ is unavailable, degrade to delivering directly from the local message table
  • If a merchant endpoint times out, fail fast into the retry queue instead of blocking the main flow

4. Data Model

CREATE TABLE notify_message (
    id              BIGINT PRIMARY KEY AUTO_INCREMENT,
    notify_id       VARCHAR(64) NOT NULL UNIQUE,      -- globally unique notification ID
    order_id        VARCHAR(64) NOT NULL,             -- payment order number
    merchant_id     VARCHAR(32) NOT NULL,             -- merchant ID
    notify_url      VARCHAR(512) NOT NULL,            -- merchant callback URL
    notify_body     TEXT NOT NULL,                    -- notification payload (JSON)
    priority        TINYINT DEFAULT 1,                -- 0=P0, 1=P1, 2=P2
    status          TINYINT DEFAULT 0,                -- 0=PENDING, 1=SENDING, 2=SUCCESS, 3=FAILED
    retry_count     INT DEFAULT 0,                    -- retries so far
    max_retry       INT DEFAULT 8,                    -- retry limit
    next_retry_at   DATETIME,                         -- next retry time
    last_response   VARCHAR(512),                     -- last merchant response
    last_http_code  INT,                              -- last HTTP status code
    created_at      DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at      DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_retry (status, next_retry_at),
    INDEX idx_merchant_order (merchant_id, order_id)
);

5. Go Implementation

5.1 Project Layout

payment-notify/
├── main.go
├── config/
│   └── config.go
├── model/
│   └── notify.go          // data model
├── producer/
│   └── producer.go        // message producer (writes the local message table)
├── dispatcher/
│   └── dispatcher.go      // dispatcher (consumes MQ, feeds workers)
├── worker/
│   └── worker.go          // delivery worker
├── retry/
│   └── scheduler.go       // retry scheduler
├── store/
│   └── mysql.go           // persistence layer
└── scanner/
    └── scanner.go         // local message table scanner

5.2 Core Data Model

// model/notify.go
package model

import "time"

type NotifyStatus int

const (
    StatusPending  NotifyStatus = 0
    StatusSending  NotifyStatus = 1
    StatusSuccess  NotifyStatus = 2
    StatusFailed   NotifyStatus = 3
)

type Priority int

const (
    PriorityP0 Priority = 0 // VIP / large amount
    PriorityP1 Priority = 1 // normal
    PriorityP2 Priority = 2 // small amount
)

// NotifyMessage is the persisted notification entity
type NotifyMessage struct {
    ID            int64        `json:"id" db:"id"`
    NotifyID      string       `json:"notify_id" db:"notify_id"`
    OrderID       string       `json:"order_id" db:"order_id"`
    MerchantID    string       `json:"merchant_id" db:"merchant_id"`
    NotifyURL     string       `json:"notify_url" db:"notify_url"`
    NotifyBody    string       `json:"notify_body" db:"notify_body"`
    Priority      Priority     `json:"priority" db:"priority"`
    Status        NotifyStatus `json:"status" db:"status"`
    RetryCount    int          `json:"retry_count" db:"retry_count"`
    MaxRetry      int          `json:"max_retry" db:"max_retry"`
    NextRetryAt   *time.Time   `json:"next_retry_at" db:"next_retry_at"`
    LastResponse  string       `json:"last_response" db:"last_response"`
    LastHTTPCode  int          `json:"last_http_code" db:"last_http_code"`
    CreatedAt     time.Time    `json:"created_at" db:"created_at"`
    UpdatedAt     time.Time    `json:"updated_at" db:"updated_at"`
}

// NotifyResult is the outcome of one merchant callback attempt
type NotifyResult struct {
    NotifyID   string
    HTTPCode   int
    Response   string
    Success    bool
    Duration   time.Duration
    Error      error
}

// RetryIntervals implements an exponential backoff schedule:
// attempt 1: 5s, attempt 2: 30s, attempt 3: 5min, attempt 4: 30min,
// attempt 5: 1h, attempt 6: 2h, attempt 7: 6h, attempt 8: 24h
var RetryIntervals = []time.Duration{
    5 * time.Second,
    30 * time.Second,
    5 * time.Minute,
    30 * time.Minute,
    1 * time.Hour,
    2 * time.Hour,
    6 * time.Hour,
    24 * time.Hour,
}

func GetNextRetryTime(retryCount int) time.Time {
    if retryCount >= len(RetryIntervals) {
        return time.Now().Add(RetryIntervals[len(RetryIntervals)-1])
    }
    return time.Now().Add(RetryIntervals[retryCount])
}

5.3 Message Producer (local message table + MQ dual write)

// producer/producer.go
package producer

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log/slog"
    "time"

    "github.com/google/uuid"
    "github.com/segmentio/kafka-go"

    "payment-notify/model"
)

type Producer struct {
    db          *sql.DB
    kafkaWriter *kafka.Writer
    logger      *slog.Logger
}

func NewProducer(db *sql.DB, kafkaWriter *kafka.Writer, logger *slog.Logger) *Producer {
    return &Producer{db: db, kafkaWriter: kafkaWriter, logger: logger}
}

// PaymentResult is the payment outcome handed over by the payment core
type PaymentResult struct {
    OrderID    string          `json:"order_id"`
    MerchantID string          `json:"merchant_id"`
    Amount     int64           `json:"amount"`
    Currency   string          `json:"currency"`
    Status     string          `json:"status"`
    NotifyURL  string          `json:"notify_url"`
    Priority   model.Priority  `json:"priority"`
    Extra      json.RawMessage `json:"extra,omitempty"`
}

// ProduceNotification is called inside the payment transaction so the message cannot be lost.
// Core idea: write to the local message table within the DB transaction + best-effort
// publish to the MQ (an MQ failure does not affect the transaction).
func (p *Producer) ProduceNotification(ctx context.Context, tx *sql.Tx, result *PaymentResult) error {
    notifyID := uuid.New().String()

    // Build the notification body (what the merchant receives)
    body, err := json.Marshal(map[string]interface{}{
        "notify_id": notifyID,
        "order_id":  result.OrderID,
        "amount":    result.Amount,
        "currency":  result.Currency,
        "status":    result.Status,
        "timestamp": time.Now().UTC().Format(time.RFC3339),
        "extra":     result.Extra,
    })
    if err != nil {
        return fmt.Errorf("marshal notify body: %w", err)
    }

    // ① Insert into the local message table inside the payment transaction
    if _, err := tx.ExecContext(ctx, `
        INSERT INTO notify_message
            (notify_id, order_id, merchant_id, notify_url, notify_body, priority, status, max_retry)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
        notifyID, result.OrderID, result.MerchantID,
        result.NotifyURL, string(body), result.Priority,
        model.StatusPending, 8,
    ); err != nil {
        return fmt.Errorf("insert notify_message failed: %w", err)
    }

    // ② Best-effort publish to Kafka (a failure does not roll back the transaction;
    // the Scanner backstops any message that never made it to the MQ)
    go func() {
        kafkaMsg := kafka.Message{
            Key:   []byte(result.MerchantID),
            Value: body,
            Headers: []kafka.Header{
                {Key: "notify_id", Value: []byte(notifyID)},
                {Key: "priority", Value: []byte(fmt.Sprintf("%d", result.Priority))},
            },
        }
        if err := p.kafkaWriter.WriteMessages(context.Background(), kafkaMsg); err != nil {
            p.logger.Warn("kafka write failed, scanner will retry",
                "notify_id", notifyID,
                "error", err,
            )
        }
    }()

    return nil
}

5.4 Local Message Table Scanner (the backstop)

// scanner/scanner.go
package scanner

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log/slog"
    "time"

    "github.com/segmentio/kafka-go"

    "payment-notify/model"
)

// Scanner periodically sweeps the local message table and republishes
// any message that never reached the MQ.
// This is the last line of defense against message loss.
type Scanner struct {
    db          *sql.DB
    kafkaWriter *kafka.Writer
    logger      *slog.Logger
    interval    time.Duration // sweep interval
    batchSize   int           // rows per sweep
}

func NewScanner(db *sql.DB, writer *kafka.Writer, logger *slog.Logger) *Scanner {
    return &Scanner{
        db:          db,
        kafkaWriter: writer,
        logger:      logger,
        interval:    3 * time.Second, // sweep every 3 seconds
        batchSize:   100,
    }
}

func (s *Scanner) Start(ctx context.Context) {
    ticker := time.NewTicker(s.interval)
    defer ticker.Stop()

    s.logger.Info("scanner started", "interval", s.interval)

    for {
        select {
        case <-ctx.Done():
            s.logger.Info("scanner stopped")
            return
        case <-ticker.C:
            if err := s.scan(ctx); err != nil {
                s.logger.Error("scan failed", "error", err)
            }
        }
    }
}

func (s *Scanner) scan(ctx context.Context) error {
    // Find PENDING rows created more than 5 seconds ago
    // (leaving the Producer's async Kafka publish some headroom)
    rows, err := s.db.QueryContext(ctx, `
        SELECT id, notify_id, merchant_id, notify_body, priority
        FROM notify_message
        WHERE status = ? AND created_at < DATE_SUB(NOW(), INTERVAL 5 SECOND)
        ORDER BY priority ASC, created_at ASC
        LIMIT ?`,
        model.StatusPending, s.batchSize,
    )
    if err != nil {
        return fmt.Errorf("query pending messages: %w", err)
    }
    defer rows.Close()

    var messages []kafka.Message
    var ids []int64

    for rows.Next() {
        var (
            id         int64
            notifyID   string
            merchantID string
            body       string
            priority   int
        )
        if err := rows.Scan(&id, &notifyID, &merchantID, &body, &priority); err != nil {
            continue
        }
        messages = append(messages, kafka.Message{
            Key:   []byte(merchantID),
            Value: []byte(body),
            Headers: []kafka.Header{
                {Key: "notify_id", Value: []byte(notifyID)},
                {Key: "priority", Value: []byte(fmt.Sprintf("%d", priority))},
            },
        })
        ids = append(ids, id)
    }

    if len(messages) == 0 {
        return nil
    }

    s.logger.Info("scanner found pending messages", "count", len(messages))

    // Batch publish to Kafka
    if err := s.kafkaWriter.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("kafka batch write: %w", err)
    }

    // Mark the batch as SENDING (if this fails, the rows stay PENDING and
    // get republished — acceptable under at-least-once delivery)
    query := fmt.Sprintf(
        "UPDATE notify_message SET status = %d WHERE id IN (%s)",
        model.StatusSending, intSliceToSQL(ids),
    )
    if _, err := s.db.ExecContext(ctx, query); err != nil {
        return fmt.Errorf("update status to SENDING: %w", err)
    }

    return nil
}

func intSliceToSQL(ids []int64) string {
    result, _ := json.Marshal(ids)
    // [1,2,3] -> 1,2,3
    return string(result[1 : len(result)-1])
}

5.5 Delivery Worker (fast + signed + circuit-broken)

// worker/worker.go
package worker

import (
    "bytes"
    "context"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net"
    "net/http"
    "sync"
    "time"

    "payment-notify/model"
)

// Worker performs the actual HTTP callback delivery
type Worker struct {
    client    *http.Client
    store     NotifyStore
    logger    *slog.Logger
    signKey   string
    breakers  map[string]*CircuitBreaker // one circuit breaker per merchant
    breakerMu sync.RWMutex
    metrics   *Metrics
}

// NotifyStore is the persistence interface
type NotifyStore interface {
    UpdateStatus(ctx context.Context, notifyID string, status model.NotifyStatus,
        httpCode int, response string) error
    ScheduleRetry(ctx context.Context, notifyID string, retryCount int,
        nextRetryAt time.Time) error
    MarkDead(ctx context.Context, notifyID string) error
}

// Metrics holds monitoring counters
type Metrics struct {
    mu              sync.Mutex
    TotalSent       int64
    TotalSuccess    int64
    TotalFailed     int64
    TotalTimeout    int64
    AvgLatencyMs    float64
}

func NewWorker(store NotifyStore, signKey string, logger *slog.Logger) *Worker {
    // Tuned HTTP client: connection pooling + timeout control
    transport := &http.Transport{
        DialContext: (&net.Dialer{
            Timeout:   3 * time.Second, // connect timeout
            KeepAlive: 30 * time.Second,
        }).DialContext,
        MaxIdleConns:          200,
        MaxIdleConnsPerHost:   20,
        IdleConnTimeout:       90 * time.Second,
        TLSHandshakeTimeout:   3 * time.Second,
        ResponseHeaderTimeout: 5 * time.Second, // time to first response header
    }

    return &Worker{
        client: &http.Client{
            Transport: transport,
            Timeout:   10 * time.Second, // end-to-end request timeout
        },
        store:    store,
        logger:   logger,
        signKey:  signKey,
        breakers: make(map[string]*CircuitBreaker),
        metrics:  &Metrics{},
    }
}

// Deliver sends a single notification
func (w *Worker) Deliver(ctx context.Context, msg *model.NotifyMessage) *model.NotifyResult {
    result := &model.NotifyResult{NotifyID: msg.NotifyID}
    start := time.Now()

    // ① Check the circuit breaker: if this merchant has failed too many times in a row, skip for now
    breaker := w.getBreaker(msg.MerchantID)
    if !breaker.Allow() {
        w.logger.Warn("circuit breaker open, skip delivery",
            "merchant_id", msg.MerchantID,
            "notify_id", msg.NotifyID,
        )
        result.Error = fmt.Errorf("circuit breaker open for merchant %s", msg.MerchantID)
        w.handleFailure(ctx, msg, result)
        return result
    }

    // ② Build the signed request
    req, err := w.buildRequest(ctx, msg)
    if err != nil {
        result.Error = err
        w.handleFailure(ctx, msg, result)
        return result
    }

    // ③ Send the HTTP request
    resp, err := w.client.Do(req)
    result.Duration = time.Since(start)

    if err != nil {
        result.Error = err
        breaker.RecordFailure()
        w.handleFailure(ctx, msg, result)
        return result
    }
    defer resp.Body.Close()

    // ④ Read the response (size-limited)
    body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
    result.HTTPCode = resp.StatusCode
    result.Response = string(body)

    // ⑤ Success = merchant returned 200 with a body containing SUCCESS
    if resp.StatusCode == http.StatusOK && isSuccessResponse(body) {
        result.Success = true
        breaker.RecordSuccess()
        w.handleSuccess(ctx, msg, result)
    } else {
        breaker.RecordFailure()
        w.handleFailure(ctx, msg, result)
    }

    // ⑥ Update metrics
    w.updateMetrics(result)

    return result
}

// buildRequest builds the signed HTTP request
func (w *Worker) buildRequest(ctx context.Context, msg *model.NotifyMessage) (*http.Request, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, msg.NotifyURL,
        bytes.NewBufferString(msg.NotifyBody))
    if err != nil {
        return nil, fmt.Errorf("build request: %w", err)
    }

    timestamp := fmt.Sprintf("%d", time.Now().Unix())

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Notify-ID", msg.NotifyID)
    req.Header.Set("X-Timestamp", timestamp)
    req.Header.Set("X-Signature", w.sign(msg.NotifyBody, timestamp))
    req.Header.Set("User-Agent", "PaymentNotify/1.0")

    return req, nil
}

// sign computes an HMAC-SHA256 signature so notifications cannot be forged
func (w *Worker) sign(body, timestamp string) string {
    mac := hmac.New(sha256.New, []byte(w.signKey))
    mac.Write([]byte(body + timestamp))
    return hex.EncodeToString(mac.Sum(nil))
}

func (w *Worker) handleSuccess(ctx context.Context, msg *model.NotifyMessage, result *model.NotifyResult) {
    w.logger.Info("notify delivered",
        "notify_id", msg.NotifyID,
        "merchant_id", msg.MerchantID,
        "duration_ms", result.Duration.Milliseconds(),
    )
    _ = w.store.UpdateStatus(ctx, msg.NotifyID, model.StatusSuccess,
        result.HTTPCode, result.Response)
}

func (w *Worker) handleFailure(ctx context.Context, msg *model.NotifyMessage, result *model.NotifyResult) {
    w.logger.Warn("notify delivery failed",
        "notify_id", msg.NotifyID,
        "merchant_id", msg.MerchantID,
        "retry_count", msg.RetryCount,
        "http_code", result.HTTPCode,
        "error", result.Error,
    )

    // Retries remaining → compute the next retry time
    if msg.RetryCount < msg.MaxRetry {
        nextRetry := model.GetNextRetryTime(msg.RetryCount)
        _ = w.store.ScheduleRetry(ctx, msg.NotifyID, msg.RetryCount+1, nextRetry)
    } else {
        // Retries exhausted → dead letter + alert
        w.logger.Error("notify max retry exhausted, entering dead letter",
            "notify_id", msg.NotifyID,
            "merchant_id", msg.MerchantID,
        )
        _ = w.store.MarkDead(ctx, msg.NotifyID)
        // TODO: send alert (DingTalk/Slack/PagerDuty)
    }
}

func (w *Worker) getBreaker(merchantID string) *CircuitBreaker {
    w.breakerMu.RLock()
    b, ok := w.breakers[merchantID]
    w.breakerMu.RUnlock()
    if ok {
        return b
    }

    w.breakerMu.Lock()
    defer w.breakerMu.Unlock()
    // Re-check: another goroutine may have created it between the two locks
    if b, ok := w.breakers[merchantID]; ok {
        return b
    }
    b = NewCircuitBreaker(5, 30*time.Second) // 5 consecutive failures → open for 30s
    w.breakers[merchantID] = b
    return b
}

func (w *Worker) updateMetrics(result *model.NotifyResult) {
    w.metrics.mu.Lock()
    defer w.metrics.mu.Unlock()
    w.metrics.TotalSent++
    if result.Success {
        w.metrics.TotalSuccess++
    } else {
        w.metrics.TotalFailed++
    }
}

func isSuccessResponse(body []byte) bool {
    var resp map[string]interface{}
    if err := json.Unmarshal(body, &resp); err != nil {
        // also accept plain-text "SUCCESS"
        return string(body) == "SUCCESS"
    }
    code, _ := resp["code"].(string)
    return code == "SUCCESS" || code == "OK"
}
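On the receiving end, the merchant should verify the X-Signature header before trusting a callback. A minimal sketch that mirrors the sign method above — the Sign/VerifySignature helpers here are illustrative merchant-side code, not part of the platform's packages:

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
)

// Sign mirrors the platform's signing scheme: HMAC-SHA256 over body + timestamp.
func Sign(signKey, body, timestamp string) string {
	mac := hmac.New(sha256.New, []byte(signKey))
	mac.Write([]byte(body + timestamp))
	return hex.EncodeToString(mac.Sum(nil))
}

// VerifySignature recomputes the signature on the merchant side and compares
// in constant time, so callbacks cannot be forged or tampered with in transit.
func VerifySignature(signKey, body, timestamp, signature string) bool {
	expected := Sign(signKey, body, timestamp)
	return hmac.Equal([]byte(expected), []byte(signature))
}
```

Merchants should also reject timestamps that are too old, to limit replay of captured notifications.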

5.6 Circuit Breaker (stop hammering a merchant endpoint that is down)

// worker/circuit_breaker.go
package worker

import (
    "sync"
    "time"
)

type BreakerState int

const (
    StateClosed   BreakerState = iota // healthy
    StateOpen                         // tripped
    StateHalfOpen                     // probing
)

// CircuitBreaker is a minimal circuit breaker.
// Once a merchant's callback endpoint fails N times in a row, delivery to that
// merchant is paused for a while, instead of pointlessly retrying an endpoint
// that is clearly down.
type CircuitBreaker struct {
    mu            sync.Mutex
    state         BreakerState
    failureCount  int
    failureThresh int           // consecutive failures before tripping
    resetTimeout  time.Duration // how long the breaker stays open
    lastFailureAt time.Time
}

func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:         StateClosed,
        failureThresh: threshold,
        resetTimeout:  timeout,
    }
}

func (cb *CircuitBreaker) Allow() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        return true
    case StateOpen:
        // Cooldown elapsed → half-open, allow one probe
        if time.Since(cb.lastFailureAt) > cb.resetTimeout {
            cb.state = StateHalfOpen
            return true
        }
        return false
    case StateHalfOpen:
        return true // half-open allows a probe request
    }
    return false
}

func (cb *CircuitBreaker) RecordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    cb.failureCount = 0
    cb.state = StateClosed
}

func (cb *CircuitBreaker) RecordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    cb.failureCount++
    cb.lastFailureAt = time.Now()

    if cb.failureCount >= cb.failureThresh {
        cb.state = StateOpen
    }
}

5.7 Dispatcher (consume MQ + dispatch by priority)

// dispatcher/dispatcher.go
package dispatcher

import (
    "context"
    "encoding/json"
    "log/slog"
    "strconv"
    "sync"

    "github.com/segmentio/kafka-go"

    "payment-notify/model"
    "payment-notify/worker"
)

// Dispatcher consumes from Kafka and routes messages to per-priority worker pools
type Dispatcher struct {
    readers map[model.Priority]*kafka.Reader
    pools   map[model.Priority]*WorkerPool
    worker  *worker.Worker
    logger  *slog.Logger
}

type WorkerPool struct {
    ch         chan *model.NotifyMessage
    workerSize int
    wg         sync.WaitGroup
}

func NewDispatcher(brokers []string, w *worker.Worker, logger *slog.Logger) *Dispatcher {
    d := &Dispatcher{
        readers: make(map[model.Priority]*kafka.Reader),
        pools:   make(map[model.Priority]*WorkerPool),
        worker:  w,
        logger:  logger,
    }

    // One dedicated Kafka topic + worker pool per priority
    configs := map[model.Priority]struct {
        topic      string
        poolSize   int
        bufferSize int
    }{
        model.PriorityP0: {"notify-p0", 20, 1000}, // VIP: 20 workers
        model.PriorityP1: {"notify-p1", 10, 5000}, // normal: 10 workers
        model.PriorityP2: {"notify-p2", 5, 10000}, // small amounts: 5 workers
    }

    for priority, cfg := range configs {
        d.readers[priority] = kafka.NewReader(kafka.ReaderConfig{
            Brokers:  brokers,
            Topic:    cfg.topic,
            GroupID:  "notify-dispatcher",
            MinBytes: 1e3,
            MaxBytes: 10e6,
        })
        d.pools[priority] = &WorkerPool{
            ch:         make(chan *model.NotifyMessage, cfg.bufferSize),
            workerSize: cfg.poolSize,
        }
    }

    return d
}

func (d *Dispatcher) Start(ctx context.Context) {
    // Start all worker pools
    for priority, pool := range d.pools {
        for i := 0; i < pool.workerSize; i++ {
            pool.wg.Add(1)
            go func(p model.Priority, id int, pool *WorkerPool) {
                defer pool.wg.Done()
                d.logger.Info("worker started", "priority", p, "worker_id", id)
                for msg := range pool.ch {
                    // The shared Worker instance is safe for concurrent use
                    _ = d.worker.Deliver(ctx, msg)
                }
            }(priority, i, pool)
        }
    }

    // Start the Kafka consumers
    for priority, reader := range d.readers {
        go d.consume(ctx, priority, reader)
    }
}

func (d *Dispatcher) consume(ctx context.Context, priority model.Priority, reader *kafka.Reader) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        kafkaMsg, err := reader.ReadMessage(ctx)
        if err != nil {
            d.logger.Error("kafka read error", "priority", priority, "error", err)
            continue
        }

        var msg model.NotifyMessage
        if err := json.Unmarshal(kafkaMsg.Value, &msg); err != nil {
            d.logger.Error("unmarshal notify message", "error", err)
            continue
        }

        // Extract priority from the message headers
        for _, h := range kafkaMsg.Headers {
            if h.Key == "priority" {
                p, _ := strconv.Atoi(string(h.Value))
                msg.Priority = model.Priority(p)
            }
        }

        // Hand off to the worker pool for this priority
        pool, ok := d.pools[msg.Priority]
        if !ok {
            pool = d.pools[model.PriorityP1] // fall back to the normal queue
        }

        select {
        case pool.ch <- &msg:
        default:
            // Pool full → degrade: log it and rely on the retry path
            d.logger.Warn("worker pool full, message delayed",
                "priority", msg.Priority,
                "notify_id", msg.NotifyID,
            )
        }
    }
}

func (d *Dispatcher) Stop() {
    for _, pool := range d.pools {
        close(pool.ch)
        pool.wg.Wait()
    }
    for _, reader := range d.readers {
        _ = reader.Close()
    }
}

5.8 Retry Scheduler (exponential backoff + precise scheduling)

// retry/scheduler.go
package retry

import (
    "context"
    "database/sql"
    "log/slog"
    "time"

    "payment-notify/model"
    "payment-notify/worker"
)

// RetryScheduler periodically scans for messages due for retry and redelivers them
type RetryScheduler struct {
    db        *sql.DB
    worker    *worker.Worker
    logger    *slog.Logger
    interval  time.Duration
    batchSize int
}

func NewRetryScheduler(db *sql.DB, w *worker.Worker, logger *slog.Logger) *RetryScheduler {
    return &RetryScheduler{
        db:        db,
        worker:    w,
        logger:    logger,
        interval:  5 * time.Second,
        batchSize: 50,
    }
}

func (r *RetryScheduler) Start(ctx context.Context) {
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    r.logger.Info("retry scheduler started")

    for {
        select {
        case <-ctx.Done():
            r.logger.Info("retry scheduler stopped")
            return
        case <-ticker.C:
            r.processRetries(ctx)
        }
    }
}

func (r *RetryScheduler) processRetries(ctx context.Context) {
    rows, err := r.db.QueryContext(ctx, `
        SELECT id, notify_id, order_id, merchant_id, notify_url, 
               notify_body, priority, status, retry_count, max_retry
        FROM notify_message
        WHERE status = ? AND next_retry_at <= NOW()
        ORDER BY priority ASC, next_retry_at ASC
        LIMIT ?`,
        model.StatusFailed, r.batchSize,
    )
    if err != nil {
        r.logger.Error("query retry messages", "error", err)
        return
    }
    defer rows.Close()

    for rows.Next() {
        var msg model.NotifyMessage
        if err := rows.Scan(
            &msg.ID, &msg.NotifyID, &msg.OrderID, &msg.MerchantID,
            &msg.NotifyURL, &msg.NotifyBody, &msg.Priority, &msg.Status,
            &msg.RetryCount, &msg.MaxRetry,
        ); err != nil {
            r.logger.Error("scan retry message", "error", err)
            continue
        }

        r.logger.Info("retrying notification",
            "notify_id", msg.NotifyID,
            "retry_count", msg.RetryCount,
            "merchant_id", msg.MerchantID,
        )

        // Deliver directly via the Worker (skip the MQ to cut latency)
        result := r.worker.Deliver(ctx, &msg)
        if result.Success {
            r.logger.Info("retry succeeded",
                "notify_id", msg.NotifyID,
                "total_retries", msg.RetryCount,
            )
        }
    }
}

5.9 Main Entry Point

// main.go
package main

import (
    "context"
    "database/sql"
    "log/slog"
    "os"
    "os/signal"
    "syscall"
    "time"

    _ "github.com/go-sql-driver/mysql"
    "github.com/segmentio/kafka-go"

    "payment-notify/dispatcher"
    "payment-notify/retry"
    "payment-notify/scanner"
    "payment-notify/store"
    "payment-notify/worker"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelInfo,
    }))

    // Database connection
    db, err := sql.Open("mysql", "root:password@tcp(localhost:3306)/payment?parseTime=true")
    if err != nil {
        logger.Error("database connection failed", "error", err)
        os.Exit(1)
    }
    db.SetMaxOpenConns(50)
    db.SetMaxIdleConns(25)

    // Kafka writer
    kafkaWriter := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Balancer:     &kafka.LeastBytes{},
        BatchTimeout: 10 * time.Millisecond, // low latency
        RequiredAcks: kafka.RequireAll,      // wait for all replicas
    }

    // Wire up the components
    st := store.NewMySQLStore(db) // implements the NotifyStore interface
    w := worker.NewWorker(st, "your-hmac-secret-key", logger)
    scan := scanner.NewScanner(db, kafkaWriter, logger)
    disp := dispatcher.NewDispatcher([]string{"localhost:9092"}, w, logger)
    retrySched := retry.NewRetryScheduler(db, w, logger)

    ctx, cancel := context.WithCancel(context.Background())

    // Start all components
    go scan.Start(ctx)
    go disp.Start(ctx)
    go retrySched.Start(ctx)

    logger.Info("payment notification system started")

    // Graceful shutdown
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    logger.Info("shutting down...")
    cancel()
    disp.Stop()
    _ = kafkaWriter.Close()
    _ = db.Close()
    logger.Info("shutdown complete")
}

6. Reliability: Defense in Depth

Timeline ────────────────────────────────────────────────▶

Payment completes
  │
  ├─ ① DB transaction writes notify_message ──────── message cannot be lost
  │
  ├─ ② Async publish to Kafka ── ok ──▶ Dispatcher ──▶ Worker ──▶ merchant
  │                        │                              │
  │                        │                   returns SUCCESS ──▶ done ✓
  │                        │                              │
  │                        │                    failure / timeout
  │                        │                        │
  │                        │                ③ Retry scheduler (exponential backoff)
  │                        │                   5s → 30s → 5m → 30m → 1h → 2h → 6h → 24h
  │                        │                        │
  │                        │                  all retries exhausted
  │                        │                        │
  │                        │                ④ Dead letter queue + alert + manual ops
  │                        │
  │                    publish failed
  │                        │
  │                ⑤ Scanner backstop sweep ──▶ republish to Kafka ──▶ back to ②
  │
  ├─ ⑥ Merchant query API ──── can be polled any time, independent of push
  │
  └─ ⑦ T+1 reconciliation ──── final-consistency backstop

7. Monitoring and Alerting

Alert                                       Threshold        Channel
Notification latency p99 > 5s               sustained 1 min  Slack + PagerDuty
Notification success rate < 99%             sustained 5 min  Slack
Single merchant > 10 consecutive failures   immediate        Email the merchant
Dead letter backlog > 100                   immediate        PagerDuty
Local message table PENDING > 1000          sustained 1 min  Slack
Kafka consumer lag > 10000                  sustained 2 min  PagerDuty

8. Performance Notes

Connection reuse: the HTTP client is configured with a connection pool (MaxIdleConnsPerHost=20), so a new connection isn't opened for every notification. For high-traffic merchants the TCP connection stays warm, saving the TLS handshake.

Batching: the Scanner sweeps in batches and publishes to Kafka in batches, cutting DB and MQ round trips.

Tiered queues: P0/P1/P2 are separated, so a VIP merchant's large-amount notification is never stuck behind a flood of small ones.

Circuit breaking: when a merchant's endpoint is down, the breaker pauses delivery to that merchant and frees workers for everyone else.

Signing in the worker: signature computation happens in the Worker rather than the Dispatcher, exploiting the worker pool's concurrency.

9. Summary

The core idea of this system is multiple lines of defense, each backstopping the last:

  1. Local message table — messages survive even if the MQ is down
  2. Scanner — anything that slipped through gets swept up and resent
  3. Exponential-backoff retries — a temporarily unavailable merchant keeps being retried for up to 24 hours
  4. Circuit breaker — one dead merchant cannot drag down the whole system
  5. Dead letter + alerting — the final catch, with human intervention
  6. Merchant query API — merchants can always pull instead of relying on push
  7. T+1 reconciliation — the ultimate guarantee of eventual consistency