一、核心挑战与设计目标
支付结果通知是连接支付平台与商户的"最后一公里"。通知丢失 = 商户不知道钱到没到 = 用户投诉 + 商户流失。系统设计必须满足:
| 指标 | 目标 | 说明 |
|---|---|---|
| 时效性 | p99 < 3s | 支付完成后 3 秒内商户收到通知 |
| 可靠性 | 99.99% | 消息不丢失,至少投递一次 |
| 可用性 | 99.99% | 通知系统挂了不影响支付主链路 |
| 幂等性 | 100% | 重复通知不会导致商户重复处理 |
二、整体架构
┌──────────────┐
│ 支付核心系统 │
└──────┬───────┘
│ ① 支付完成,写DB + 发MQ(事务消息)
▼
┌──────────────┐ ┌─────────────────┐
│ 消息队列 │────▶│ 通知调度服务 │
│ (Kafka/ │ │ (Dispatcher) │
│ RocketMQ) │ └────────┬────────┘
└──────────────┘ │
┌──────────┼──────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Worker 1│ │Worker 2│ │Worker N│
└───┬────┘ └───┬────┘ └───┬────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────┐
│ 商户回调接口 (Webhook) │
└──────────────────────────────┘
│
▼ ② 商户未应答?
┌──────────────────┐
│ 重试调度器 │
│ (指数退避重试) │
│ 5s→30s→5m→…→24h │
└──────────────────┘
│
▼ ③ 全部重试失败?
┌──────────────────┐
│ 死信队列 + 告警 │
│ 人工介入/商户查询 │
└──────────────────┘
三、关键设计决策
3.1 消息不丢:事务消息 + 本地消息表
支付系统最怕的就是"钱扣了但通知没发出去"。用本地消息表兜底:
支付事务 {
1. 更新支付订单状态 = SUCCESS
2. 插入 notify_message 表(同一个DB事务)
3. 提交事务
}
异步扫描器 {
扫描 notify_message 中 status=PENDING 的记录
发送到 MQ
更新 status=SENT
}
这样即使 MQ 挂了,本地消息表里的数据不会丢,扫描器会持续重试。
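下面给一个支付核心侧的最小调用示意,展示"同一个事务里更新订单 + 写 notify_message"的写法(payment_order 表名与函数签名均为假设,ProduceNotification 的实现见后面 5.3 节):

// 示意代码:支付核心服务内的调用方式(假设性代码)
package payment

import (
	"context"
	"database/sql"

	"payment-notify/producer"
)

// completePayment 在同一个 DB 事务里更新订单状态并写入本地消息表
func completePayment(ctx context.Context, db *sql.DB, p *producer.Producer, result *producer.PaymentResult) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // Commit 成功后 Rollback 是空操作

	// ① 更新支付订单状态
	if _, err := tx.ExecContext(ctx,
		`UPDATE payment_order SET status = 'SUCCESS' WHERE order_id = ?`,
		result.OrderID,
	); err != nil {
		return err
	}
	// ② 同一事务内写入 notify_message(见 5.3 节 ProduceNotification)
	if err := p.ProduceNotification(ctx, tx, result); err != nil {
		return err
	}
	// ③ 提交:订单状态与通知消息要么同时落库,要么同时回滚
	return tx.Commit()
}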
3.2 通知不重:幂等设计
商户侧必须根据 notify_id 做幂等处理,我们在通知协议中明确:
- 每条通知携带全局唯一的 notify_id
- 商户收到后返回 SUCCESS 表示已处理
- 我们可能会重复发送,商户必须能处理重复
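商户侧怎么做幂等?一个常见做法是用 notify_id 上的唯一索引去重,下面是极简示意(processed_notify 表与错误处理方式均为假设,不代表任何官方 SDK):

// 商户侧回调处理示意(假设性代码):重复通知靠 notify_id 唯一索引去重
package merchant

import (
	"database/sql"
	"io"
	"net/http"
)

func notifyHandler(db *sql.DB) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(io.LimitReader(r.Body, 1<<20))
		notifyID := r.Header.Get("X-Notify-ID") // 也可从 body 中的 notify_id 字段取

		// processed_notify(notify_id) 上建唯一索引:插入成功 = 首次收到,冲突 = 重复通知
		if _, err := db.Exec(
			`INSERT INTO processed_notify (notify_id, body) VALUES (?, ?)`,
			notifyID, string(body),
		); err != nil {
			// 简化处理:把冲突当作"已处理过",幂等返回 SUCCESS
			// (真实代码应区分唯一键冲突与其他 DB 错误)
			w.Write([]byte("SUCCESS"))
			return
		}
		// TODO: 首次收到时执行真正的业务逻辑(发货、记账等)
		w.Write([]byte("SUCCESS"))
	}
}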
3.3 通知够快:分级投递
不是所有通知都需要同等对待,按金额和商户等级分优先级:
- P0:大额交易 / VIP商户 → 独立队列,独立 Worker 池,专线投递
- P1:普通交易 → 标准队列
- P2:小额交易 → 批量队列,可稍延迟
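分级的判定可以在支付核心生成 PaymentResult 时顺手完成,示意如下(金额单位按"分"假设,阈值与 VIP 判定方式都只是举例):

// classifyPriority 按商户等级与交易金额划分通知优先级(阈值为示意值)
package producer

import "payment-notify/model"

func classifyPriority(isVIPMerchant bool, amountFen int64) model.Priority {
	switch {
	case isVIPMerchant || amountFen >= 10_000_000: // VIP 商户,或单笔 ≥ 10 万元(1000 万分)
		return model.PriorityP0
	case amountFen < 100: // 单笔 < 1 元的小额交易
		return model.PriorityP2
	default:
		return model.PriorityP1
	}
}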
3.4 系统不挂:多活 + 降级
- 通知服务多机房部署,任一机房宕机自动切换
- MQ 不可用时降级为本地消息表直接投递
- 商户接口超时时快速失败进入重试队列,不阻塞主流程
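"MQ 不可用时降级为本地消息表直接投递"这条,思路是让扫描器在探测到 Kafka 持续写入失败时跳过 MQ,把 PENDING 消息直接交给 Worker。下面是一个调用关系示意(deliverDirect 是假设的方法,5.4 节的 Scanner 并未内置;MQ 健康探测、并发与限速都省略了):

// scanner/fallback.go(假设性文件):MQ 不可用时的直投降级路径
package scanner

import (
	"context"

	"payment-notify/model"
	"payment-notify/worker"
)

// deliverDirect 绕过 Kafka,把本地消息表中的 PENDING 消息直接交给 Worker 投递
func (s *Scanner) deliverDirect(ctx context.Context, w *worker.Worker, msgs []*model.NotifyMessage) {
	for _, m := range msgs {
		// Deliver 内部会更新状态或安排重试(见 5.5 节),这里只需逐条调用
		if result := w.Deliver(ctx, m); !result.Success {
			s.logger.Warn("direct delivery failed", "notify_id", m.NotifyID, "error", result.Error)
		}
	}
}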
四、数据模型
CREATE TABLE notify_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
notify_id VARCHAR(64) NOT NULL UNIQUE, -- 全局唯一通知ID
order_id VARCHAR(64) NOT NULL, -- 支付订单号
merchant_id VARCHAR(32) NOT NULL, -- 商户ID
notify_url VARCHAR(512) NOT NULL, -- 商户回调地址
notify_body TEXT NOT NULL, -- 通知内容(JSON)
priority TINYINT DEFAULT 1, -- 0=P0, 1=P1, 2=P2
status TINYINT DEFAULT 0, -- 0=PENDING, 1=SENDING, 2=SUCCESS, 3=FAILED
retry_count INT DEFAULT 0, -- 已重试次数
max_retry INT DEFAULT 8, -- 最大重试次数
next_retry_at DATETIME, -- 下次重试时间
last_response VARCHAR(512), -- 商户最后响应
last_http_code INT, -- 最后HTTP状态码
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_retry (status, next_retry_at),
INDEX idx_merchant_order (merchant_id, order_id)
);
五、Go 实现
5.1 项目结构
payment-notify/
├── main.go
├── config/
│ └── config.go
├── model/
│ └── notify.go // 数据模型
├── producer/
│ └── producer.go // 消息生产者(本地消息表写入)
├── dispatcher/
│ └── dispatcher.go // 调度器(消费MQ,分发给Worker)
├── worker/
│ └── worker.go // 通知投递Worker
├── retry/
│ └── scheduler.go // 重试调度器
├── store/
│ └── mysql.go // 持久化层
└── scanner/
    └── scanner.go   // 本地消息表扫描器
5.2 核心数据模型
// model/notify.go
package model
import "time"
type NotifyStatus int
const (
StatusPending NotifyStatus = 0
StatusSending NotifyStatus = 1
StatusSuccess NotifyStatus = 2
StatusFailed NotifyStatus = 3
)
type Priority int
const (
PriorityP0 Priority = 0 // VIP/大额
PriorityP1 Priority = 1 // 普通
PriorityP2 Priority = 2 // 小额
)
// NotifyMessage 通知消息实体
type NotifyMessage struct {
ID int64 `json:"id" db:"id"`
NotifyID string `json:"notify_id" db:"notify_id"`
OrderID string `json:"order_id" db:"order_id"`
MerchantID string `json:"merchant_id" db:"merchant_id"`
NotifyURL string `json:"notify_url" db:"notify_url"`
NotifyBody string `json:"notify_body" db:"notify_body"`
Priority Priority `json:"priority" db:"priority"`
Status NotifyStatus `json:"status" db:"status"`
RetryCount int `json:"retry_count" db:"retry_count"`
MaxRetry int `json:"max_retry" db:"max_retry"`
NextRetryAt *time.Time `json:"next_retry_at" db:"next_retry_at"`
LastResponse string `json:"last_response" db:"last_response"`
LastHTTPCode int `json:"last_http_code" db:"last_http_code"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
// NotifyResult 商户回调结果
type NotifyResult struct {
NotifyID string
HTTPCode int
Response string
Success bool
Duration time.Duration
Error error
}
// RetryStrategy 重试策略:指数退避
// 第1次: 5s, 第2次: 30s, 第3次: 5min, 第4次: 30min,
// 第5次: 1h, 第6次: 2h, 第7次: 6h, 第8次: 24h
var RetryIntervals = []time.Duration{
5 * time.Second,
30 * time.Second,
5 * time.Minute,
30 * time.Minute,
1 * time.Hour,
2 * time.Hour,
6 * time.Hour,
24 * time.Hour,
}
func GetNextRetryTime(retryCount int) time.Time {
if retryCount >= len(RetryIntervals) {
return time.Now().Add(RetryIntervals[len(RetryIntervals)-1])
}
return time.Now().Add(RetryIntervals[retryCount])
}
5.3 消息生产者(本地消息表 + MQ 双写)
// producer/producer.go
package producer
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"payment-notify/model"
)
type Producer struct {
db *sql.DB
kafkaWriter *kafka.Writer
logger *slog.Logger
}
func NewProducer(db *sql.DB, kafkaWriter *kafka.Writer, logger *slog.Logger) *Producer {
return &Producer{db: db, kafkaWriter: kafkaWriter, logger: logger}
}
// PaymentResult 支付核心传过来的支付结果
type PaymentResult struct {
OrderID string `json:"order_id"`
MerchantID string `json:"merchant_id"`
Amount int64 `json:"amount"`
Currency string `json:"currency"`
Status string `json:"status"`
NotifyURL string `json:"notify_url"`
Priority model.Priority `json:"priority"`
Extra json.RawMessage `json:"extra,omitempty"`
}
// ProduceNotification 在支付事务中调用,保证消息不丢
// 核心思路:本地DB事务写入 + 尝试投递MQ(MQ失败不影响事务)
func (p *Producer) ProduceNotification(ctx context.Context, tx *sql.Tx, result *PaymentResult) error {
notifyID := uuid.New().String()
// 构建通知体(发给商户的内容)
body, _ := json.Marshal(map[string]interface{}{
"notify_id": notifyID,
"order_id": result.OrderID,
"amount": result.Amount,
"currency": result.Currency,
"status": result.Status,
"timestamp": "2026-03-02T12:00:00Z",
"extra": result.Extra,
})
// ① 在支付事务内写入本地消息表
_, err := tx.ExecContext(ctx, `
INSERT INTO notify_message
(notify_id, order_id, merchant_id, notify_url, notify_body, priority, status, max_retry)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
notifyID, result.OrderID, result.MerchantID,
result.NotifyURL, string(body), result.Priority,
model.StatusPending, 8,
)
if err != nil {
return fmt.Errorf("insert notify_message failed: %w", err)
}
// ② 尝试投递到 Kafka(失败不回滚事务,由 Scanner 兜底)
go func() {
// Kafka 消息体是完整的投递任务(含回调地址与通知 body),
// Dispatcher/Worker 拿到后无需回查 DB 即可投递
envelope, _ := json.Marshal(&model.NotifyMessage{
	NotifyID:   notifyID,
	OrderID:    result.OrderID,
	MerchantID: result.MerchantID,
	NotifyURL:  result.NotifyURL,
	NotifyBody: string(body),
	Priority:   result.Priority,
	MaxRetry:   8,
})
kafkaMsg := kafka.Message{
	Key:   []byte(result.MerchantID),
	Value: envelope,
Headers: []kafka.Header{
{Key: "notify_id", Value: []byte(notifyID)},
{Key: "priority", Value: []byte(fmt.Sprintf("%d", result.Priority))},
},
}
if err := p.kafkaWriter.WriteMessages(context.Background(), kafkaMsg); err != nil {
p.logger.Warn("kafka write failed, scanner will retry",
"notify_id", notifyID,
"error", err,
)
}
}()
return nil
}
5.4 本地消息表扫描器(兜底保障)
// scanner/scanner.go
package scanner
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/segmentio/kafka-go"
"payment-notify/model"
)
// Scanner 定期扫描本地消息表,把漏掉的消息补发到 MQ
// 这是保证消息不丢的最后一道防线
type Scanner struct {
db *sql.DB
kafkaWriter *kafka.Writer
logger *slog.Logger
interval time.Duration // 扫描间隔
batchSize int // 每次扫描条数
}
func NewScanner(db *sql.DB, writer *kafka.Writer, logger *slog.Logger) *Scanner {
return &Scanner{
db: db,
kafkaWriter: writer,
logger: logger,
interval: 3 * time.Second, // 每3秒扫描一次
batchSize: 100,
}
}
func (s *Scanner) Start(ctx context.Context) {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
s.logger.Info("scanner started", "interval", s.interval)
for {
select {
case <-ctx.Done():
s.logger.Info("scanner stopped")
return
case <-ticker.C:
if err := s.scan(ctx); err != nil {
s.logger.Error("scan failed", "error", err)
}
}
}
}
func (s *Scanner) scan(ctx context.Context) error {
// 查找状态为 PENDING 且创建超过 5 秒的记录
// (给 Producer 的异步 Kafka 投递留一点缓冲时间)
rows, err := s.db.QueryContext(ctx, `
	SELECT id, notify_id, order_id, merchant_id, notify_url, notify_body, priority, max_retry
	FROM notify_message
	WHERE status = ? AND created_at < DATE_SUB(NOW(), INTERVAL 5 SECOND)
	ORDER BY priority ASC, created_at ASC
	LIMIT ?`,
	model.StatusPending, s.batchSize,
)
if err != nil {
	return fmt.Errorf("query pending messages: %w", err)
}
defer rows.Close()
var messages []kafka.Message
var ids []int64
for rows.Next() {
	var m model.NotifyMessage
	if err := rows.Scan(&m.ID, &m.NotifyID, &m.OrderID, &m.MerchantID,
		&m.NotifyURL, &m.NotifyBody, &m.Priority, &m.MaxRetry); err != nil {
		s.logger.Error("scan pending row", "error", err)
		continue
	}
	// 与 Producer 保持同一消息格式:投递完整任务,Worker 无需回查 DB
	envelope, err := json.Marshal(&m)
	if err != nil {
		continue
	}
	messages = append(messages, kafka.Message{
		Key:   []byte(m.MerchantID),
		Value: envelope,
		Headers: []kafka.Header{
			{Key: "notify_id", Value: []byte(m.NotifyID)},
			{Key: "priority", Value: []byte(fmt.Sprintf("%d", int(m.Priority)))},
		},
	})
	ids = append(ids, m.ID)
}
if len(messages) == 0 {
return nil
}
s.logger.Info("scanner found pending messages", "count", len(messages))
// 批量投递到 Kafka
if err := s.kafkaWriter.WriteMessages(ctx, messages...); err != nil {
return fmt.Errorf("kafka batch write: %w", err)
}
// 更新状态为 SENDING;更新失败只记日志,即便漏更导致重复投递,也能靠商户侧幂等兜住
query := fmt.Sprintf(
	"UPDATE notify_message SET status = %d WHERE id IN (%s)",
	model.StatusSending, intSliceToSQL(ids),
)
if _, err := s.db.ExecContext(ctx, query); err != nil {
	s.logger.Error("update status to SENDING failed", "error", err)
}
return nil
}
func intSliceToSQL(ids []int64) string {
result, _ := json.Marshal(ids)
// [1,2,3] -> 1,2,3
return string(result[1 : len(result)-1])
}
5.5 通知投递 Worker(核心:快速 + 签名 + 熔断)
// worker/worker.go
package worker
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"sync"
"time"
"payment-notify/model"
)
// Worker 负责实际的 HTTP 回调投递
type Worker struct {
client *http.Client
store NotifyStore
logger *slog.Logger
signKey string
breakers map[string]*CircuitBreaker // 每个商户一个熔断器
breakerMu sync.RWMutex
metrics *Metrics
}
// NotifyStore 持久化接口
type NotifyStore interface {
UpdateStatus(ctx context.Context, notifyID string, status model.NotifyStatus,
httpCode int, response string) error
ScheduleRetry(ctx context.Context, notifyID string, retryCount int,
nextRetryAt time.Time) error
MarkDead(ctx context.Context, notifyID string) error
}
// Metrics 监控指标
type Metrics struct {
mu sync.Mutex
TotalSent int64
TotalSuccess int64
TotalFailed int64
TotalTimeout int64
AvgLatencyMs float64
}
func NewWorker(store NotifyStore, signKey string, logger *slog.Logger) *Worker {
// 定制 HTTP Client:连接池 + 超时控制
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 3 * time.Second, // 连接超时
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 200,
MaxIdleConnsPerHost: 20,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 3 * time.Second,
ResponseHeaderTimeout: 5 * time.Second, // 读响应头超时
}
return &Worker{
client: &http.Client{
Transport: transport,
Timeout: 10 * time.Second, // 整体超时
},
store: store,
logger: logger,
signKey: signKey,
breakers: make(map[string]*CircuitBreaker),
metrics: &Metrics{},
}
}
// Deliver 投递单条通知
func (w *Worker) Deliver(ctx context.Context, msg *model.NotifyMessage) *model.NotifyResult {
result := &model.NotifyResult{NotifyID: msg.NotifyID}
start := time.Now()
// ① 检查熔断器:如果该商户连续失败太多次,暂时跳过
breaker := w.getBreaker(msg.MerchantID)
if !breaker.Allow() {
w.logger.Warn("circuit breaker open, skip delivery",
"merchant_id", msg.MerchantID,
"notify_id", msg.NotifyID,
)
result.Error = fmt.Errorf("circuit breaker open for merchant %s", msg.MerchantID)
w.handleFailure(ctx, msg, result)
return result
}
// ② 构建签名请求
req, err := w.buildRequest(ctx, msg)
if err != nil {
result.Error = err
w.handleFailure(ctx, msg, result)
return result
}
// ③ 发送 HTTP 请求
resp, err := w.client.Do(req)
result.Duration = time.Since(start)
if err != nil {
result.Error = err
breaker.RecordFailure()
w.handleFailure(ctx, msg, result)
return result
}
defer resp.Body.Close()
// ④ 读取响应
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) // 限制读取大小
result.HTTPCode = resp.StatusCode
result.Response = string(body)
// ⑤ 判断是否成功(商户返回 200,且 body 为纯文本 SUCCESS 或 JSON 中 code 为 SUCCESS/OK)
if resp.StatusCode == http.StatusOK && isSuccessResponse(body) {
result.Success = true
breaker.RecordSuccess()
w.handleSuccess(ctx, msg, result)
} else {
breaker.RecordFailure()
w.handleFailure(ctx, msg, result)
}
// ⑥ 更新监控指标
w.updateMetrics(result)
return result
}
// buildRequest 构建带签名的 HTTP 请求
func (w *Worker) buildRequest(ctx context.Context, msg *model.NotifyMessage) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, msg.NotifyURL,
bytes.NewBufferString(msg.NotifyBody))
if err != nil {
return nil, fmt.Errorf("build request: %w", err)
}
timestamp := fmt.Sprintf("%d", time.Now().Unix())
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Notify-ID", msg.NotifyID)
req.Header.Set("X-Timestamp", timestamp)
req.Header.Set("X-Signature", w.sign(msg.NotifyBody, timestamp))
req.Header.Set("User-Agent", "PaymentNotify/1.0")
return req, nil
}
// sign HMAC-SHA256 签名,防止通知被伪造
func (w *Worker) sign(body, timestamp string) string {
mac := hmac.New(sha256.New, []byte(w.signKey))
mac.Write([]byte(body + timestamp))
return hex.EncodeToString(mac.Sum(nil))
}
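// VerifyNotifySignature 商户侧验签示意(假设性辅助函数,非投递链路必需):
// 商户用约定的 signKey 对 body+timestamp 重算 HMAC-SHA256,
// 再用 hmac.Equal 做恒定时间比较,防止时序攻击与通知伪造
func VerifyNotifySignature(signKey, body, timestamp, signature string) bool {
	mac := hmac.New(sha256.New, []byte(signKey))
	mac.Write([]byte(body + timestamp))
	expected := hex.EncodeToString(mac.Sum(nil))
	return hmac.Equal([]byte(expected), []byte(signature))
}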
func (w *Worker) handleSuccess(ctx context.Context, msg *model.NotifyMessage, result *model.NotifyResult) {
w.logger.Info("notify delivered",
"notify_id", msg.NotifyID,
"merchant_id", msg.MerchantID,
"duration_ms", result.Duration.Milliseconds(),
)
_ = w.store.UpdateStatus(ctx, msg.NotifyID, model.StatusSuccess,
result.HTTPCode, result.Response)
}
func (w *Worker) handleFailure(ctx context.Context, msg *model.NotifyMessage, result *model.NotifyResult) {
w.logger.Warn("notify delivery failed",
"notify_id", msg.NotifyID,
"merchant_id", msg.MerchantID,
"retry_count", msg.RetryCount,
"http_code", result.HTTPCode,
"error", result.Error,
)
// 还有重试机会 → 计算下次重试时间
if msg.RetryCount < msg.MaxRetry {
nextRetry := model.GetNextRetryTime(msg.RetryCount)
_ = w.store.ScheduleRetry(ctx, msg.NotifyID, msg.RetryCount+1, nextRetry)
} else {
// 重试耗尽 → 进入死信,触发告警
w.logger.Error("notify max retry exhausted, entering dead letter",
"notify_id", msg.NotifyID,
"merchant_id", msg.MerchantID,
)
_ = w.store.MarkDead(ctx, msg.NotifyID)
// TODO: 发送告警(钉钉/Slack/PagerDuty)
}
}
func (w *Worker) getBreaker(merchantID string) *CircuitBreaker {
w.breakerMu.RLock()
b, ok := w.breakers[merchantID]
w.breakerMu.RUnlock()
if ok {
return b
}
w.breakerMu.Lock()
defer w.breakerMu.Unlock()
b = NewCircuitBreaker(5, 30*time.Second) // 连续5次失败 → 熔断30秒
w.breakers[merchantID] = b
return b
}
func (w *Worker) updateMetrics(result *model.NotifyResult) {
w.metrics.mu.Lock()
defer w.metrics.mu.Unlock()
w.metrics.TotalSent++
if result.Success {
w.metrics.TotalSuccess++
} else {
w.metrics.TotalFailed++
}
}
func isSuccessResponse(body []byte) bool {
var resp map[string]interface{}
if err := json.Unmarshal(body, &resp); err != nil {
// 兼容纯文本 SUCCESS
return string(body) == "SUCCESS"
}
code, _ := resp["code"].(string)
return code == "SUCCESS" || code == "OK"
}
5.6 熔断器(保护商户接口不可用时不疯狂重试)
// worker/circuit_breaker.go
package worker
import (
"sync"
"time"
)
type BreakerState int
const (
StateClosed BreakerState = iota // 正常
StateOpen // 熔断
StateHalfOpen // 半开(试探)
)
// CircuitBreaker 简易熔断器
// 当某个商户的回调接口连续失败 N 次后,暂停投递一段时间
// 避免对已经宕机的商户接口做无意义的重试
type CircuitBreaker struct {
mu sync.Mutex
state BreakerState
failureCount int
failureThresh int // 连续失败多少次触发熔断
resetTimeout time.Duration // 熔断持续时间
lastFailureAt time.Time
}
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThresh: threshold,
resetTimeout: timeout,
}
}
func (cb *CircuitBreaker) Allow() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
// 超过冷却期 → 转半开,允许一次试探
if time.Since(cb.lastFailureAt) > cb.resetTimeout {
cb.state = StateHalfOpen
return true
}
return false
case StateHalfOpen:
return true // 半开状态放行试探请求(简化实现,未限制并发试探数)
}
return false
}
func (cb *CircuitBreaker) RecordSuccess() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failureCount = 0
cb.state = StateClosed
}
func (cb *CircuitBreaker) RecordFailure() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failureCount++
cb.lastFailureAt = time.Now()
if cb.failureCount >= cb.failureThresh {
cb.state = StateOpen
}
}
5.7 调度器(消费 MQ + 分优先级派发)
// dispatcher/dispatcher.go
package dispatcher
import (
"context"
"encoding/json"
"log/slog"
"strconv"
"sync"
"github.com/segmentio/kafka-go"
"payment-notify/model"
"payment-notify/worker"
)
// Dispatcher 从 Kafka 消费消息,按优先级分发给不同的 Worker 池
type Dispatcher struct {
	readers map[model.Priority]*kafka.Reader
	pools   map[model.Priority]*WorkerPool
	worker  *worker.Worker // 各池共享同一个 Worker 实例(其内部并发安全)
	logger  *slog.Logger
}
type WorkerPool struct {
	ch         chan *model.NotifyMessage
	workerSize int
	wg         sync.WaitGroup
}
func NewDispatcher(brokers []string, w *worker.Worker, logger *slog.Logger) *Dispatcher {
d := &Dispatcher{
readers: make(map[model.Priority]*kafka.Reader),
pools: make(map[model.Priority]*WorkerPool),
logger: logger,
}
// 每个优先级一个独立的 Kafka Topic + Worker 池
configs := map[model.Priority]struct {
topic string
poolSize int
bufferSize int
}{
model.PriorityP0: {"notify-p0", 20, 1000}, // VIP:20个 Worker
model.PriorityP1: {"notify-p1", 10, 5000}, // 普通:10个 Worker
model.PriorityP2: {"notify-p2", 5, 10000}, // 小额:5个 Worker
}
for priority, cfg := range configs {
d.readers[priority] = kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: cfg.topic,
GroupID: "notify-dispatcher",
MinBytes: 1e3,
MaxBytes: 10e6,
})
d.pools[priority] = &WorkerPool{
ch: make(chan *model.NotifyMessage, cfg.bufferSize),
workerSize: cfg.poolSize,
}
}
return d
}
func (d *Dispatcher) Start(ctx context.Context) {
// 启动所有 Worker 池
for priority, pool := range d.pools {
for i := 0; i < pool.workerSize; i++ {
pool.wg.Add(1)
go func(p model.Priority, pl *WorkerPool, id int) {
	defer pl.wg.Done()
	d.logger.Info("worker started", "priority", p, "worker_id", id)
	for msg := range pl.ch {
		// 复用同一个 Worker 实例(内部已做并发保护),执行实际投递;
		// 投递结果(成功/重试/死信)由 Worker 自行落库
		_ = d.worker.Deliver(ctx, msg)
	}
}(priority, pool, i)
}(priority, i)
}
}
// 启动 Kafka 消费者
for priority, reader := range d.readers {
go d.consume(ctx, priority, reader)
}
}
func (d *Dispatcher) consume(ctx context.Context, priority model.Priority, reader *kafka.Reader) {
for {
select {
case <-ctx.Done():
return
default:
}
kafkaMsg, err := reader.ReadMessage(ctx)
if err != nil {
d.logger.Error("kafka read error", "priority", priority, "error", err)
continue
}
var msg model.NotifyMessage
if err := json.Unmarshal(kafkaMsg.Value, &msg); err != nil {
d.logger.Error("unmarshal notify message", "error", err)
continue
}
// 从 Header 中提取优先级
for _, h := range kafkaMsg.Headers {
if h.Key == "priority" {
p, _ := strconv.Atoi(string(h.Value))
msg.Priority = model.Priority(p)
}
}
// 投递到对应优先级的 Worker 池
pool, ok := d.pools[msg.Priority]
if !ok {
pool = d.pools[model.PriorityP1] // 降级到普通队列
}
select {
case pool.ch <- &msg:
default:
	// 队列满了 → 先告警,再阻塞等待(背压),不丢消息
	d.logger.Warn("worker pool full, message delayed",
		"priority", msg.Priority,
		"notify_id", msg.NotifyID,
	)
	select {
	case pool.ch <- &msg:
	case <-ctx.Done():
		return
	}
}
}
}
func (d *Dispatcher) Stop() {
for _, pool := range d.pools {
close(pool.ch)
pool.wg.Wait()
}
for _, reader := range d.readers {
_ = reader.Close()
}
}
5.8 重试调度器(指数退避 + 精准调度)
// retry/scheduler.go
package retry
import (
"context"
"database/sql"
"log/slog"
"time"
"payment-notify/model"
"payment-notify/worker"
)
// RetryScheduler 定期扫描需要重试的消息并重新投递
type RetryScheduler struct {
db *sql.DB
worker *worker.Worker
logger *slog.Logger
interval time.Duration
batchSize int
}
func NewRetryScheduler(db *sql.DB, w *worker.Worker, logger *slog.Logger) *RetryScheduler {
return &RetryScheduler{
db: db,
worker: w,
logger: logger,
interval: 5 * time.Second,
batchSize: 50,
}
}
func (r *RetryScheduler) Start(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
r.logger.Info("retry scheduler started")
for {
select {
case <-ctx.Done():
r.logger.Info("retry scheduler stopped")
return
case <-ticker.C:
r.processRetries(ctx)
}
}
}
func (r *RetryScheduler) processRetries(ctx context.Context) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, notify_id, order_id, merchant_id, notify_url,
notify_body, priority, status, retry_count, max_retry
FROM notify_message
WHERE status = ? AND next_retry_at <= NOW()
ORDER BY priority ASC, next_retry_at ASC
LIMIT ?`,
model.StatusFailed, r.batchSize,
)
if err != nil {
r.logger.Error("query retry messages", "error", err)
return
}
defer rows.Close()
for rows.Next() {
var msg model.NotifyMessage
if err := rows.Scan(
&msg.ID, &msg.NotifyID, &msg.OrderID, &msg.MerchantID,
&msg.NotifyURL, &msg.NotifyBody, &msg.Priority, &msg.Status,
&msg.RetryCount, &msg.MaxRetry,
); err != nil {
r.logger.Error("scan retry message", "error", err)
continue
}
r.logger.Info("retrying notification",
"notify_id", msg.NotifyID,
"retry_count", msg.RetryCount,
"merchant_id", msg.MerchantID,
)
// 直接调用 Worker 投递(不再过 MQ,减少延迟)
result := r.worker.Deliver(ctx, &msg)
if result.Success {
r.logger.Info("retry succeeded",
"notify_id", msg.NotifyID,
"total_retries", msg.RetryCount,
)
}
}
}
5.9 主程序入口
// main.go
package main
import (
"context"
"database/sql"
"log/slog"
"os"
"os/signal"
"syscall"
_ "github.com/go-sql-driver/mysql"
"github.com/segmentio/kafka-go"
"payment-notify/dispatcher"
"payment-notify/retry"
"payment-notify/scanner"
"payment-notify/worker"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
// 数据库连接
db, err := sql.Open("mysql", "root:password@tcp(localhost:3306)/payment?parseTime=true")
if err != nil {
logger.Error("database connection failed", "error", err)
os.Exit(1)
}
db.SetMaxOpenConns(50)
db.SetMaxIdleConns(25)
// Kafka Writer
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Balancer: &kafka.LeastBytes{},
BatchTimeout: 10 * time.Millisecond, // 低延迟
RequiredAcks: kafka.RequireAll, // 所有副本确认
}
// 初始化各组件
st := store.NewMySQLStore(db) // store/mysql.go 中实现 worker.NotifyStore 接口
w := worker.NewWorker(st, "your-hmac-secret-key", logger)
scan := scanner.NewScanner(db, kafkaWriter, logger)
disp := dispatcher.NewDispatcher([]string{"localhost:9092"}, w, logger)
retrySched := retry.NewRetryScheduler(db, w, logger)
ctx, cancel := context.WithCancel(context.Background())
// 启动所有组件
go scan.Start(ctx)
go disp.Start(ctx)
go retrySched.Start(ctx)
logger.Info("payment notification system started")
// 优雅关闭
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
logger.Info("shutting down...")
cancel()
disp.Stop()
_ = kafkaWriter.Close()
_ = db.Close()
logger.Info("shutdown complete")
}
六、可靠性保障全景
时间线 ────────────────────────────────────────────────▶
支付完成
│
├─ ① DB事务写入 notify_message ──────── 保证消息不丢
│
├─ ② 异步投 Kafka ─── 成功 ──▶ Dispatcher ──▶ Worker ──▶ 商户
│ │ │
│ │ 返回 SUCCESS ──▶ 完成 ✓
│ │ │
│ │ 返回失败/超时
│ │ │
│ │ ③ 重试调度器(指数退避)
│ │ 5s → 30s → 5m → 30m → 1h → 2h → 6h → 24h
│ │ │
│ │ 全部重试失败
│ │ │
│ │ ④ 死信队列 + 告警 + 人工介入
│ │
│ 投递失败
│ │
│ ⑤ Scanner 兜底扫描 ──▶ 重新投 Kafka ──▶ 重走 ②
│
├─ ⑥ 商户主动查询接口 ──── 随时可查,不依赖通知
│
└─ ⑦ 对账系统 T+1 ──── 最终一致性兜底
七、监控告警体系
| 告警项 | 阈值 | 告警渠道 |
|---|---|---|
| 通知延迟 p99 > 5s | 持续 1 分钟 | Slack + PagerDuty |
| 通知成功率 < 99% | 持续 5 分钟 | Slack |
| 单商户连续失败 > 10 次 | 立即 | 邮件通知商户 |
| 死信队列积压 > 100 | 立即 | PagerDuty |
| 本地消息表 PENDING > 1000 | 持续 1 分钟 | Slack |
| Kafka 消费积压(lag)> 10000 条 | 持续 2 分钟 | PagerDuty |
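这些告警项的数据来源,可以先从 5.5 节 Worker 里的 Metrics 计数器起步:用一个极简的 /metrics 接口把快照暴露给采集系统(示意代码,仅用标准库;MetricsHandler 是假设的辅助方法,生产上通常直接接 Prometheus):

// worker/metrics_handler.go(假设性文件):把计数器快照以 JSON 暴露在 HTTP 接口上
package worker

import (
	"encoding/json"
	"net/http"
)

// MetricsHandler 返回只读的指标快照,供采集系统拉取后计算成功率等告警指标
func (w *Worker) MetricsHandler() http.HandlerFunc {
	return func(rw http.ResponseWriter, _ *http.Request) {
		w.metrics.mu.Lock()
		snapshot := map[string]int64{
			"total_sent":    w.metrics.TotalSent,
			"total_success": w.metrics.TotalSuccess,
			"total_failed":  w.metrics.TotalFailed,
			"total_timeout": w.metrics.TotalTimeout,
		}
		w.metrics.mu.Unlock()
		rw.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(rw).Encode(snapshot)
	}
}

主程序里挂上 http.Handle("/metrics", w.MetricsHandler()) 即可,成功率、p99 延迟等再由采集侧按时间窗口计算。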
八、性能优化要点
- 连接池复用:HTTP Client 配置了连接池(MaxIdleConnsPerHost=20),避免每次通知都新建连接;对高频商户,TCP 连接常驻,省掉 TLS 握手开销。
- 批量提交:Scanner 批量扫描 + 批量投递 Kafka,减少 DB 和 MQ 的 round trip。
- 分级队列:P0/P1/P2 分离,VIP 商户的大额通知不会被小额通知堵住。
- 熔断保护:某商户接口宕机时,熔断器会暂停对该商户的投递,把 Worker 释放给其他商户。
- 签名下沉:HMAC 签名在 Worker 中计算而不是在 Dispatcher,利用 Worker 池的并发能力分摊 CPU 开销。
九、方案总结
这套系统的核心思路就是多层防线、层层兜底:
- 本地消息表 — 即使 MQ 挂了,消息也不丢
- Scanner 扫描器 — 漏掉的消息会被捞起来重发
- 指数退避重试 — 商户暂时不可用也不怕,持续重试 24 小时
- 熔断器 — 商户宕机不会拖垮整个系统
- 死信 + 告警 — 最终兜底,人工介入
- 商户查询接口 — 不依赖推送,商户随时可以主动拉
- T+1 对账 — 最终一致性的终极保障