搜 索

千级TPS支付系统架构:Java 21虚拟线程の终极形态

  • 8阅读
  • 2025年05月31日
  • 0评论
首页 / 编程 / 正文

一、传统支付系统架构:三座大山

在动手之前,先来看看传统支付系统为什么到了500 TPS就开始"喘气"。

1.1 传统架构长什么样

graph TB subgraph 传统支付架构 Client[客户端] --> Gateway[API Gateway] Gateway --> PayService[支付服务
Platform Thread Pool
200 threads] PayService --> AccountDB[(账户数据库
单点主库)] PayService --> ChannelService[渠道服务
同步HTTP调用] ChannelService --> BankA[银行A] ChannelService --> BankB[银行B] ChannelService --> BankC[银行C] PayService --> SettleService[清结算服务] SettleService --> AccountDB end style AccountDB fill:#ff6b6b,stroke:#c92a2a,color:#fff style ChannelService fill:#ff922b,stroke:#e8590c,color:#fff style PayService fill:#ffd43b,stroke:#f08c00,color:#333

1.2 三大瓶颈,一个都跑不掉

graph LR subgraph 瓶颈1 - 热点账户 A1[交易A] --> Lock[行锁竞争
同一余额字段] A2[交易B] --> Lock A3[交易C] --> Lock Lock --> DB1[(单行写入
串行化)] end subgraph 瓶颈2 - 渠道响应 B1[请求1] --> CH[渠道调用
同步阻塞] CH --> Wait[等待3-10秒
线程被占用] Wait --> Pool[线程池耗尽
新请求排队] end subgraph 瓶颈3 - 雪崩效应 C1[慢SQL] --> Conn[连接池占满] Conn --> Cascade[级联阻塞
所有服务受影响] Cascade --> Down[系统雪崩💥] end style Lock fill:#ff6b6b,stroke:#c92a2a,color:#fff style Wait fill:#ff922b,stroke:#e8590c,color:#fff style Down fill:#ff0000,stroke:#c92a2a,color:#fff

传统架构的核心矛盾用一句话总结:

200个Platform Thread,每个线程等渠道3秒,你的系统并发上限就是 200/3 ≈ 66 TPS。加线程?1000个线程吃掉2GB内存,上下文切换还把CPU打满。

这就是为什么传统架构在500 TPS面前就开始发抖——不是CPU不够快,是线程模型本身就是瓶颈。


二、新架构:Virtual Threads + 三重破壁

2.1 整体架构全景

graph TB subgraph 接入层 Client[客户端] --> GW[API Gateway
限流 / 鉴权 / 路由
Spring Cloud Gateway] end subgraph 交易编排层 GW --> Orchestrator[交易编排服务
Saga状态机驱动
☕ Virtual Threads] end subgraph 核心域服务 Orchestrator --> AccountDomain[账户域服务
☕ Virtual Threads] Orchestrator --> ChannelDomain[渠道域服务
☕ Virtual Threads] Orchestrator --> SettleDomain[清结算域服务
☕ Virtual Threads] end subgraph 账户域内部 AccountDomain --> QuotaPool[额度池
CAS无锁扣减] AccountDomain --> ShadowAcct[影子账户
Hash分散写入] AccountDomain --> BatchWriter[批量写入器
Disruptor攒批] QuotaPool --> Redis[(Redis
L2缓存)] ShadowAcct --> MasterDB[(主库
分散行锁)] BatchWriter --> MasterDB end subgraph 渠道域内部 ChannelDomain --> CBPool[渠道隔离池
独立Semaphore] CBPool --> CB[Resilience4j
熔断器] CB --> BankA[银行A] CB --> BankB[银行B] CB --> NPSS[NPSS/Aani] ChannelDomain --> Compensation[补偿查询
Implicit Reversal] end subgraph 可观测性平台 AllServices[所有服务] -.-> Trace[SkyWalking
全链路追踪] AllServices -.-> Metrics[Prometheus
实时指标] AllServices -.-> Logs[Loki/ELK
结构化日志] Trace --> Grafana[Grafana
统一看板 + 智能告警] Metrics --> Grafana Logs --> Grafana end subgraph 读服务 - 分离 GW --> QueryService[查询服务
☕ Virtual Threads] QueryService --> L1[L1 Caffeine
本地缓存] L1 --> Redis Redis --> SlaveDB[(从库)] end style QuotaPool fill:#51cf66,stroke:#2b8a3e,color:#fff style ShadowAcct fill:#51cf66,stroke:#2b8a3e,color:#fff style BatchWriter fill:#51cf66,stroke:#2b8a3e,color:#fff style CB fill:#339af0,stroke:#1864ab,color:#fff style Grafana fill:#845ef7,stroke:#5f3dc4,color:#fff style Orchestrator fill:#ffd43b,stroke:#f08c00,color:#333

2.2 请求处理全流程

sequenceDiagram participant C as 客户端 participant GW as API Gateway participant O as 交易编排
(Virtual Thread) participant A as 账户服务
(Virtual Thread) participant QP as 额度池
(CAS) participant CH as 渠道服务
(Virtual Thread) participant Bank as 银行/NPSS participant BW as 批量写入器 participant DB as 数据库 C->>GW: POST /payment GW->>GW: 限流检查 + 幂等校验(Redis SETNX) GW->>O: 转发请求 Note over O: 🔥 Virtual Thread创建
成本 ≈ 1KB (vs Platform Thread 1MB) O->>A: 1. 扣减余额 A->>QP: CAS无锁扣减本地额度 QP-->>A: 扣减成功 (微秒级) A->>BW: 异步投递记账流水 A-->>O: 余额扣减完成 O->>CH: 2. 调用渠道 CH->>CH: Semaphore限流 + 熔断检查 Note over CH,Bank: Virtual Thread在此park
让出carrier thread
不浪费任何OS资源 CH->>Bank: HTTP请求 Bank-->>CH: 响应(假设2秒) alt 成功 CH-->>O: 渠道成功 O->>A: 3. 确认记账 BW->>DB: 批量写入(256条/批) O-->>C: 支付成功 else 超时 CH-->>O: PENDING O->>O: 标记Saga状态=PENDING O-->>C: 受理成功,处理中 Note over CH: 启动补偿查询
连续3次失败→Implicit Reversal else 熔断 CH-->>O: 渠道熔断 O->>CH: 尝试备用渠道 end

三、创新点深度解析:为什么比传统架构快10倍

3.1 创新点一:Virtual Threads彻底解放并发模型

这是整个架构最核心的升级。先看对比:

graph LR subgraph 传统 Platform Thread 模型 PT1[Thread-1
1MB栈] --> |等待渠道3s| Block1[阻塞
OS线程空转] PT2[Thread-2
1MB栈] --> |等待渠道3s| Block2[阻塞
OS线程空转] PT3[Thread-3
1MB栈] --> |等待渠道3s| Block3[阻塞
OS线程空转] PTN[... x 200] --> |线程池满| Reject[新请求被拒❌] end subgraph Virtual Thread 模型 VT1[VThread-1
~1KB] --> |等待渠道| Park1[park
让出carrier] VT2[VThread-2
~1KB] --> |等待渠道| Park2[park
让出carrier] VT3[VThread-3
~1KB] --> |等待渠道| Park3[park
让出carrier] VTN[... x 100,000+] --> |无上限| Accept[全部受理✅] Park1 --> Carrier[少量Carrier Threads
= CPU核心数] Park2 --> Carrier Park3 --> Carrier end style Reject fill:#ff6b6b,stroke:#c92a2a,color:#fff style Accept fill:#51cf66,stroke:#2b8a3e,color:#fff style Carrier fill:#339af0,stroke:#1864ab,color:#fff

关键代码:Spring Boot 3.2+ Virtual Threads 集成

/**
 * 启用Virtual Threads — 就这么简单
 * Spring Boot 3.2+ 原生支持
 */
@SpringBootApplication
public class PaymentApplication {
    public static void main(String[] args) {
        SpringApplication.run(PaymentApplication.class, args);
    }
}
# application.yml — 一行配置开启Virtual Threads
spring:
  threads:
    virtual:
      enabled: true  # Tomcat/Undertow自动切换到VT模式
/**
 * 但在支付系统里,光开启VT不够
 * 需要针对支付场景做深度适配
 */
@Configuration
public class VirtualThreadConfig {

    /**
     * 交易编排专用Executor
     * 每个请求一个Virtual Thread,无上限
     */
    @Bean("txnExecutor")
    public ExecutorService transactionExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    /**
     * 渠道调用Executor — 带Semaphore限流
     * Virtual Thread虽然"便宜",但下游渠道有连接数限制
     * 这是VT场景下容易踩的坑!
     */
    @Bean("channelExecutor")
    public ExecutorService channelExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    /**
     * ⚠️ 关键:Structured Concurrency
     * Java 21 Preview特性,支付系统天然适合
     * 一笔交易 = 一个scope,子任务自动管理生命周期
     */
    public PaymentResult executePayment(PaymentRequest req) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 并行执行:风控检查 + 余额预校验 + 渠道路由
            Subtask<RiskResult> riskCheck = scope.fork(() -> 
                riskService.check(req));
            Subtask<Boolean> balanceCheck = scope.fork(() -> 
                accountService.preCheck(req.getAccountId(), req.getAmount()));
            Subtask<ChannelRoute> routing = scope.fork(() -> 
                routingService.selectChannel(req));

            scope.join();           // 等待所有子任务
            scope.throwIfFailed();  // 任一失败则全部取消

            // 所有前置检查通过,执行核心交易
            return processCore(req, riskCheck.get(), routing.get());
        }
    }
}

Virtual Threads vs Platform Threads 量化对比

xychart-beta title "并发处理能力对比(渠道平均响应2s)" x-axis ["200 Threads", "500 Threads", "1000 Threads", "5000 VThreads", "50000 VThreads"] y-axis "TPS" 0 --> 3000 bar [100, 250, 450, 2500, 2800]
指标Platform Threads (200)Platform Threads (1000)Virtual Threads
内存占用200MB (1MB/thread)1GB~50MB (全部VT)
最大并发连接2001000100,000+
渠道等待时线程状态BLOCKED (浪费OS资源)BLOCKEDPARKED (零开销)
上下文切换开销高 (内核态切换)极高 (CPU打满)极低 (用户态调度)
理论TPS(渠道2s响应)~100~4502500+
纵向扩展能力加线程=加内存物理极限~2000几乎无上限
划重点:Virtual Threads不是让单个请求变快,而是让系统能同时处理的请求数暴增。在IO密集的支付场景,这就是降维打击。

3.2 创新点二:影子账户 + 额度池 — 破解热点账户

graph TB subgraph 传统方案 - 串行等锁 T1[交易1] --> Lock[争抢行锁
UPDATE account
SET balance = balance - X
WHERE id = 'hot_account'] T2[交易2] --> Lock T3[交易3] --> Lock Lock --> Serial[串行执行
~50 TPS/账户] end subgraph 新方案 - 三级加速 direction TB N1[交易1] --> QP[L1: 额度池
CAS无锁
微秒级] N2[交易2] --> QP N3[交易3] --> QP QP --> Shadow[L2: 影子账户
Hash分散
16路并行写] Shadow --> Batch[L3: 批量合并
Disruptor攒批
256条/次写DB] Batch --> DB[(数据库
压力降至1/256)] end style Lock fill:#ff6b6b,stroke:#c92a2a,color:#fff style Serial fill:#ff6b6b,stroke:#c92a2a,color:#fff style QP fill:#51cf66,stroke:#2b8a3e,color:#fff style Shadow fill:#51cf66,stroke:#2b8a3e,color:#fff style Batch fill:#51cf66,stroke:#2b8a3e,color:#fff

影子账户核心实现

/**
 * 影子账户服务 — 把1个热点账户拆成N个子账户
 * 
 * 核心思想:空间换时间,用多行替代单行,打散锁竞争
 */
@Service
@Slf4j
public class ShadowAccountService {

    private static final int SHADOW_COUNT = 16;

    private final ShadowAccountMapper shadowMapper;
    private final AsyncLedgerService asyncLedger;
    private final QuotaPoolService quotaPool;

    /**
     * 入账(收款):Hash分散写入子账户
     * 16个子账户 = 锁竞争降为1/16
     */
    public void credit(String mainAccountId, BigDecimal amount, String txnId) {
        int slot = Math.abs(txnId.hashCode() % SHADOW_COUNT);
        String shadowId = mainAccountId + ":shadow:" + slot;

        // 只锁一个子账户行,其他15个子账户不受影响
        shadowMapper.credit(shadowId, amount);

        // 投递到异步汇总队列
        asyncLedger.enqueueConsolidation(mainAccountId, shadowId, amount);

        log.debug("Credit shadow account: {} slot: {} amount: {}", 
                  mainAccountId, slot, amount);
    }

    /**
     * 出账(付款):先走额度池快速扣减
     * 额度池空了才去DB补充
     */
    public DebitResult debit(String mainAccountId, BigDecimal amount) {
        // 第一道:额度池(CAS无锁,微秒级)
        if (quotaPool.tryDebit(mainAccountId, amount)) {
            return DebitResult.success();
        }

        // 第二道:尝试从DB补充额度
        if (quotaPool.tryRefillAndDebit(mainAccountId, amount)) {
            return DebitResult.success();
        }

        // 余额确实不足
        return DebitResult.insufficientBalance();
    }
}

额度池:CAS无锁 + Virtual Thread友好

/**
 * 额度池 — JVM本地内存级余额扣减
 * 
 * 为什么用CAS而不是synchronized?
 * 因为synchronized在Virtual Thread场景下会PIN住carrier thread!
 * 这是VT的已知限制,必须避免。
 * 
 * ⚠️ VT踩坑警告:
 * - synchronized → 会pin carrier thread → 不要用
 * - ReentrantLock → VT友好 → 可以用
 * - CAS (AtomicLong) → 最优 → 推荐用
 */
@Service
public class QuotaPoolService {

    // 本地额度池:每个账户一个AtomicLong(单位:分)
    private final ConcurrentHashMap<String, AtomicLong> localQuota 
        = new ConcurrentHashMap<>();
    
    private static final long REFILL_AMOUNT_CENTS = 10_000_00L; // 每次补充10000元

    /**
     * CAS无锁扣减 — 零竞争,零等待
     */
    public boolean tryDebit(String accountId, BigDecimal amount) {
        long amountCents = amount.multiply(BigDecimal.valueOf(100)).longValue();
        AtomicLong quota = localQuota.get(accountId);
        
        if (quota == null) return false;

        // CAS自旋扣减
        while (true) {
            long current = quota.get();
            if (current < amountCents) {
                return false; // 本地额度不足
            }
            if (quota.compareAndSet(current, current - amountCents)) {
                // 扣减成功,异步记录流水
                asyncJournal.record(accountId, amountCents);
                return true;
            }
            // CAS失败,重试(极少发生)
        }
    }

    /**
     * 从DB补充额度到本地
     * 使用ReentrantLock而非synchronized(VT兼容)
     */
    private final ConcurrentHashMap<String, ReentrantLock> refillLocks 
        = new ConcurrentHashMap<>();

    public boolean tryRefillAndDebit(String accountId, BigDecimal amount) {
        ReentrantLock lock = refillLocks.computeIfAbsent(accountId, 
            k -> new ReentrantLock());

        // tryLock避免排队,拿不到锁直接返回
        if (!lock.tryLock()) return false;
        
        try {
            // 双重检查:可能别的线程已经补充过了
            long amountCents = amount.multiply(BigDecimal.valueOf(100)).longValue();
            AtomicLong quota = localQuota.computeIfAbsent(accountId, 
                k -> new AtomicLong(0));
            
            if (quota.get() >= amountCents) {
                return tryDebit(accountId, amount);
            }

            // 从DB扣减一大块额度到本地
            boolean success = accountMapper.reserveQuota(accountId, REFILL_AMOUNT_CENTS);
            if (success) {
                quota.addAndGet(REFILL_AMOUNT_CENTS);
                return tryDebit(accountId, amount);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 定时归还未使用额度(防止资金被"锁死"在本地)
     */
    @Scheduled(fixedRate = 5000)
    public void returnUnusedQuota() {
        localQuota.forEach((accountId, quota) -> {
            long remaining = quota.getAndSet(0);
            if (remaining > 0) {
                accountMapper.returnQuota(accountId, remaining);
                log.info("Returned unused quota: account={} amount={}cents", 
                         accountId, remaining);
            }
        });
    }
}

批量写入器:Disruptor攒批

/**
 * 批量写入器 — 把N次单条INSERT合并为1次批量INSERT
 * 
 * 256笔交易流水合并一次写入 → DB压力直降为1/256
 */
@Component
public class BatchLedgerWriter implements SmartLifecycle {

    private Disruptor<LedgerEvent> disruptor;
    private RingBuffer<LedgerEvent> ringBuffer;
    
    private static final int BUFFER_SIZE = 4096;
    private static final int BATCH_SIZE = 256;

    @Override
    public void start() {
        disruptor = new Disruptor<>(
            LedgerEvent::new,
            BUFFER_SIZE,
            Thread.ofVirtual().factory(),  // 用Virtual Thread作消费者
            ProducerType.MULTI,
            new BusySpinWaitStrategy()
        );
        
        disruptor.handleEventsWith(this::onEvent);
        ringBuffer = disruptor.start();
    }

    /**
     * 生产端:交易完成后投递流水
     */
    public void submit(LedgerEntry entry) {
        ringBuffer.publishEvent((event, seq) -> event.setEntry(entry));
    }

    /**
     * 消费端:攒够一批再写DB
     */
    private final List<LedgerEntry> buffer = new ArrayList<>(BATCH_SIZE);

    private void onEvent(LedgerEvent event, long sequence, boolean endOfBatch) {
        buffer.add(event.getEntry());
        
        if (buffer.size() >= BATCH_SIZE || endOfBatch) {
            flush();
        }
    }

    private void flush() {
        if (buffer.isEmpty()) return;
        
        List<LedgerEntry> batch = new ArrayList<>(buffer);
        buffer.clear();
        
        try {
            ledgerMapper.batchInsert(batch);
            metrics.counter("ledger.batch.success").increment();
            metrics.summary("ledger.batch.size").record(batch.size());
        } catch (Exception e) {
            log.error("Batch insert failed, falling back to single insert", e);
            // 降级:逐条重试
            for (LedgerEntry entry : batch) {
                try {
                    ledgerMapper.singleInsert(entry);
                } catch (Exception ex) {
                    // 写入WAL,等待人工恢复
                    walWriter.write(entry);
                    alertService.fire("LEDGER_WRITE_FAIL", entry.getTxnId());
                }
            }
        }
    }
}

3.3 创新点三:渠道隔离 + 熔断 — Virtual Thread版

传统架构用独立线程池隔离渠道,但线程池本身就是稀缺资源。Virtual Thread方案用Semaphore替代线程池做限流,效果更好、资源消耗几乎为零。

graph TB subgraph 传统方案 - 线程池隔离 R1[请求] --> PoolA[渠道A线程池
50 threads
50MB内存] R2[请求] --> PoolB[渠道B线程池
50 threads
50MB内存] R3[请求] --> PoolC[渠道C线程池
50 threads
50MB内存] end subgraph VT方案 - Semaphore隔离 V1[请求
VThread] --> SemA[Semaphore A
permits=50
~0 内存] V2[请求
VThread] --> SemB[Semaphore B
permits=50
~0 内存] V3[请求
VThread] --> SemC[Semaphore C
permits=50
~0 内存] end style PoolA fill:#ff922b,stroke:#e8590c,color:#fff style PoolB fill:#ff922b,stroke:#e8590c,color:#fff style PoolC fill:#ff922b,stroke:#e8590c,color:#fff style SemA fill:#51cf66,stroke:#2b8a3e,color:#fff style SemB fill:#51cf66,stroke:#2b8a3e,color:#fff style SemC fill:#51cf66,stroke:#2b8a3e,color:#fff
/**
 * 渠道网关 — Virtual Thread + Semaphore隔离 + 熔断
 */
@Service
@Slf4j
public class ChannelGateway {

    // 每个渠道一个Semaphore(替代线程池做限流)
    private final ConcurrentHashMap<String, Semaphore> channelSemaphores 
        = new ConcurrentHashMap<>();
    
    // 熔断器
    private final ConcurrentHashMap<String, CircuitBreaker> circuitBreakers 
        = new ConcurrentHashMap<>();

    // Virtual Thread Executor
    private final ExecutorService vtExecutor = Executors.newVirtualThreadPerTaskExecutor();

    /**
     * 发送渠道请求 — 非阻塞、隔离、熔断
     */
    public CompletableFuture<ChannelResponse> send(String channelId, PaymentRequest req) {
        return CompletableFuture.supplyAsync(() -> doSend(channelId, req), vtExecutor);
    }

    private ChannelResponse doSend(String channelId, PaymentRequest req) {
        Semaphore semaphore = channelSemaphores.computeIfAbsent(channelId,
            k -> new Semaphore(getChannelConfig(k).getMaxConcurrency()));
        
        CircuitBreaker cb = getOrCreateCircuitBreaker(channelId);

        // 1. Semaphore限流:保护下游渠道
        boolean acquired;
        try {
            acquired = semaphore.tryAcquire(
                getChannelConfig(channelId).getQueueTimeoutMs(), 
                TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return ChannelResponse.systemError("Interrupted");
        }

        if (!acquired) {
            metrics.counter("channel.semaphore.rejected", "channel", channelId).increment();
            return ChannelResponse.busy("Channel " + channelId + " is overloaded");
        }

        try {
            // 2. 熔断检查 + 实际调用
            return cb.executeSupplier(() -> {
                long start = System.nanoTime();
                try {
                    HttpResponse resp = httpClient.send(
                        buildRequest(channelId, req),
                        HttpResponse.BodyHandlers.ofString());

                    long durationMs = (System.nanoTime() - start) / 1_000_000;
                    metrics.recordChannelCall(channelId, "success", durationMs);
                    
                    return parseResponse(channelId, resp);
                } catch (HttpTimeoutException e) {
                    long durationMs = (System.nanoTime() - start) / 1_000_000;
                    metrics.recordChannelCall(channelId, "timeout", durationMs);
                    
                    // 超时→PENDING→异步补偿
                    compensationService.schedulePendingQuery(channelId, req.getTxnId());
                    return ChannelResponse.pending();
                }
            });
        } catch (CallNotPermittedException e) {
            // 3. 熔断开启 → 尝试备用渠道
            log.warn("Channel {} circuit breaker OPEN, trying fallback", channelId);
            metrics.counter("channel.circuit.open", "channel", channelId).increment();
            return tryFallbackChannel(channelId, req);
        } finally {
            semaphore.release();
        }
    }

    /**
     * Java 21 HttpClient — 天然支持Virtual Thread
     * 当VThread在等待HTTP响应时,会自动park,不占用carrier thread
     */
    private final HttpClient httpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(5))
        .executor(Executors.newVirtualThreadPerTaskExecutor())
        .build();
}

3.4 创新点四:全链路可观测 + 智能告警

graph TB subgraph 数据采集层 App[支付应用] --> |Micrometer| Prom[Prometheus
时序指标] App --> |SkyWalking Agent| SW[SkyWalking
分布式追踪] App --> |Logback + MDC| Loki[Loki
结构化日志] end subgraph 关联层 Prom --> Grafana[Grafana 统一看板] SW --> Grafana Loki --> Grafana end subgraph 告警层 Grafana --> TPS[TPS突降告警
rate下降>50%] Grafana --> SR[成功率告警
success rate<95%] Grafana --> Latency[延迟告警
P99>5s] Grafana --> DB[DB告警
连接池>85%] Grafana --> Hot[热点账户告警
单账户QPS>100] TPS --> AlertChannel[告警通道] SR --> AlertChannel Latency --> AlertChannel DB --> AlertChannel Hot --> AlertChannel AlertChannel --> Slack[Slack/Teams] AlertChannel --> PagerDuty[PagerDuty
电话告警] AlertChannel --> AutoHeal[自愈动作
自动扩容/熔断] end style Grafana fill:#845ef7,stroke:#5f3dc4,color:#fff style AlertChannel fill:#ff6b6b,stroke:#c92a2a,color:#fff style AutoHeal fill:#51cf66,stroke:#2b8a3e,color:#fff
/**
 * 统一埋点切面 — 自动采集每笔交易全维度指标
 * 
 * 关键设计:TraceId贯穿 Metrics + Trace + Log
 * 出了问题,一个TraceId就能把整条链路还原
 */
@Aspect
@Component
public class PaymentObservabilityAspect {

    private final MeterRegistry registry;

    @Around("@annotation(com.astratech.annotation.Observable)")
    public Object observe(ProceedingJoinPoint pjp) throws Throwable {
        String traceId = MDC.get("traceId");
        String operation = pjp.getSignature().toShortString();
        String[] tags = extractTags(pjp); // channel, txnType, accountId等

        Timer.Sample sample = Timer.start(registry);
        String result = "success";

        try {
            Object ret = pjp.proceed();
            return ret;
        } catch (Exception e) {
            result = classifyError(e); // timeout / circuit_open / db_error / biz_error
            throw e;
        } finally {
            sample.stop(Timer.builder("payment.operation.duration")
                .tag("op", operation)
                .tag("result", result)
                .tags(tags)
                .register(registry));

            // 结构化日志:一行日志包含所有关键信息
            log.info("PAYMENT_OP op={} result={} traceId={} duration={}ms {}",
                operation, result, traceId, sample.duration(), 
                formatTags(tags));
        }
    }
}

/**
 * Prometheus告警规则 — 五大核心告警
 */
// prometheus-rules.yml 内容见下方YAML
# prometheus-rules.yml
groups:
  - name: payment-critical
    rules:
      # 1. TPS突降 — 可能渠道全挂了
      - alert: PaymentTpsDropped
        expr: >
          rate(payment_operation_duration_seconds_count{op=~".*Payment.*"}[1m]) 
          < 0.5 * rate(payment_operation_duration_seconds_count{op=~".*Payment.*"}[1m] offset 5m)
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "🚨 交易TPS下降超过50%"
          runbook: "检查渠道状态 → 检查DB连接 → 检查网络"

      # 2. 成功率下降
      - alert: PaymentSuccessRateLow
        expr: >
          sum(rate(payment_operation_duration_seconds_count{result="success"}[5m]))
          / sum(rate(payment_operation_duration_seconds_count[5m])) < 0.95
        for: 3m
        labels:
          severity: critical

      # 3. 渠道P99延迟飙升
      - alert: ChannelLatencyP99High
        expr: >
          histogram_quantile(0.99, 
            rate(payment_operation_duration_seconds_bucket{op=~".*Channel.*"}[5m])
          ) > 5
        for: 2m
        labels:
          severity: warning

      # 4. DB连接池即将耗尽
      - alert: DatabasePoolNearExhaustion
        expr: hikaricp_connections_active / hikaricp_connections_max > 0.85
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "🔥 数据库连接池使用率超过85%"
          action: "检查慢SQL → 检查连接泄露 → 考虑扩容"

      # 5. Virtual Thread carrier线程被pin
      - alert: VirtualThreadPinning
        expr: >
          rate(jdk_virtual_thread_pinned_total[5m]) > 10
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "⚠️ Virtual Thread频繁被pin,检查synchronized使用"

3.5 创新点五:Structured Concurrency — 交易编排的终极形态

Java 21的Structured Concurrency(预览特性)和支付系统的Saga模式天生一对:

graph TB subgraph 传统Saga编排 S1[步骤1: 扣款] --> |成功| S2[步骤2: 调渠道] S2 --> |成功| S3[步骤3: 记账] S2 --> |失败| C1[补偿: 退款] S3 --> |失败| C2[补偿: 冲正+退款] Note1[问题:全串行
总耗时 = 各步骤之和] end subgraph Structured Concurrency编排 SC[StructuredTaskScope] --> |fork| P1[风控检查
VThread] SC --> |fork| P2[余额预检
VThread] SC --> |fork| P3[渠道路由
VThread] P1 --> |join| Gate{全部通过?} P2 --> |join| Gate P3 --> |join| Gate Gate --> |是| Core[核心交易
串行保序] Gate --> |任一失败| Cancel[自动取消
所有子任务] Note2[优势:前置检查并行
总耗时 = 最慢步骤耗时] end style Note1 fill:#ff6b6b,stroke:#c92a2a,color:#fff style Note2 fill:#51cf66,stroke:#2b8a3e,color:#fff style SC fill:#339af0,stroke:#1864ab,color:#fff
/**
 * 交易编排器 — Structured Concurrency版
 * 
 * 传统方式:风控3s + 余额检查1s + 路由1s = 串行5s
 * SC方式:  max(风控3s, 余额1s, 路由1s) = 并行3s
 * 提升:40%延迟优化
 */
@Service
public class PaymentOrchestrator {

    /**
     * 核心交易流程 — 使用Structured Concurrency
     */
    public PaymentResult process(PaymentRequest req) {
        String txnId = IdGenerator.nextTxnId();
        MDC.put("traceId", txnId);

        try {
            // Phase 1: 并行前置检查(Structured Concurrency)
            PreCheckResult preCheck = parallelPreCheck(req);

            // Phase 2: 串行核心交易(保证顺序)
            return executeCore(txnId, req, preCheck);

        } catch (PaymentException e) {
            // Saga补偿
            compensate(txnId, e.getFailedStep());
            return PaymentResult.failed(e.getCode(), e.getMessage());
        }
    }

    /**
     * 并行前置检查 — SC保证子任务生命周期
     */
    private PreCheckResult parallelPreCheck(PaymentRequest req) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            
            // 三个检查并行执行,每个在自己的Virtual Thread里
            var riskFuture = scope.fork(() -> {
                log.info("Risk check started");
                return riskService.evaluate(req);
            });

            var balanceFuture = scope.fork(() -> {
                log.info("Balance pre-check started");
                return accountService.checkSufficientBalance(
                    req.getPayerAccountId(), req.getAmount());
            });

            var routeFuture = scope.fork(() -> {
                log.info("Channel routing started");
                return routingService.selectBestChannel(req);
            });

            // 等待所有完成,或任一失败
            scope.join();
            scope.throwIfFailed(e -> new PaymentException("PRE_CHECK_FAILED", e));

            return new PreCheckResult(
                riskFuture.get(),
                balanceFuture.get(),
                routeFuture.get()
            );
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PaymentException("INTERRUPTED", e);
        }
    }

    /**
     * 串行核心交易 — Saga状态机
     */
    private PaymentResult executeCore(String txnId, PaymentRequest req, 
                                       PreCheckResult preCheck) {
        SagaContext saga = SagaContext.create(txnId);

        try {
            // Step 1: 扣减余额
            saga.step("DEBIT", 
                () -> accountService.debit(req.getPayerAccountId(), req.getAmount()),
                () -> accountService.credit(req.getPayerAccountId(), req.getAmount()) // 补偿
            );

            // Step 2: 调用渠道
            saga.step("CHANNEL_CALL",
                () -> channelGateway.send(preCheck.getRoute().getChannelId(), req).join(),
                () -> channelGateway.reverse(preCheck.getRoute().getChannelId(), txnId) // implicit reversal
            );

            // Step 3: 入账(如果是转账)
            if (req.getType() == TxnType.TRANSFER) {
                saga.step("CREDIT",
                    () -> accountService.credit(req.getPayeeAccountId(), req.getAmount()),
                    () -> accountService.debit(req.getPayeeAccountId(), req.getAmount())
                );
            }

            saga.complete();
            return PaymentResult.success(txnId);

        } catch (Exception e) {
            saga.compensate(); // 自动反向执行已完成步骤的补偿动作
            throw e;
        }
    }
}

四、创新点对比总结

graph LR subgraph 传统架构 T1[Platform Thread
200并发上限] T2[单账户行锁
50 TPS/账户] T3[线程池隔离
内存大户] T4[串行Saga
延迟累加] T5[被动告警
出了事才知道] end subgraph 新架构 N1[Virtual Thread
10万+并发] N2[影子账户+额度池
2000+ TPS/账户] N3[Semaphore隔离
零内存开销] N4[Structured Concurrency
并行前置检查] N5[全链路观测
亚秒级发现] end T1 --> |10x+| N1 T2 --> |40x+| N2 T3 --> |1000x内存节省| N3 T4 --> |40%延迟下降| N4 T5 --> |从小时到分钟| N5 style T1 fill:#ff6b6b,stroke:#c92a2a,color:#fff style T2 fill:#ff6b6b,stroke:#c92a2a,color:#fff style T3 fill:#ff6b6b,stroke:#c92a2a,color:#fff style T4 fill:#ff6b6b,stroke:#c92a2a,color:#fff style T5 fill:#ff6b6b,stroke:#c92a2a,color:#fff style N1 fill:#51cf66,stroke:#2b8a3e,color:#fff style N2 fill:#51cf66,stroke:#2b8a3e,color:#fff style N3 fill:#51cf66,stroke:#2b8a3e,color:#fff style N4 fill:#51cf66,stroke:#2b8a3e,color:#fff style N5 fill:#51cf66,stroke:#2b8a3e,color:#fff
维度传统架构新架构(本文)提升倍数
并发模型Platform Thread, 200-1000个Virtual Thread, 10万+100x
单热点账户TPS~50 (行锁串行)2000+ (影子账户+额度池+攒批)40x
渠道隔离内存150MB (3个渠道×50线程×1MB)~0 (Semaphore)
前置检查延迟5s (串行)3s (SC并行)40%↓
系统总TPS300-5002000-30005-10x
故障定位时间小时级 (翻日志)分钟级 (TraceId+Grafana)60x
synchronized风险pin carrier thread⚠️ 需排查
最低JDK版本JDK 8+JDK 21+需升级

五、Virtual Threads踩坑指南

用VT不是免费午餐,这几个坑一定要知道:

5.1 synchronized会pin住carrier thread

// ❌ 错误:synchronized在VT里会pin carrier thread
public synchronized void updateBalance(String accountId, BigDecimal amount) {
    // 如果这里有IO操作(如DB调用),carrier thread被pin住
    // 其他VT无法使用这个carrier,性能断崖式下降
    accountMapper.update(accountId, amount);
}

// ✅ 正确:用ReentrantLock替代
private final ReentrantLock lock = new ReentrantLock();

public void updateBalance(String accountId, BigDecimal amount) {
    lock.lock();
    try {
        accountMapper.update(accountId, amount);
    } finally {
        lock.unlock();
    }
}

// ✅ 更好:CAS无锁
private final AtomicReference<BigDecimal> balance = new AtomicReference<>();

public void updateBalance(BigDecimal amount) {
    balance.updateAndGet(current -> current.subtract(amount));
}

5.2 线程池不要给VT设上限

// ❌ 错误:给VT套一个固定大小线程池,完全浪费了VT优势
ExecutorService bad = Executors.newFixedThreadPool(200, Thread.ofVirtual().factory());

// ✅ 正确:每个任务一个VT
ExecutorService good = Executors.newVirtualThreadPerTaskExecutor();

// ✅ 如果需要限流,用Semaphore
Semaphore limit = new Semaphore(200);

5.3 ThreadLocal要小心

// ⚠️ VT数量可能达到百万级,每个VT一份ThreadLocal会爆内存
// 特别注意连接池、Buffer池等使用ThreadLocal做缓存的库

// ✅ 用ScopedValue替代(Java 21 Preview)
private static final ScopedValue<String> TRACE_ID = ScopedValue.newInstance();

ScopedValue.where(TRACE_ID, "txn-12345").run(() -> {
    // 在这个scope内,TRACE_ID可用
    processPayment();
});

5.4 数据库连接池是新的瓶颈

/**
 * VT可以轻松发起10万并发请求
 * 但数据库连接池通常只有50-100个连接
 * 不限流会导致连接池被打爆
 * 
 * 解决方案:在DB层前面加Semaphore
 */
@Configuration
public class DatabaseProtection {
    
    // DB操作限流信号量,permits = 连接池大小 × 0.8
    private final Semaphore dbSemaphore = new Semaphore(40);

    public <T> T executeWithProtection(Supplier<T> dbOperation) {
        try {
            if (!dbSemaphore.tryAcquire(3, TimeUnit.SECONDS)) {
                throw new DatabaseOverloadException("DB connection pool exhausted");
            }
            return dbOperation.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            dbSemaphore.release();
        }
    }
}

六、部署架构

graph TB subgraph Kubernetes Cluster subgraph 接入层 Ingress[Ingress Controller] --> GW1[Gateway Pod ×3] end subgraph 交易服务 GW1 --> Pay1[支付服务 Pod ×4
JDK 21 + VT
CPU: 2c, Mem: 4Gi] end subgraph 账户服务 Pay1 --> Acct1[账户服务 Pod ×3
JDK 21 + VT
CPU: 2c, Mem: 4Gi] end subgraph 渠道服务 Pay1 --> Ch1[渠道服务 Pod ×3
JDK 21 + VT
CPU: 1c, Mem: 2Gi] end subgraph 中间件 Redis[(Redis Cluster
额度池 + 缓存)] MasterDB[(MySQL 主库
账户 + 流水)] SlaveDB[(MySQL 从库 ×2
查询服务)] MQ[RocketMQ
异步消息] end subgraph 可观测性 Prom[Prometheus] SW[SkyWalking OAP] Loki[Loki] Grafana[Grafana
Dashboard + Alert] end end Acct1 --> Redis Acct1 --> MasterDB Ch1 --> MQ Pay1 --> MQ Pay1 -.-> Prom Acct1 -.-> Prom Ch1 -.-> Prom Prom --> Grafana SW --> Grafana Loki --> Grafana style Pay1 fill:#339af0,stroke:#1864ab,color:#fff style Acct1 fill:#339af0,stroke:#1864ab,color:#fff style Ch1 fill:#339af0,stroke:#1864ab,color:#fff style Grafana fill:#845ef7,stroke:#5f3dc4,color:#fff

七、总结

千级TPS的支付系统不是某一个"银弹"能搞定的,而是一整套组合拳:

  1. Virtual Threads — 解决IO等待浪费线程的问题,让并发数从百级跳到万级
  2. 影子账户 + 额度池 — 解决热点账户行锁的问题,单账户TPS从50到2000+
  3. Semaphore + 熔断 — 解决渠道隔离问题,一个渠道挂不影响全局
  4. Structured Concurrency — 解决串行编排延迟累加的问题,前置检查并行化
  5. 全链路可观测 — 解决"出了问题不知道在哪"的问题,分钟级定位

但也别忘了VT的几个坑:synchronized会pin、ThreadLocal会爆、连接池会被打爆。升级到VT不是简单换个Executor就行,需要全链路review。

最后一句忠告:先压测,再优化。所有的"理论TPS"都是纸老虎,只有压测报告才是真理。

评论区
暂无评论
avatar