
Flink from Beginner to Quitter ③ — State and Fault Tolerance: The Cornerstone of Stateful Computation

August 12, 2023

1. Introduction: Why Do We Need State?

1.1 Stateless vs. Stateful

```mermaid
graph LR
    subgraph "Stateless computation"
        A1[Input 1] --> B1[Process] --> C1[Output 1]
        A2[Input 2] --> B2[Process] --> C2[Output 2]
        A3[Input 3] --> B3[Process] --> C3[Output 3]
    end
```

```mermaid
graph LR
    subgraph "Stateful computation"
        A1[Input 1] --> B1[Process + state update]
        B1 --> STATE[(State)]
        STATE --> B2[Process + state update]
        A2[Input 2] --> B2
        B2 --> STATE
        STATE --> B3[Process + state update]
        A3[Input 3] --> B3
    end
```

1.2 Which Scenarios Need State?

```mermaid
graph TB
    A[Scenarios that need State] --> B[Aggregation]
    A --> C[Deduplication]
    A --> D[Window computation]
    A --> E[Machine-learning models]
    A --> F[CEP pattern matching]
    B --> B1["SUM / COUNT / AVG"]
    C --> C1["UV statistics"]
    D --> D1["Data buffered per window"]
    E --> E1["Model parameters"]
    F --> F1["Historical event sequences"]
```

1.3 The Core Value of Flink State

```mermaid
graph LR
    A[Flink State] --> B[Efficient access]
    A --> C[Fault tolerance]
    A --> D[Exactly-once]
    B --> B1["Local memory / RocksDB<br/>millisecond-level access"]
    C --> C1["Checkpoint mechanism<br/>automatic recovery from failures"]
    D --> D1["Exactly-once<br/>state consistency"]
```

2. State Types in Detail

2.1 State Classification

```mermaid
graph TB
    subgraph "Flink State classification"
        A[Managed State] --> A1[Keyed State]
        A --> A2[Operator State]
        B[Raw State] --> B1["Managed entirely by the user<br/>not recommended"]
    end
    A1 --> K1[ValueState]
    A1 --> K2[ListState]
    A1 --> K3[MapState]
    A1 --> K4[ReducingState]
    A1 --> K5[AggregatingState]
    A2 --> O1[ListState]
    A2 --> O2[BroadcastState]
```

2.2 Keyed State in Detail

```mermaid
graph TB
    subgraph "Keyed State characteristics"
        A["Partitioned by key"]
        B["Independent state per key"]
        C["Only usable on a KeyedStream"]
    end
```

Keyed State types:

| Type | Holds | Typical use |
| --- | --- | --- |
| ValueState | A single value | Latest value, counters |
| ListState | A list of values | History records |
| MapState | Key-value pairs | Multi-dimensional state |
| ReducingState | An aggregate (same type as input) | Accumulators |
| AggregatingState | An aggregate (type may differ from input) | Complex aggregations |

Code example:

```java
public class StatefulFunction extends KeyedProcessFunction<String, Event, Result> {

    // 1. ValueState - a single value
    private ValueState<Long> countState;

    // 2. ListState - a list
    private ListState<Event> historyState;

    // 3. MapState - a map
    private MapState<String, Long> categoryCountState;

    // 4. ReducingState - a reduced aggregate
    private ReducingState<Long> sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the ValueState
        ValueStateDescriptor<Long> countDesc = new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(countDesc);

        // Initialize the ListState
        ListStateDescriptor<Event> historyDesc = new ListStateDescriptor<>("history", Event.class);
        historyState = getRuntimeContext().getListState(historyDesc);

        // Initialize the MapState
        MapStateDescriptor<String, Long> categoryDesc = new MapStateDescriptor<>("category", String.class, Long.class);
        categoryCountState = getRuntimeContext().getMapState(categoryDesc);

        // Initialize the ReducingState
        ReducingStateDescriptor<Long> sumDesc = new ReducingStateDescriptor<>("sum", Long::sum, Long.class);
        sumState = getRuntimeContext().getReducingState(sumDesc);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Use the ValueState
        Long count = countState.value();
        if (count == null) count = 0L;
        countState.update(count + 1);

        // Use the ListState
        historyState.add(event);

        // Use the MapState
        String category = event.getCategory();
        Long categoryCount = categoryCountState.get(category);
        categoryCountState.put(category, categoryCount == null ? 1L : categoryCount + 1);

        // Use the ReducingState
        sumState.add(event.getAmount());

        // Emit the result
        out.collect(new Result(countState.value(), sumState.get()));
    }
}
```
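The table above also lists AggregatingState, which the example skips. Its defining feature is that input, accumulator, and result may all have different types. The contract behind it can be sketched in plain Java with no Flink dependency; the class and method names below are illustrative, mirroring the shape of Flink's `AggregateFunction`:

```java
import java.util.List;

// A minimal sketch of the accumulator contract behind AggregatingState:
// input type (Long), accumulator type (long[] {sum, count}), and a result
// of a different type (Double). In a real job this logic would live in an
// org.apache.flink.api.common.functions.AggregateFunction registered via
// an AggregatingStateDescriptor; here it is plain Java so it runs standalone.
public class AvgAggregate {

    // createAccumulator(): the initial, empty accumulator
    public static long[] createAccumulator() {
        return new long[] {0L, 0L};  // {sum, count}
    }

    // add(value, acc): fold one input value into the accumulator
    public static long[] add(long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    // getResult(acc): derive the (differently typed) result
    public static double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    // Convenience driver: aggregate a whole list of inputs
    public static double averageOf(List<Long> values) {
        long[] acc = createAccumulator();
        for (long v : values) {
            acc = add(v, acc);
        }
        return getResult(acc);
    }
}
```

The point of the three-method split is that the state backend only ever stores the accumulator; the result is derived on read.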

2.3 Operator State in Detail

```mermaid
graph TB
    subgraph "Operator State characteristics"
        A["Bound to an operator instance"]
        B["Shared by all records on that instance"]
        C["Commonly used in sources and sinks"]
    end
```

Code example:

```java
public class MySourceFunction implements SourceFunction<String>, CheckpointedFunction {

    private volatile boolean isRunning = true;
    private long offset;

    // Operator State
    private ListState<Long> offsetState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Initialize the Operator State
        ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offset", Long.class);
        offsetState = context.getOperatorStateStore().getListState(desc);

        // Restore from state if this is a recovery
        if (context.isRestored()) {
            for (Long o : offsetState.get()) {
                offset = o;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist the state on each checkpoint
        offsetState.clear();
        offsetState.add(offset);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // Hold the checkpoint lock so that emitting the record and
            // advancing the offset are atomic with respect to snapshotState
            synchronized (ctx.getCheckpointLock()) {
                String data = readFromSource(offset);  // readFromSource: placeholder for your actual read logic
                ctx.collect(data);
                offset++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
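When such a job is rescaled, Flink redistributes the checkpointed Operator ListState across the new parallel instances: `getListState` uses even-split redistribution (each subtask receives a slice of the entries), while `getUnionListState` hands every subtask the complete list to filter for itself. A plain-Java sketch of the two schemes (illustrative only; the exact slice assignment Flink uses may differ from the round-robin shown here):

```java
import java.util.ArrayList;
import java.util.List;

public class Redistribution {

    // Even-split: partition the checkpointed entries over the new subtasks
    // (the scheme behind getOperatorStateStore().getListState). Round-robin
    // here is just one way to illustrate a disjoint split.
    public static <T> List<List<T>> evenSplit(List<T> entries, int newParallelism) {
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            slices.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            slices.get(i % newParallelism).add(entries.get(i));
        }
        return slices;
    }

    // Union: every new subtask receives the complete list and must decide
    // itself what it owns (the scheme behind getUnionListState).
    public static <T> List<List<T>> union(List<T> entries, int newParallelism) {
        List<List<T>> copies = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            copies.add(new ArrayList<>(entries));
        }
        return copies;
    }
}
```

This is why a source like the one above stores exactly one offset entry: after an even-split redistribution, each new instance picks up whichever offsets land in its slice.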

2.4 Broadcast State

```mermaid
graph LR
    subgraph "Broadcast State"
        RULE[Rule stream] --> BC[Broadcast]
        BC --> OP1[Operator 1]
        BC --> OP2[Operator 2]
        BC --> OP3[Operator N]
        DATA[Data stream] --> OP1
        DATA --> OP2
        DATA --> OP3
    end
```

Code example:

```java
// Rule stream
DataStream<Rule> ruleStream = env.addSource(new RuleSource());

// Data stream
DataStream<Event> dataStream = env.addSource(new EventSource());

// Describe the Broadcast State
MapStateDescriptor<String, Rule> ruleStateDesc = new MapStateDescriptor<>(
    "rules",
    String.class,
    Rule.class
);

// Broadcast the rule stream
BroadcastStream<Rule> broadcastRules = ruleStream.broadcast(ruleStateDesc);

// Connect the data stream with the broadcast stream
// (a keyed stream connected to a broadcast stream requires a
// KeyedBroadcastProcessFunction, not a plain BroadcastProcessFunction)
dataStream
    .keyBy(Event::getKey)
    .connect(broadcastRules)
    .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Result>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
            // Read the rules (read-only on the data side)
            ReadOnlyBroadcastState<String, Rule> rules = ctx.getBroadcastState(ruleStateDesc);
            Rule rule = rules.get(event.getRuleId());

            if (rule != null && rule.match(event)) {
                out.collect(new Result(event, rule));
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Result> out) throws Exception {
            // Update the rules (writable on the broadcast side)
            BroadcastState<String, Rule> rules = ctx.getBroadcastState(ruleStateDesc);
            rules.put(rule.getId(), rule);
        }
    });
```

3. State Backends

3.1 State Backend Types

```mermaid
graph TB
    subgraph "State backends"
        A[HashMapStateBackend] --> A1["State on the JVM heap<br/>fast, but bounded by memory"]
        B[EmbeddedRocksDBStateBackend] --> B1["State in RocksDB<br/>supports very large state"]
    end
```

3.2 Comparison and Selection

| Feature | HashMapStateBackend | EmbeddedRocksDBStateBackend |
| --- | --- | --- |
| Storage location | JVM heap memory | RocksDB (local disk) |
| State size | Limited by heap size | Can be very large (TB-scale) |
| Access speed | Fast (nanosecond-level) | Slower (millisecond-level) |
| Serialization | Only at checkpoint time | On every read and write |
| Incremental checkpoints | Not supported | Supported |
| Best for | Small state, latency first | Large state, large-scale production |

3.3 Configuration

```java
// Option 1: configure in code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// HashMapStateBackend
env.setStateBackend(new HashMapStateBackend());

// EmbeddedRocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());

// Option 2: flink-conf.yaml
// state.backend: rocksdb
// state.checkpoints.dir: hdfs:///flink/checkpoints
// state.backend.rocksdb.localdir: /data/rocksdb
```

3.4 RocksDB Tuning

```yaml
# flink-conf.yaml - RocksDB settings

# Local directory (an SSD is recommended)
state.backend.rocksdb.localdir: /ssd/rocksdb

# Memory management (use Flink managed memory)
state.backend.rocksdb.memory.managed: true

# Block cache size
state.backend.rocksdb.block.cache-size: 256mb

# Number of write buffers
state.backend.rocksdb.writebuffer.count: 4

# Write buffer size
state.backend.rocksdb.writebuffer.size: 128mb

# Incremental checkpoints
state.backend.incremental: true
```

4. The Checkpoint Mechanism

4.1 How Checkpoints Work

```mermaid
sequenceDiagram
    participant JM as JobManager
    participant Source
    participant Op as Operator
    participant Sink
    participant Storage
    JM->>Source: Trigger checkpoint (barrier n)
    Source->>Source: Snapshot state
    Source->>Storage: Persist state
    Source->>Op: Forward barrier n
    Op->>Op: Barrier received, snapshot state
    Op->>Storage: Persist state
    Op->>Sink: Forward barrier n
    Sink->>Sink: Barrier received, snapshot state
    Sink->>Storage: Persist state
    Sink->>JM: Acknowledge checkpoint n
    JM->>JM: All acks received, checkpoint n complete
```

4.2 Barrier Alignment

```mermaid
graph TB
    subgraph "Barrier alignment"
        S1[Source 1] -->|barrier n| OP
        S2[Source 2] -->|barrier n| OP
        OP[Operator]
        NOTE["Wait for the barrier on every input<br/>the checkpoint starts only after alignment"]
    end
```

```mermaid
sequenceDiagram
    participant Input1
    participant Input2
    participant Operator
    Input1->>Operator: barrier n
    Note over Operator: Buffer Input1 records<br/>wait for Input2's barrier
    Input2->>Operator: data
    Note over Operator: Keep processing Input2
    Input2->>Operator: barrier n
    Note over Operator: Barriers aligned<br/>start the checkpoint
    Operator->>Operator: Snapshot state
```
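The alignment protocol can be simulated in a few lines of plain Java: once a channel has delivered barrier n, its subsequent records are buffered; when every channel has delivered the barrier, the operator snapshots and then drains the buffer. This is an illustrative sketch, not Flink internals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class BarrierAligner {
    private final boolean[] barrierSeen;
    private final Queue<String> buffered = new ArrayDeque<>();
    public final List<String> processed = new ArrayList<>();
    public int snapshots = 0;

    public BarrierAligner(int numChannels) {
        barrierSeen = new boolean[numChannels];
    }

    // A data record arrives on one input channel.
    public void onRecord(int channel, String record) {
        if (barrierSeen[channel]) {
            buffered.add(record);   // channel is blocked: buffer until alignment
        } else {
            processed.add(record);  // channel not yet blocked: process normally
        }
    }

    // Barrier n arrives on one channel.
    public void onBarrier(int channel) {
        barrierSeen[channel] = true;
        for (boolean seen : barrierSeen) {
            if (!seen) return;      // still waiting for the other channels
        }
        snapshots++;                // aligned: take the snapshot ...
        while (!buffered.isEmpty()) {
            processed.add(buffered.poll());  // ... then release buffered records
        }
        Arrays.fill(barrierSeen, false);     // ready for barrier n+1
    }
}
```

Note how a record arriving on an already-blocked channel is delayed past the snapshot; that delay is exactly the cost unaligned checkpoints (next section) avoid.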

4.3 Unaligned Checkpoints (Flink 1.11+)

```mermaid
graph TB
    subgraph "Unaligned checkpoint"
        S1[Source 1] -->|barrier + data| OP
        S2[Source 2] -->|barrier + data| OP
        OP[Operator]
        NOTE["No waiting for alignment<br/>barriers can overtake in-flight data<br/>buffered data is written into the checkpoint too"]
    end
```

Configuration:

```java
// Enable unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

// Or via the configuration file:
// execution.checkpointing.unaligned: true
```

4.4 Checkpoint Configuration

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with a 1-minute interval
env.enableCheckpointing(60000);

CheckpointConfig config = env.getCheckpointConfig();

// Checkpointing mode
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum pause between checkpoints
config.setMinPauseBetweenCheckpoints(30000);

// Timeout
config.setCheckpointTimeout(600000);

// Maximum number of concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

// Tolerated number of checkpoint failures
config.setTolerableCheckpointFailureNumber(3);

// Retain the checkpoint when the job is cancelled
config.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Checkpoint storage
config.setCheckpointStorage("hdfs:///flink/checkpoints");

// Unaligned checkpoints
config.enableUnalignedCheckpoints();
```

5. Savepoint

5.1 Checkpoint vs. Savepoint

```mermaid
graph TB
    subgraph "Checkpoint"
        C1["Triggered automatically"]
        C2["Used for failure recovery"]
        C3["Can be incremental"]
        C4["May be deleted once the job stops"]
    end
    subgraph "Savepoint"
        S1["Triggered manually"]
        S2["Used for upgrades / migration"]
        S3["Full snapshot"]
        S4["Kept until the user deletes it"]
    end
```

| Feature | Checkpoint | Savepoint |
| --- | --- | --- |
| Trigger | Automatic | Manual |
| Main purpose | Failure recovery | Version upgrades, migration |
| Format | Possibly incremental | Full, standardized |
| Lifecycle | Managed by Flink | Managed by the user |
| Compatibility | Not guaranteed across versions | Compatible across versions |

5.2 Savepoint Operations

```shell
# Trigger a savepoint
bin/flink savepoint <jobId> [savepointPath]
bin/flink savepoint abc123 hdfs:///savepoints/

# Cancel the job and take a savepoint
bin/flink cancel -s [savepointPath] <jobId>
bin/flink cancel -s hdfs:///savepoints/ abc123

# Resume from a savepoint
bin/flink run -s hdfs:///savepoints/savepoint-abc123 myJob.jar

# Allow skipping state that cannot be restored
bin/flink run -s hdfs:///savepoints/savepoint-abc123 --allowNonRestoredState myJob.jar
```

5.3 State Compatibility

```mermaid
graph TD
    A[State compatibility check] --> B{Operator IDs match?}
    B -->|yes| C{State names match?}
    B -->|no| D["Fail, or skip with<br/>--allowNonRestoredState"]
    C -->|yes| E{Types compatible?}
    C -->|no| D
    E -->|yes| F[Restore succeeds]
    E -->|no| D
```

Best practice:

```java
// Assign an explicit UID to every stateful operator so that
// savepoints keep working across job upgrades
stream
    .map(new MyMapper())
    .uid("my-mapper")  // important!
    .name("My Mapper")
    .keyBy(...)
    .process(new MyProcessor())
    .uid("my-processor")  // important!
    .name("My Processor");
```

6. Exactly-Once Semantics

6.1 End-to-End Exactly-Once

```mermaid
graph LR
    subgraph "End-to-end exactly-once"
        SOURCE["Source<br/>replayable"] --> FLINK["Flink<br/>checkpointing"]
        FLINK --> SINK["Sink<br/>idempotent / transactional"]
    end
```

Three requirements:

  1. A replayable source, e.g. Kafka (offsets can be rewound)
  2. Inside Flink: checkpoints plus barriers
  3. At the sink: idempotent writes or a two-phase commit
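The idempotent variant of requirement 3 means that replaying the same writes after a recovery leaves the external system in the same final state. The usual trick is an upsert keyed by a deterministic ID; a minimal plain-Java sketch with a map standing in for the external store (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentSink {
    // Stands in for an external KV store that supports upserts
    // (e.g. HBase, Redis, or a JDBC table with a primary key).
    private final Map<String, Long> store = new HashMap<>();

    // Writing the same (key, value) twice - as happens when records are
    // replayed after a failure - yields the same final state as writing once.
    public void upsert(String key, long value) {
        store.put(key, value);
    }

    public int size() {
        return store.size();
    }

    public Long get(String key) {
        return store.get(key);
    }
}
```

Because the duplicate write is harmless, at-least-once delivery into an idempotent sink already gives effectively-once results, without needing transactions.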

6.2 Two-Phase Commit (2PC)

```mermaid
sequenceDiagram
    participant Flink
    participant Sink
    participant External as External system
    Note over Flink: Checkpoint starts
    Flink->>Sink: pre-commit
    Sink->>External: Open a transaction, write the data
    Sink-->>Flink: Pre-commit succeeded
    Note over Flink: Checkpoint completed
    Flink->>Sink: Notify commit
    Sink->>External: Commit the transaction
    External-->>Sink: Transaction committed
    Note over External: Data becomes visible
```
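This handshake is the pattern behind Flink's TwoPhaseCommitSinkFunction: pre-commit when the checkpoint starts, commit when the checkpoint-complete notification arrives, abort on failure. A plain-Java sketch of the lifecycle (not the Flink API; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    private final List<String> pending = new ArrayList<>();   // open transaction
    private List<String> preCommitted = null;                 // flushed, not yet visible
    public final List<String> committed = new ArrayList<>();  // visible downstream

    // Normal processing: records accumulate in the open transaction.
    public void write(String record) {
        pending.add(record);
    }

    // Phase 1: on checkpoint, flush the transaction but keep it invisible.
    public void preCommit() {
        preCommitted = new ArrayList<>(pending);
        pending.clear();
    }

    // Phase 2: on checkpoint-complete notification, make the data visible.
    public void commit() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    // On failure before commit, roll the pre-committed transaction back.
    public void abort() {
        preCommitted = null;
    }
}
```

The key property: data written between two checkpoints only becomes externally visible once the enclosing checkpoint succeeds, so a replay after a crash never doubles it.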

6.3 Kafka Exactly-Once Example

```java
// Kafka producer settings; exactly-once needs a transaction timeout
// no smaller than the checkpoint interval
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");
producerProps.setProperty("transaction.timeout.ms", "600000");

// Build a Kafka sink with exactly-once delivery
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setKafkaProducerConfig(producerProps)
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  // exactly-once
    .setTransactionalIdPrefix("my-app")  // transactional-id prefix
    .build();

stream.sinkTo(sink);
```

7. State Tuning

7.1 State TTL

```java
// Configure a state TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))  // expire after 7 days
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh the TTL on create and write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // never return expired values
    .cleanupFullSnapshot()  // drop expired entries on full snapshots
    .cleanupInRocksdbCompactFilter(1000)  // clean up during RocksDB compaction
    .build();

// Attach it to a state descriptor
ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("my-state", Long.class);
stateDesc.enableTimeToLive(ttlConfig);
```

7.2 Optimizing Large State

```mermaid
mindmap
  root((Large-state optimization))
    State backend
      Use RocksDB
      Incremental checkpoints
      Local SSDs
    State design
      Prefer MapState over one huge ValueState
      Set a sensible TTL
      Avoid unbounded state growth
    Checkpointing
      Tune the interval
      Raise the timeout
      Use unaligned checkpoints
    Resources
      More managed memory
      Higher parallelism to spread state
```

7.3 Troubleshooting Common Problems

```mermaid
flowchart TD
    A[State problems] --> B{Checkpoints failing?}
    B -->|timeout| C["Raise the timeout<br/>check barrier alignment"]
    B -->|OOM| D["Switch to RocksDB<br/>add memory"]
    B -->|storage failure| E["Check the storage connection<br/>check permissions"]
    A --> F{State too large?}
    F -->|yes| G["Set a TTL<br/>clean up expired state<br/>use incremental checkpoints"]
    A --> H{Restore failing?}
    H -->|yes| I["Check operator UIDs<br/>check state compatibility"]
```

8. Hands-On Cases

8.1 Case: Exact Distinct-User Counting

```java
/**
 * Count distinct users in real time (exact deduplication).
 */
public class ExactUVCount extends KeyedProcessFunction<String, UserEvent, Tuple2<String, Long>> {

    // MapState holds the user IDs seen so far
    private MapState<Long, Boolean> userIdState;
    private ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // TTL so the per-day state does not grow forever
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .cleanupInRocksdbCompactFilter(1000)
            .build();

        MapStateDescriptor<Long, Boolean> userIdDesc = new MapStateDescriptor<>("user-ids", Long.class, Boolean.class);
        userIdDesc.enableTimeToLive(ttlConfig);
        userIdState = getRuntimeContext().getMapState(userIdDesc);

        ValueStateDescriptor<Long> countDesc = new ValueStateDescriptor<>("count", Long.class);
        countDesc.enableTimeToLive(ttlConfig);
        countState = getRuntimeContext().getState(countDesc);
    }

    @Override
    public void processElement(UserEvent event, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long userId = event.getUserId();

        // Only count a user the first time we see them
        if (!userIdState.contains(userId)) {
            userIdState.put(userId, true);

            Long count = countState.value();
            count = count == null ? 1L : count + 1;
            countState.update(count);
        }

        // Emit the current UV count
        out.collect(Tuple2.of(ctx.getCurrentKey(), countState.value()));
    }
}
```

8.2 Case: Joining Two Streams

```java
/**
 * Join an order stream with a payment stream.
 */
public class OrderPaymentJoin extends CoProcessFunction<Order, Payment, OrderWithPayment> {

    // Orders still waiting for their payment
    private MapState<String, Order> orderState;

    // Payments whose order has not arrived yet
    private MapState<String, Payment> paymentState;

    @Override
    public void open(Configuration parameters) {
        // 5-minute TTL: unmatched entries eventually expire
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(5))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .build();

        MapStateDescriptor<String, Order> orderDesc = new MapStateDescriptor<>("orders", String.class, Order.class);
        orderDesc.enableTimeToLive(ttlConfig);
        orderState = getRuntimeContext().getMapState(orderDesc);

        MapStateDescriptor<String, Payment> paymentDesc = new MapStateDescriptor<>("payments", String.class, Payment.class);
        paymentDesc.enableTimeToLive(ttlConfig);
        paymentState = getRuntimeContext().getMapState(paymentDesc);
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<OrderWithPayment> out) throws Exception {
        // Is there a matching payment already?
        Payment payment = paymentState.get(order.getOrderId());
        if (payment != null) {
            out.collect(new OrderWithPayment(order, payment));
            paymentState.remove(order.getOrderId());
        } else {
            // Cache the order and wait for the payment
            orderState.put(order.getOrderId(), order);
        }
    }

    @Override
    public void processElement2(Payment payment, Context ctx, Collector<OrderWithPayment> out) throws Exception {
        // Is there a matching order already?
        Order order = orderState.get(payment.getOrderId());
        if (order != null) {
            out.collect(new OrderWithPayment(order, payment));
            orderState.remove(payment.getOrderId());
        } else {
            // Cache the payment and wait for the order
            paymentState.put(payment.getOrderId(), payment);
        }
    }
}
```

9. Monitoring and Operations

9.1 Key Metrics

```mermaid
graph TB
    subgraph "Checkpoint metrics"
        A1[lastCheckpointDuration]
        A2[lastCheckpointSize]
        A3[numberOfCompletedCheckpoints]
        A4[numberOfFailedCheckpoints]
    end
    subgraph "State metrics"
        B1[State size]
        B2[RocksDB memory usage]
        B3[Number of state entries]
    end
    subgraph "Backpressure metrics"
        C1[backPressuredTimeMsPerSecond]
        C2[idleTimeMsPerSecond]
    end
```

9.2 Troubleshooting Checkpoint Failures

```mermaid
flowchart TD
    A[Checkpoint failed] --> B{Error type}
    B -->|timeout| C["Causes:<br/>- state too large<br/>- severe backpressure<br/>- slow storage"]
    C --> C1["Fixes:<br/>- raise the timeout<br/>- incremental checkpoints<br/>- resolve the backpressure"]
    B -->|storage error| D["Causes:<br/>- storage unavailable<br/>- permission problems<br/>- out of space"]
    D --> D1["Fixes:<br/>- check the storage connection<br/>- check permissions<br/>- free up space"]
    B -->|alignment timeout| E["Causes:<br/>- blocked barriers<br/>- data skew"]
    E --> E1["Fixes:<br/>- enable unaligned checkpoints<br/>- fix the skew"]
```

10. Summary

10.1 Key Takeaways

```mermaid
mindmap
  root((State and fault tolerance))
    State types
      Keyed State - partitioned by key
      Operator State - per operator instance
      Broadcast State - shared via broadcast
    State backends
      HashMap - in-memory, fast
      RocksDB - handles large state
    Checkpoint
      Barrier alignment
      Incremental snapshots
      Unaligned mode
    Exactly-once
      Replayable source
      Flink-internal checkpoints
      Idempotent sink / 2PC
```

10.2 In One Sentence

State is the foundation of stateful computation in Flink.

Checkpoint is the core of Flink's fault tolerance.

Together, they deliver exactly-once semantics.

