一、前言:时间的困惑
1.1 一个让人崩溃的问题
sequenceDiagram
participant 事件发生
participant Kafka
participant Flink
participant 结果
Note over 事件发生: 10:00:00 用户下单
事件发生->>Kafka: 网络延迟 3秒
Note over Kafka: 10:00:03 消息到达
Kafka->>Flink: 10:00:05 消费
Note over Flink: 我该用哪个时间?
10:00:00? 10:00:03? 10:00:05?
10:00:00? 10:00:03? 10:00:05?
这个问题会导致:
graph TD
A[时间混乱] --> B["统计结果不准"]
A --> C["数据丢失"]
A --> D["窗口触发错误"]
A --> E["无法重放数据"]
1.2 本文要解决的问题
- 什么是事件时间、处理时间?
- Watermark 到底是什么?
- 数据迟到怎么办?
- 各种窗口怎么选?
二、三种时间语义
2.1 时间语义图解
graph LR
subgraph "时间线"
A["10:00:00
事件发生"] --> B["10:00:02
进入Kafka"] B --> C["10:00:05
Flink处理"] end subgraph "时间类型" ET["Event Time
事件时间
10:00:00"] IT["Ingestion Time
摄入时间
10:00:02"] PT["Processing Time
处理时间
10:00:05"] end
事件发生"] --> B["10:00:02
进入Kafka"] B --> C["10:00:05
Flink处理"] end subgraph "时间类型" ET["Event Time
事件时间
10:00:00"] IT["Ingestion Time
摄入时间
10:00:02"] PT["Processing Time
处理时间
10:00:05"] end
2.2 三种时间详解
| 时间类型 | 定义 | 特点 | 适用场景 |
|---|---|---|---|
| Event Time | 事件实际发生时间 | 结果确定性、可重放 | 大多数业务场景 |
| Processing Time | Flink 处理时间 | 最简单、延迟最低 | 对准确性要求不高 |
| Ingestion Time | 进入 Flink 的时间 | 折中方案(已不推荐) | 基本不用 |
2.3 为什么 Event Time 最重要?
graph TD
subgraph "Processing Time 的问题"
A1["相同数据"] --> B1["第一次运行
结果 A"] A1 --> C1["第二次运行
结果 B"] B1 --> D1["结果不一致!"] C1 --> D1 end
结果 A"] A1 --> C1["第二次运行
结果 B"] B1 --> D1["结果不一致!"] C1 --> D1 end
graph TD
subgraph "Event Time 的优势"
A2["相同数据"] --> B2["第一次运行
结果 X"] A2 --> C2["第二次运行
结果 X"] B2 --> D2["结果一致 ✅"] C2 --> D2 end
结果 X"] A2 --> C2["第二次运行
结果 X"] B2 --> D2["结果一致 ✅"] C2 --> D2 end
三、Watermark:流处理的时钟
3.1 为什么需要 Watermark?
sequenceDiagram
participant Stream as 数据流
participant Window as 10:00窗口
Stream->>Window: 10:00:01 数据
Stream->>Window: 10:00:03 数据
Note over Window: 窗口该关闭了吗?
还会有 10:00:xx 的数据吗? Stream->>Window: ???
还会有 10:00:xx 的数据吗? Stream->>Window: ???
问题:数据是乱序的,Flink 怎么知道某个时间窗口的数据已经全部到达?
答案:Watermark!
3.2 Watermark 定义
graph LR
subgraph "Watermark 含义"
A["Watermark(t)"] --> B["表示 t 之前的数据
应该都已经到达"] B --> C["可以关闭 t 之前的窗口"] end
应该都已经到达"] B --> C["可以关闭 t 之前的窗口"] end
graph LR
subgraph "数据流中的 Watermark"
D1["数据 9:59:58"] --> D2["数据 10:00:01"]
D2 --> W1["Watermark=9:59:55"]
W1 --> D3["数据 10:00:03"]
D3 --> D4["数据 9:59:59"]
D4 --> W2["Watermark=10:00:00"]
end
3.3 Watermark 生成策略
graph TB
subgraph "Watermark 策略"
A[固定延迟] --> A1["最常用
Watermark = 最大事件时间 - 延迟"] B[单调递增] --> B1["数据有序时使用
Watermark = 当前事件时间"] C[自定义] --> C1["复杂场景
实现 WatermarkGenerator"] end
Watermark = 最大事件时间 - 延迟"] B[单调递增] --> B1["数据有序时使用
Watermark = 当前事件时间"] C[自定义] --> C1["复杂场景
实现 WatermarkGenerator"] end
代码示例:
// ========== 方式1:固定延迟(最常用)==========
DataStream<Event> stream = source
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 5秒延迟
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
// ========== 方式2:单调递增 ==========
DataStream<Event> stream = source
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
// ========== 方式3:自定义 Watermark ==========
DataStream<Event> stream = source
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forGenerator(ctx -> new MyWatermarkGenerator())
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
// 自定义 Watermark 生成器
public class MyWatermarkGenerator implements WatermarkGenerator<Event> {
private long maxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness = 5000; // 5秒
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
}
}3.4 Watermark 传播
graph TB
subgraph "多分区 Watermark 传播"
P1[分区1
WM=10:00:05] --> OP[下游算子] P2[分区2
WM=10:00:02] --> OP P3[分区3
WM=10:00:08] --> OP OP --> RESULT["下游 WM = min(10:00:05, 10:00:02, 10:00:08)
= 10:00:02"] end
WM=10:00:05] --> OP[下游算子] P2[分区2
WM=10:00:02] --> OP P3[分区3
WM=10:00:08] --> OP OP --> RESULT["下游 WM = min(10:00:05, 10:00:02, 10:00:08)
= 10:00:02"] end
关键规则:下游算子的 Watermark = 所有上游 Watermark 的最小值
3.5 Watermark 与窗口触发
sequenceDiagram
participant Data as 数据
participant WM as Watermark
participant Window as 窗口 [10:00, 10:01)
Data->>Window: 10:00:15 数据
Data->>Window: 10:00:30 数据
WM->>Window: Watermark = 10:00:50
Note over Window: 窗口结束时间 10:01:00
WM < 窗口结束
继续等待 Data->>Window: 10:00:55 数据 WM->>Window: Watermark = 10:01:00 Note over Window: WM >= 窗口结束
触发窗口计算! Window->>Window: 输出结果
WM < 窗口结束
继续等待 Data->>Window: 10:00:55 数据 WM->>Window: Watermark = 10:01:00 Note over Window: WM >= 窗口结束
触发窗口计算! Window->>Window: 输出结果
四、窗口详解
4.1 窗口类型全景
graph TB
subgraph "Flink 窗口类型"
A[时间窗口] --> A1[Tumbling
滚动窗口] A --> A2[Sliding
滑动窗口] A --> A3[Session
会话窗口] B[计数窗口] --> B1[Tumbling Count] B --> B2[Sliding Count] C[全局窗口] --> C1[Global Window
需自定义触发器] end
滚动窗口] A --> A2[Sliding
滑动窗口] A --> A3[Session
会话窗口] B[计数窗口] --> B1[Tumbling Count] B --> B2[Sliding Count] C[全局窗口] --> C1[Global Window
需自定义触发器] end
4.2 滚动窗口 (Tumbling Window)
gantt
title 滚动窗口示意图
dateFormat HH:mm
axisFormat %H:%M
section 窗口
窗口1 [10:00-10:01) : 10:00, 1m
窗口2 [10:01-10:02) : 10:01, 1m
窗口3 [10:02-10:03) : 10:02, 1m
特点:固定大小、不重叠、无间隙
// 事件时间滚动窗口
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.sum("value");
// 处理时间滚动窗口
stream
.keyBy(Event::getKey)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.sum("value");
// 带偏移量(处理时区问题)
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // 北京时间
.sum("value");4.3 滑动窗口 (Sliding Window)
gantt
title 滑动窗口示意图 (窗口1分钟,滑动30秒)
dateFormat HH:mm:ss
axisFormat %H:%M:%S
section 窗口
窗口1 [10:00:00-10:01:00) : 10:00:00, 60s
窗口2 [10:00:30-10:01:30) : 10:00:30, 60s
窗口3 [10:01:00-10:02:00) : 10:01:00, 60s
特点:固定大小、可重叠、一条数据可能属于多个窗口
// 滑动窗口:窗口1分钟,每30秒滑动
stream
.keyBy(Event::getKey)
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
.sum("value");
// 注意:如果窗口大小 / 滑动间隔 = N,则每条数据属于 N 个窗口
// 上例中 60s / 30s = 2,每条数据属于 2 个窗口4.4 会话窗口 (Session Window)
graph LR
subgraph "会话窗口示意图"
D1["10:00:00"] --> D2["10:00:05"]
D2 --> GAP1["GAP > 30s"]
GAP1 --> D3["10:00:40"]
D3 --> D4["10:00:50"]
D4 --> GAP2["GAP > 30s"]
GAP2 --> D5["10:01:30"]
end
subgraph "窗口划分"
W1["会话1: 10:00:00-10:00:05"]
W2["会话2: 10:00:40-10:00:50"]
W3["会话3: 10:01:30"]
end
特点:基于活动间隙、窗口大小不固定
// 静态间隙
stream
.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withGap(Time.seconds(30)))
.sum("value");
// 动态间隙
stream
.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withDynamicGap(event -> {
// 根据事件动态返回间隙
if (event.isVip()) {
return Time.minutes(5).toMilliseconds();
}
return Time.minutes(1).toMilliseconds();
}))
.sum("value");4.5 计数窗口 (Count Window)
// 滚动计数窗口:每100条触发
stream
.keyBy(Event::getKey)
.countWindow(100)
.sum("value");
// 滑动计数窗口:窗口100条,每10条滑动
stream
.keyBy(Event::getKey)
.countWindow(100, 10)
.sum("value");4.6 窗口选型指南
flowchart TD
A[选择窗口类型] --> B{按什么分组?}
B -->|固定时间间隔| C{需要重叠吗?}
C -->|不需要| D[Tumbling Window]
C -->|需要| E[Sliding Window]
B -->|用户活动| F[Session Window]
B -->|固定条数| G[Count Window]
B -->|自定义逻辑| H[Global Window + Trigger]
五、迟到数据处理
5.1 数据迟到的原因
graph TD
A[数据迟到原因] --> B[网络延迟]
A --> C[数据源延迟]
A --> D[Kafka 分区不均]
A --> E[系统时钟不同步]
B --> RESULT[到达时窗口已关闭]
C --> RESULT
D --> RESULT
E --> RESULT
5.2 迟到数据处理策略
graph TB
subgraph "处理策略"
A[允许迟到] --> A1["allowedLateness()
窗口延迟关闭"] B[侧输出] --> B1["sideOutputLateData()
迟到数据另外处理"] C[丢弃] --> C1["默认行为
超时直接丢弃"] end
窗口延迟关闭"] B[侧输出] --> B1["sideOutputLateData()
迟到数据另外处理"] C[丢弃] --> C1["默认行为
超时直接丢弃"] end
代码示例:
// 定义侧输出标签
final OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data"){};
DataStream<Result> result = stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 允许迟到 1 分钟
.allowedLateness(Time.minutes(1))
// 迟到数据输出到侧流
.sideOutputLateData(lateDataTag)
.sum("value");
// 获取迟到数据
DataStream<Event> lateData = result.getSideOutput(lateDataTag);
// 处理迟到数据(例如:写入特殊表、告警等)
lateData.addSink(new LateDateSink());5.3 迟到数据处理时序
sequenceDiagram
participant Data as 数据
participant Window as 窗口[10:00-10:01)
participant Side as 侧输出
Note over Window: Watermark 延迟5秒
allowedLateness=1分钟 Data->>Window: 10:00:30 (正常) Data->>Window: 10:00:50 (正常) Note over Window: WM=10:01:00, 第一次触发 Window-->>Window: 输出中间结果 Data->>Window: 10:00:55 (迟到但允许) Note over Window: WM=10:01:30, 更新结果 Window-->>Window: 输出更新结果 Data->>Side: 10:00:40 (太迟了) Note over Window: WM=10:02:00 > 窗口结束+1分钟
窗口彻底关闭 Side-->>Side: 侧输出处理
allowedLateness=1分钟 Data->>Window: 10:00:30 (正常) Data->>Window: 10:00:50 (正常) Note over Window: WM=10:01:00, 第一次触发 Window-->>Window: 输出中间结果 Data->>Window: 10:00:55 (迟到但允许) Note over Window: WM=10:01:30, 更新结果 Window-->>Window: 输出更新结果 Data->>Side: 10:00:40 (太迟了) Note over Window: WM=10:02:00 > 窗口结束+1分钟
窗口彻底关闭 Side-->>Side: 侧输出处理
5.4 最佳实践
mindmap
root((迟到数据处理))
预防
合理设置Watermark延迟
数据源端保证时间准确
监控数据延迟
处理
设置allowedLateness
使用侧输出收集
定期重算修正
兜底
迟到数据写入特殊表
发送告警
离线批处理补数
六、窗口函数详解
6.1 窗口函数类型
graph TB
subgraph "窗口函数"
A[增量聚合] --> A1["ReduceFunction"]
A --> A2["AggregateFunction"]
B[全量窗口] --> B1["ProcessWindowFunction"]
C[组合使用] --> C1["增量 + Process
兼顾性能和灵活"] end
兼顾性能和灵活"] end
6.2 增量聚合函数
// ========== ReduceFunction ==========
// 输入输出类型相同
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce((e1, e2) -> new Event(e1.getKey(), e1.getValue() + e2.getValue()));
// ========== AggregateFunction ==========
// 更灵活,输入、累加器、输出可以不同类型
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateFunction<Event, Tuple2<Long, Long>, Double>() {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L); // (sum, count)
}
@Override
public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> acc) {
return Tuple2.of(acc.f0 + event.getValue(), acc.f1 + 1);
}
@Override
public Double getResult(Tuple2<Long, Long> acc) {
return acc.f0.doubleValue() / acc.f1; // 计算平均值
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});6.3 全量窗口函数
// ProcessWindowFunction:可以获取窗口元数据
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Event> elements, Collector<Result> out) {
// 获取窗口信息
TimeWindow window = context.window();
long windowStart = window.getStart();
long windowEnd = window.getEnd();
// 处理所有元素
long count = 0;
long sum = 0;
for (Event e : elements) {
count++;
sum += e.getValue();
}
out.collect(new Result(key, windowStart, windowEnd, sum, count));
}
});6.4 组合使用(推荐)
// 增量聚合 + ProcessWindowFunction
// 兼顾性能(增量聚合)和灵活性(获取窗口信息)
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(
new MyAggregateFunction(), // 增量聚合
new MyProcessWindowFunction() // 封装结果
);
// AggregateFunction
public class MyAggregateFunction implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() { return 0L; }
@Override
public Long add(Event event, Long acc) { return acc + event.getValue(); }
@Override
public Long getResult(Long acc) { return acc; }
@Override
public Long merge(Long a, Long b) { return a + b; }
}
// ProcessWindowFunction
public class MyProcessWindowFunction extends ProcessWindowFunction<Long, Result, String, TimeWindow> {
@Override
public void process(String key, Context ctx, Iterable<Long> aggResults, Collector<Result> out) {
Long sum = aggResults.iterator().next();
TimeWindow window = ctx.window();
out.collect(new Result(key, window.getStart(), window.getEnd(), sum));
}
}七、窗口触发器与驱逐器
7.1 Trigger(触发器)
graph TB
subgraph "Trigger 决定"
A[何时触发计算] --> A1["FIRE: 触发计算"]
A --> A2["FIRE_AND_PURGE: 触发并清除"]
A --> A3["CONTINUE: 继续等待"]
A --> A4["PURGE: 只清除不触发"]
end
// 自定义 Trigger
public class CountTrigger extends Trigger<Event, TimeWindow> {
private final long maxCount;
private final ReducingStateDescriptor<Long> countDesc;
public CountTrigger(long maxCount) {
this.maxCount = maxCount;
this.countDesc = new ReducingStateDescriptor<>("count", Long::sum, Long.class);
}
@Override
public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) {
ReducingState<Long> count = ctx.getPartitionedState(countDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) {
ctx.getPartitionedState(countDesc).clear();
}
}
// 使用自定义 Trigger
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.trigger(new CountTrigger(1000)) // 每1000条触发一次
.sum("value");7.2 Evictor(驱逐器)
// 驱逐器:在窗口函数执行前/后移除元素
stream
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(30))) // 只保留最近30秒
.sum("value");
// 常用 Evictor
// TimeEvictor: 只保留指定时间范围的数据
// CountEvictor: 只保留指定数量的数据
// DeltaEvictor: 基于差值的驱逐八、实战案例
8.1 案例:实时 UV 统计
/**
* 实时统计每小时 UV(独立访客数)
*/
public class HourlyUVExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserBehavior> source = env
.addSource(new UserBehaviorSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
);
// 使用 BloomFilter 去重
source
.keyBy(event -> "all") // 全局统计
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new UVAggregateFunction(), new UVResultWindowFunction())
.print();
env.execute("Hourly UV");
}
// 使用 Bloom Filter 近似去重
public static class UVAggregateFunction implements AggregateFunction<UserBehavior, Tuple2<BloomFilter<Long>, Long>, Long> {
@Override
public Tuple2<BloomFilter<Long>, Long> createAccumulator() {
return Tuple2.of(
BloomFilter.create(Funnels.longFunnel(), 1000000, 0.01),
0L
);
}
@Override
public Tuple2<BloomFilter<Long>, Long> add(UserBehavior event, Tuple2<BloomFilter<Long>, Long> acc) {
if (!acc.f0.mightContain(event.getUserId())) {
acc.f0.put(event.getUserId());
return Tuple2.of(acc.f0, acc.f1 + 1);
}
return acc;
}
@Override
public Long getResult(Tuple2<BloomFilter<Long>, Long> acc) {
return acc.f1;
}
@Override
public Tuple2<BloomFilter<Long>, Long> merge(Tuple2<BloomFilter<Long>, Long> a, Tuple2<BloomFilter<Long>, Long> b) {
a.f0.putAll(b.f0);
return Tuple2.of(a.f0, a.f1 + b.f1);
}
}
}8.2 案例:订单超时告警
/**
* 订单30分钟未支付则告警
*/
public class OrderTimeoutExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Order> orders = env.addSource(new OrderSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((order, ts) -> order.getCreateTime())
);
// 使用会话窗口 + 定时器
orders
.keyBy(Order::getOrderId)
.process(new OrderTimeoutFunction(30 * 60 * 1000)) // 30分钟
.print();
env.execute("Order Timeout Alert");
}
public static class OrderTimeoutFunction extends KeyedProcessFunction<String, Order, String> {
private final long timeout;
private ValueState<Order> orderState;
public OrderTimeoutFunction(long timeout) {
this.timeout = timeout;
}
@Override
public void open(Configuration parameters) {
orderState = getRuntimeContext().getState(
new ValueStateDescriptor<>("order", Order.class)
);
}
@Override
public void processElement(Order order, Context ctx, Collector<String> out) throws Exception {
if (order.getStatus().equals("CREATED")) {
// 订单创建,设置定时器
orderState.update(order);
ctx.timerService().registerEventTimeTimer(order.getCreateTime() + timeout);
} else if (order.getStatus().equals("PAID")) {
// 订单支付,清除状态和定时器
Order savedOrder = orderState.value();
if (savedOrder != null) {
ctx.timerService().deleteEventTimeTimer(savedOrder.getCreateTime() + timeout);
orderState.clear();
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
Order order = orderState.value();
if (order != null) {
out.collect("订单超时告警: " + order.getOrderId());
orderState.clear();
}
}
}
}九、常见问题与排查
9.1 问题速查表
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 窗口不触发 | Watermark 未推进 | 检查数据是否有时间字段 |
| 数据全部丢失 | Watermark 延迟过小 | 增加 Watermark 延迟 |
| 结果不准确 | 使用了 Processing Time | 改用 Event Time |
| 窗口结果重复 | 设置了 allowedLateness | 下游做去重 |
| 内存溢出 | 窗口积压太多数据 | 缩小窗口或使用增量聚合 |
9.2 调试技巧
// 打印 Watermark
stream.assignTimestampsAndWatermarks(...)
.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event event, Context ctx, Collector<Event> out) {
System.out.println("Event: " + event + ", Watermark: " + ctx.timerService().currentWatermark());
out.collect(event);
}
});
// 使用 Metrics 监控
env.getConfig().setLatencyTrackingInterval(1000);十、总结
10.1 核心要点
mindmap
root((时间与窗口))
时间语义
Event Time 首选
Processing Time 最简单
Watermark 是关键
窗口类型
Tumbling 不重叠
Sliding 可重叠
Session 按活动
迟到处理
allowedLateness
sideOutput
合理设置 WM 延迟
窗口函数
增量聚合高效
Process 灵活
组合使用最佳
10.2 一句话总结
Event Time + Watermark + Window = Flink 流处理的核心三角
理解了这三者的关系,你就理解了 Flink 流处理的精髓。