1. Introduction: Why Do We Need State?
1.1 Stateless vs. Stateful
graph LR
subgraph "Stateless computation"
A1[Input 1] --> B1[Process] --> C1[Output 1]
A2[Input 2] --> B2[Process] --> C2[Output 2]
A3[Input 3] --> B3[Process] --> C3[Output 3]
end
graph LR
subgraph "Stateful computation"
A1[Input 1] --> B1[Process + state update]
B1 --> STATE[(State)]
STATE --> B2[Process + state update]
A2[Input 2] --> B2
B2 --> STATE
STATE --> B3[Process + state update]
A3[Input 3] --> B3
end
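To make the contrast concrete, here is a minimal sketch (class and field names are illustrative): a stateless map would transform each record in isolation, while this keyed function keeps a per-key counter in ValueState that survives across records and restarts:
public class RunningCount extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    // One counter per key, managed and checkpointed by Flink
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value(); // null for the first record of this key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(Tuple2.of(in.f0, updated));
    }
}
Usage would be stream.keyBy(t -> t.f0).flatMap(new RunningCount()); the counter only works after keyBy, which is exactly the Keyed State discussed in section 2.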
1.2 Which Scenarios Need State?
graph TB
A[Scenarios that need State] --> B[Aggregation]
A --> C[Deduplication]
A --> D[Window computation]
A --> E[Machine learning models]
A --> F[CEP pattern matching]
B --> B1["SUM/COUNT/AVG"]
C --> C1["UV (unique visitor) counting"]
D --> D1["Data buffered per window"]
E --> E1["Model parameters"]
F --> F1["Historical event sequences"]
1.3 The Core Value of Flink State
graph LR
A[Flink State] --> B[Fast access]
A --> C[Fault tolerance]
A --> D[Exactly-once]
B --> B1["Local memory / RocksDB<br/>millisecond access"]
C --> C1["Checkpoint mechanism<br/>automatic failure recovery"]
D --> D1["Exactly-Once<br/>state consistency"]
2. State Types in Detail
2.1 State Classification
graph TB
subgraph "Flink State classification"
A["Managed State"] --> A1[Keyed State]
A --> A2[Operator State]
B["Raw State"] --> B1["Managed by user code<br/>not recommended"]
end
A1 --> K1[ValueState]
A1 --> K2[ListState]
A1 --> K3[MapState]
A1 --> K4[ReducingState]
A1 --> K5[AggregatingState]
A2 --> O1[ListState]
A2 --> O2[BroadcastState]
2.2 Keyed State in Detail
graph TB
subgraph "Keyed State characteristics"
A["Partitioned by key"]
B["Independent state per key"]
C["Only usable after keyBy (on a KeyedStream)"]
end
Keyed State types:
| Type | Description | Typical use |
|---|---|---|
| ValueState | A single value | Latest value, counters |
| ListState | A list of values | History records |
| MapState | Key-value pairs | Multi-dimensional state |
| ReducingState | Aggregate (same type) | Accumulators |
| AggregatingState | Aggregate (input/output types may differ) | Complex aggregation |
Code example:
public class StatefulFunction extends KeyedProcessFunction<String, Event, Result> {
    // 1. ValueState - a single value
    private ValueState<Long> countState;
    // 2. ListState - a list
    private ListState<Event> historyState;
    // 3. MapState - a map
    private MapState<String, Long> categoryCountState;
    // 4. ReducingState - a reduced value
    private ReducingState<Long> sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the ValueState
        ValueStateDescriptor<Long> countDesc = new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(countDesc);
        // Initialize the ListState
        ListStateDescriptor<Event> historyDesc = new ListStateDescriptor<>("history", Event.class);
        historyState = getRuntimeContext().getListState(historyDesc);
        // Initialize the MapState
        MapStateDescriptor<String, Long> categoryDesc = new MapStateDescriptor<>("category", String.class, Long.class);
        categoryCountState = getRuntimeContext().getMapState(categoryDesc);
        // Initialize the ReducingState
        ReducingStateDescriptor<Long> sumDesc = new ReducingStateDescriptor<>("sum", Long::sum, Long.class);
        sumState = getRuntimeContext().getReducingState(sumDesc);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Use the ValueState
        Long count = countState.value();
        if (count == null) count = 0L;
        countState.update(count + 1);
        // Use the ListState
        historyState.add(event);
        // Use the MapState
        String category = event.getCategory();
        Long categoryCount = categoryCountState.get(category);
        categoryCountState.put(category, categoryCount == null ? 1L : categoryCount + 1);
        // Use the ReducingState
        sumState.add(event.getAmount());
        // Emit the result
        out.collect(new Result(countState.value(), sumState.get()));
    }
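    // 5. AggregatingState (listed in the table above but not used in this class).
    // A hypothetical sketch of how it would be declared and initialized - the
    // AvgAggregate function and its Tuple2 accumulator type are illustrative:
    //   private AggregatingState<Long, Double> avgState;
    //   // in open():
    //   avgState = getRuntimeContext().getAggregatingState(
    //       new AggregatingStateDescriptor<>("avg", new AvgAggregate(),
    //           Types.TUPLE(Types.LONG, Types.LONG)));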
}
2.3 Operator State in Detail
graph TB
subgraph "Operator State characteristics"
A["Bound to an operator instance"]
B["Shared by all records of that instance"]
C["Commonly used in Sources/Sinks"]
end
Code example:
public class MySourceFunction implements SourceFunction<String>, CheckpointedFunction {
    private volatile boolean isRunning = true;
    private long offset;
    // Operator State
    private ListState<Long> offsetState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Initialize the Operator State
        ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offset", Long.class);
        offsetState = context.getOperatorStateStore().getListState(desc);
        // Restore from state after a failure
        if (context.isRestored()) {
            for (Long o : offsetState.get()) {
                offset = o;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist the state on every checkpoint
        offsetState.clear();
        offsetState.add(offset);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                // Read data and advance the offset
                String data = readFromSource(offset);
                ctx.collect(data);
                offset++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
2.4 Broadcast State
graph LR
subgraph "Broadcast State"
RULE[Rule stream] --> BC[Broadcast]
BC --> OP1[Operator 1]
BC --> OP2[Operator 2]
BC --> OP3[Operator N]
DATA[Data stream] --> OP1
DATA --> OP2
DATA --> OP3
end
Code example:
// Rule stream
DataStream<Rule> ruleStream = env.addSource(new RuleSource());
// Data stream
DataStream<Event> dataStream = env.addSource(new EventSource());
// Describe the Broadcast State
MapStateDescriptor<String, Rule> ruleStateDesc = new MapStateDescriptor<>(
    "rules",
    String.class,
    Rule.class
);
// Broadcast the rule stream
BroadcastStream<Rule> broadcastRules = ruleStream.broadcast(ruleStateDesc);
// Connect the data stream with the broadcast stream
// (after keyBy, the connected stream requires a KeyedBroadcastProcessFunction)
dataStream
    .keyBy(Event::getKey)
    .connect(broadcastRules)
    .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Result>() {
        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
            // Read the rules (read-only on the non-broadcast side)
            ReadOnlyBroadcastState<String, Rule> rules = ctx.getBroadcastState(ruleStateDesc);
            Rule rule = rules.get(event.getRuleId());
            if (rule != null && rule.match(event)) {
                out.collect(new Result(event, rule));
            }
        }
        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Result> out) throws Exception {
            // Update the rules (writable on the broadcast side)
            BroadcastState<String, Rule> rules = ctx.getBroadcastState(ruleStateDesc);
            rules.put(rule.getId(), rule);
        }
    });
3. State Backends
3.1 State Backend Types
graph TB
subgraph "State Backend"
A[HashMapStateBackend] --> A1["State on the JVM heap<br/>fast, but bounded by memory"]
B[EmbeddedRocksDBStateBackend] --> B1["State in RocksDB<br/>supports very large state"]
end
3.2 Comparison and Selection
| Property | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| Storage location | JVM heap | RocksDB (local disk) |
| State size | Bounded by heap memory | Can be very large (TB scale) |
| Access speed | Fast (nanoseconds) | Slower (milliseconds) |
| Serialization | Only at checkpoint time | On every read and write |
| Incremental checkpoints | Not supported | Supported |
| Best for | Small state, speed first | Large state, large-scale production |
3.3 Configuration
// Option 1: configure in code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// HashMapStateBackend
env.setStateBackend(new HashMapStateBackend());
// EmbeddedRocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// Option 2: flink-conf.yaml
// state.backend: rocksdb
// state.checkpoints.dir: hdfs:///flink/checkpoints
// state.backend.rocksdb.localdir: /data/rocksdb
3.4 RocksDB Tuning
# flink-conf.yaml RocksDB settings
# Local directory (SSD recommended)
state.backend.rocksdb.localdir: /ssd/rocksdb
# Memory management (use Flink managed memory)
state.backend.rocksdb.memory.managed: true
# Block cache size
state.backend.rocksdb.block.cache-size: 256mb
# Number of write buffers
state.backend.rocksdb.writebuffer.count: 4
# Write buffer size
state.backend.rocksdb.writebuffer.size: 128mb
# Incremental checkpoints
state.backend.incremental: true
4. The Checkpoint Mechanism
4.1 How Checkpointing Works
sequenceDiagram
participant JM as JobManager
participant Source as Source
participant Op as Operator
participant Sink as Sink
participant Storage as Storage
JM->>Source: Trigger checkpoint (barrier n)
Source->>Source: Snapshot state
Source->>Storage: Persist state
Source->>Op: Forward barrier n
Op->>Op: Barrier received, snapshot state
Op->>Storage: Persist state
Op->>Sink: Forward barrier n
Sink->>Sink: Barrier received, snapshot state
Sink->>Storage: Persist state
Sink->>JM: Acknowledge checkpoint n
JM->>JM: All acks received, checkpoint n succeeds
4.2 Barrier Alignment
graph TB
subgraph "Barrier alignment"
S1[Source 1] -->|barrier n| OP
S2[Source 2] -->|barrier n| OP
OP[Operator]
NOTE["Wait for the barrier from every input;<br/>checkpointing starts only after alignment"]
end
sequenceDiagram
participant Input1
participant Input2
participant Operator
Input1->>Operator: barrier n
Note over Operator: Buffer further Input1 data,<br/>wait for Input2's barrier
Input2->>Operator: data
Note over Operator: Keep processing Input2
Input2->>Operator: barrier n
Note over Operator: Barriers aligned,<br/>start the checkpoint
Operator->>Operator: Snapshot state
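Alignment can stall under heavy backpressure. In recent Flink versions (around 1.13+, to the best of my knowledge) an aligned checkpoint can be allowed to fall back to the unaligned mode described next once alignment exceeds a timeout; a hedged sketch (Duration is java.time.Duration, env as above):
CheckpointConfig config = env.getCheckpointConfig();
// Only takes effect when unaligned checkpoints are enabled:
config.enableUnalignedCheckpoints();
// Start aligned; switch the checkpoint to unaligned after 30 s of alignment
config.setAlignedCheckpointTimeout(Duration.ofSeconds(30));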
4.3 Unaligned Checkpoints (Flink 1.11+)
graph TB
subgraph "Unaligned checkpoints"
S1[Source 1] -->|barrier + data| OP
S2[Source 2] -->|barrier + data| OP
OP[Operator]
NOTE["No waiting for alignment;<br/>barriers can overtake data,<br/>in-flight buffers are written into the checkpoint"]
end
Configuration:
// Enable unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
// Or in the configuration file:
// execution.checkpointing.unaligned: true
4.4 Checkpoint Configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing with a 1-minute interval
env.enableCheckpointing(60000);
CheckpointConfig config = env.getCheckpointConfig();
// Checkpointing mode
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Minimum pause between two checkpoints
config.setMinPauseBetweenCheckpoints(30000);
// Timeout
config.setCheckpointTimeout(600000);
// Maximum number of concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);
// Number of tolerable checkpoint failures
config.setTolerableCheckpointFailureNumber(3);
// Retain the checkpoint when the job is cancelled
config.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// Checkpoint storage
config.setCheckpointStorage("hdfs:///flink/checkpoints");
// Unaligned checkpoints
config.enableUnalignedCheckpoints();
5. Savepoints
5.1 Checkpoint vs. Savepoint
graph TB
subgraph "Checkpoint"
C1["Triggered automatically"]
C2["For failure recovery"]
C3["Can be incremental"]
C4["May be deleted once the job stops"]
end
subgraph "Savepoint"
S1["Triggered manually"]
S2["For upgrades and migration"]
S3["Full snapshot"]
S4["Kept until the user deletes it"]
end
| Property | Checkpoint | Savepoint |
|---|---|---|
| Trigger | Automatic | Manual |
| Main purpose | Failure recovery | Upgrades, migration |
| Format | Possibly incremental | Full, canonical format |
| Lifecycle | Managed by Flink | Managed by the user |
| Compatibility | Not guaranteed across versions | Compatible across versions |
5.2 Savepoint Operations
# Trigger a savepoint
bin/flink savepoint <jobId> [savepointPath]
bin/flink savepoint abc123 hdfs:///savepoints/
# Cancel the job and take a savepoint
bin/flink cancel -s [savepointPath] <jobId>
bin/flink cancel -s hdfs:///savepoints/ abc123
# Restore from a savepoint
bin/flink run -s hdfs:///savepoints/savepoint-abc123 myJob.jar
# Allow skipping state that cannot be mapped to the new job
bin/flink run -s hdfs:///savepoints/savepoint-abc123 --allowNonRestoredState myJob.jar
5.3 State Compatibility
graph TD
A[State compatibility check] --> B{Operator ID matches?}
B -->|yes| C{State name matches?}
B -->|no| D["Fail, or skip with<br/>--allowNonRestoredState"]
C -->|yes| E{Types compatible?}
C -->|no| D
E -->|yes| F[Restore succeeds]
E -->|no| D
Best practice:
// Assign a UID to every stateful operator to keep upgrades compatible
stream
    .map(new MyMapper())
    .uid("my-mapper") // important!
    .name("My Mapper")
    .keyBy(...)
    .process(new MyProcessor())
    .uid("my-processor") // important!
    .name("My Processor");
6. Exactly-Once Semantics
6.1 End-to-End Exactly-Once
graph LR
subgraph "End-to-end exactly-once"
SOURCE[Source<br/>replayable] --> FLINK[Flink<br/>checkpointing]
FLINK --> SINK[Sink<br/>idempotent/transactional]
end
Three requirements (a source sketch follows the list):
- Replayable source: e.g. Kafka, where offsets can be rewound
- Inside Flink: checkpoints + barriers
- Sink: idempotent writes or two-phase commit
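For the first requirement, a hedged sketch of a replayable source built with the Kafka connector's KafkaSource API (servers, topic, and group ID are made up). The consumed offsets live in checkpointed state, so after a failure Flink rewinds to the offsets of the last completed checkpoint:
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-topic")
    .setGroupId("my-app")
    // On a fresh start (no checkpoint): committed offsets, falling back to earliest
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");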
6.2 Two-Phase Commit (2PC)
sequenceDiagram
participant Flink
participant Sink
participant External as External system
Note over Flink: Checkpoint starts
Flink->>Sink: Pre-commit
Sink->>External: Open transaction, write data
Sink-->>Flink: Pre-commit succeeded
Note over Flink: Checkpoint completed
Flink->>Sink: Notify commit
Sink->>External: Commit transaction
External-->>Sink: Transaction committed
Note over External: Data becomes visible
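Flink 1.x ships an abstract TwoPhaseCommitSinkFunction that maps these steps onto checkpoint callbacks. A minimal skeleton, assuming a hypothetical FileTransaction handle (the overridden hooks are the real API; the bodies only sketch the idea):
public class TransactionalFileSink extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
    public TransactionalFileSink() {
        super(new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }
    @Override
    protected FileTransaction beginTransaction() throws Exception {
        return new FileTransaction(); // e.g. open a temp file for this checkpoint interval
    }
    @Override
    protected void invoke(FileTransaction txn, String value, Context ctx) throws Exception {
        txn.write(value); // staged inside the transaction, not yet visible
    }
    @Override
    protected void preCommit(FileTransaction txn) throws Exception {
        txn.flush(); // barrier reached: flush, but do not publish yet
    }
    @Override
    protected void commit(FileTransaction txn) {
        txn.publish(); // checkpoint completed: make the data visible
    }
    @Override
    protected void abort(FileTransaction txn) {
        txn.discard(); // checkpoint failed: throw the staged data away
    }
}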
6.3 Kafka Exactly-Once Example
// Kafka producer settings
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");
producerProps.setProperty("transaction.timeout.ms", "600000");
// Create a Kafka sink (exactly-once)
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setKafkaProducerConfig(producerProps) // pass the transaction timeout to the producer
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once
    .setTransactionalIdPrefix("my-app") // transactional ID prefix
    .build();
stream.sinkTo(sink);
7. State Tuning
7.1 State TTL
// Configure a state TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7)) // expire after 7 days
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh the TTL on create and write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return expired values
    .cleanupFullSnapshot() // clean up when taking full snapshots
    .cleanupInRocksdbCompactFilter(1000) // clean up during RocksDB compaction
    .build();
// Apply it to a state descriptor
ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("my-state", Long.class);
stateDesc.enableTimeToLive(ttlConfig);
7.2 Optimizing Large State
mindmap
  root((Large-state optimization))
    State backend
      Use RocksDB
      Incremental checkpoints
      Local SSD
    State design
      Prefer MapState over one huge ValueState
      Set a sensible TTL
      Avoid unbounded state growth
    Checkpointing
      Tune the interval
      Increase the timeout
      Use unaligned checkpoints
    Resources
      Increase managed memory
      Raise parallelism to spread the state
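Several of these knobs combined into one hedged sketch (the numbers are illustrative starting points, not recommendations):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// RocksDB with incremental checkpoints (the boolean enables incremental mode)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// A longer interval and timeout leave room for large snapshots
env.enableCheckpointing(120000);
env.getCheckpointConfig().setCheckpointTimeout(900000);
env.getCheckpointConfig().enableUnalignedCheckpoints();
// More parallel instances, each holding a smaller slice of the state
env.setParallelism(8);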
7.3 Troubleshooting Common Issues
flowchart TD
A[State problem] --> B{Checkpoints failing?}
B -->|timeout| C["Increase the timeout,<br/>check barrier alignment"]
B -->|OOM| D["Switch to RocksDB,<br/>add memory"]
B -->|storage failure| E["Check storage connectivity<br/>and permissions"]
A --> F{State too large?}
F -->|yes| G["Set a TTL,<br/>clean up expired state,<br/>use incremental checkpoints"]
A --> H{Restore failing?}
H -->|yes| I["Check operator UIDs<br/>and state compatibility"]
8. Hands-On Cases
8.1 Case: Real-Time Distinct-User Counting
/**
 * Count distinct users in real time (exact deduplication).
 */
public class ExactUVCount extends KeyedProcessFunction<String, UserEvent, Tuple2<String, Long>> {
    // MapState holding the user IDs seen so far
    private MapState<Long, Boolean> userIdState;
    private ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // Configure a TTL
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .cleanupInRocksdbCompactFilter(1000)
            .build();
        MapStateDescriptor<Long, Boolean> userIdDesc = new MapStateDescriptor<>("user-ids", Long.class, Boolean.class);
        userIdDesc.enableTimeToLive(ttlConfig);
        userIdState = getRuntimeContext().getMapState(userIdDesc);
        ValueStateDescriptor<Long> countDesc = new ValueStateDescriptor<>("count", Long.class);
        countDesc.enableTimeToLive(ttlConfig);
        countState = getRuntimeContext().getState(countDesc);
    }

    @Override
    public void processElement(UserEvent event, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long userId = event.getUserId();
        // Count the user only if it has not been seen yet
        if (!userIdState.contains(userId)) {
            userIdState.put(userId, true);
            Long count = countState.value();
            count = count == null ? 1L : count + 1;
            countState.update(count);
        }
        // Emit the current UV count
        out.collect(Tuple2.of(ctx.getCurrentKey(), countState.value()));
    }
}
8.2 Case: Joining Two Streams
/**
 * Join an order stream with a payment stream.
 */
public class OrderPaymentJoin extends CoProcessFunction<Order, Payment, OrderWithPayment> {
    // Orders still waiting for their payment
    private MapState<String, Order> orderState;
    // Payments that arrived before their order
    private MapState<String, Payment> paymentState;

    @Override
    public void open(Configuration parameters) {
        // 5-minute TTL
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(5))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .build();
        MapStateDescriptor<String, Order> orderDesc = new MapStateDescriptor<>("orders", String.class, Order.class);
        orderDesc.enableTimeToLive(ttlConfig);
        orderState = getRuntimeContext().getMapState(orderDesc);
        MapStateDescriptor<String, Payment> paymentDesc = new MapStateDescriptor<>("payments", String.class, Payment.class);
        paymentDesc.enableTimeToLive(ttlConfig);
        paymentState = getRuntimeContext().getMapState(paymentDesc);
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<OrderWithPayment> out) throws Exception {
        // Look for a matching payment
        Payment payment = paymentState.get(order.getOrderId());
        if (payment != null) {
            out.collect(new OrderWithPayment(order, payment));
            paymentState.remove(order.getOrderId());
        } else {
            // Cache the order and wait for its payment
            orderState.put(order.getOrderId(), order);
        }
    }

    @Override
    public void processElement2(Payment payment, Context ctx, Collector<OrderWithPayment> out) throws Exception {
        // Look for a matching order
        Order order = orderState.get(payment.getOrderId());
        if (order != null) {
            out.collect(new OrderWithPayment(order, payment));
            orderState.remove(payment.getOrderId());
        } else {
            // Cache the payment and wait for its order
            paymentState.put(payment.getOrderId(), payment);
        }
    }
}
9. Monitoring and Operations
9.1 Key Metrics
graph TB
subgraph "Checkpoint metrics"
A1[lastCheckpointDuration]
A2[lastCheckpointSize]
A3[numberOfCompletedCheckpoints]
A4[numberOfFailedCheckpoints]
end
subgraph "State metrics"
B1[State size]
B2[RocksDB memory usage]
B3[Number of state entries]
end
subgraph "Backpressure metrics"
C1[backPressuredTimeMsPerSecond]
C2[idleTimeMsPerSecond]
end
9.2 Diagnosing Checkpoint Failures
flowchart TD
A[Checkpoint failure] --> B{Error type}
B -->|timeout| C["Causes:<br/>- state too large<br/>- severe backpressure<br/>- slow storage"]
C --> C1["Fixes:<br/>- increase the timeout<br/>- use incremental checkpoints<br/>- resolve the backpressure"]
B -->|storage error| D["Causes:<br/>- storage unavailable<br/>- permission problems<br/>- out of space"]
D --> D1["Fixes:<br/>- check storage connectivity<br/>- check permissions<br/>- free up space"]
B -->|alignment timeout| E["Causes:<br/>- barriers blocked<br/>- data skew"]
E --> E1["Fixes:<br/>- enable unaligned checkpoints<br/>- mitigate the skew"]
10. Summary
10.1 Key Takeaways
mindmap
  root((State and fault tolerance))
    State types
      Keyed State - partitioned by key
      Operator State - per operator instance
      Broadcast State - shared via broadcast
    State backends
      HashMap - in memory, fast
      RocksDB - supports large state
    Checkpointing
      Barrier alignment
      Incremental snapshots
      Unaligned checkpoints
    Exactly-once
      Replayable source
      Flink-internal checkpoints
      Idempotent sink / 2PC
10.2 In One Sentence
State is the foundation of stateful computation in Flink.
Checkpointing is Flink's core fault-tolerance mechanism.
Together they deliver exactly-once semantics.