1. Introduction: Why Flink?
1.1 Evolution of Stream Processing Frameworks
```mermaid
timeline
    title Evolution of stream processing frameworks
    2011 : Storm (Twitter) - first-generation stream processing
    2013 : Spark Streaming - micro-batch processing
    2014 : Flink is born - true stream processing
    2016 : Kafka Streams - lightweight stream processing
    2019 : Flink 1.9 merges Blink
    2023 : Flink becomes the de facto standard
```
1.2 Why Choose Flink?
```mermaid
graph TD
    A[Why Flink?] --> B[True stream processing]
    A --> C[Unified stream and batch]
    A --> D[Exactly-once semantics]
    A --> E[Low latency, high throughput]
    A --> F[Rich APIs]
    B --> B1["Event-driven<br/>not micro-batch"]
    C --> C1["One codebase<br/>two modes"]
    D --> D1["Exactly-once<br/>state consistency"]
    E --> E1["Millisecond latency<br/>millions of events per second"]
    F --> F1["DataStream/SQL/Table<br/>CEP/ML"]
```
1.3 Flink vs. Spark Streaming
```mermaid
graph LR
    subgraph "Spark Streaming: micro-batch"
        A1[Data stream] --> A2[Split into micro-batches]
        A2 --> A3[Batch processing]
        A3 --> A4[Output]
        A5["Latency: seconds"]
    end
    subgraph "Flink: true streaming"
        B1[Data stream] --> B2[Process record by record]
        B2 --> B3[Output]
        B4["Latency: milliseconds"]
    end
```
| Dimension | Spark Streaming | Flink |
|---|---|---|
| Processing model | Micro-batch | True streaming |
| Latency | Seconds | Milliseconds |
| Windows | Based on batch intervals | Flexible event-time windows |
| State management | Requires external storage | Built-in state management |
| Exactly-once | Requires configuration | Native support |
| SQL support | Spark SQL | Flink SQL |
2. Flink Core Concepts
2.1 Concept Overview
```mermaid
graph TB
    subgraph "Flink core concepts"
        subgraph "Data abstractions"
            DS["DataStream<br/>streaming data"]
            DS2["DataSet<br/>batch data (deprecated)"]
            TABLE["Table<br/>table abstraction"]
        end
        subgraph "Notions of time"
            ET["Event Time"]
            PT["Processing Time"]
            IT["Ingestion Time"]
        end
        subgraph "State and fault tolerance"
            STATE["State"]
            CP["Checkpoint"]
            SP["Savepoint"]
        end
        subgraph "Windows"
            TW["Tumbling Window"]
            SW["Sliding Window"]
            SEW["Session Window"]
        end
    end
```
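Among the window types above, a tumbling window assigns each event to exactly one window whose start is aligned to the window size. A minimal plain-Java sketch of that assignment (a simplification with no window offset; the method name is hypothetical):

```java
public class TumblingWindowSketch {
    // Start of the tumbling window containing `timestamp`, both in milliseconds.
    // Mirrors the usual alignment formula: start = timestamp - (timestamp % size).
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long ts = 7_260_000L; // 02:01:00 expressed in ms
        System.out.println(windowStart(ts, hour)); // 7200000, i.e. the [02:00, 03:00) window
    }
}
```

Every event with a timestamp in `[windowStart, windowStart + sizeMs)` lands in the same window, which is what makes tumbling windows non-overlapping.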
2.2 DataStream: The Stream Abstraction
```mermaid
graph LR
    subgraph "DataStream transformations"
        SOURCE[Source] --> MAP[map]
        MAP --> FILTER[filter]
        FILTER --> KEYBY[keyBy]
        KEYBY --> WINDOW[window]
        WINDOW --> AGG[aggregate]
        AGG --> SINK[Sink]
    end
```
Code example:
```java
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read from Kafka
DataStream<String> source = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties)
);

// Process the data
DataStream<Order> orders = source
    .map(json -> JSON.parseObject(json, Order.class))   // parse JSON
    .filter(order -> order.getAmount() > 0)             // filter
    .keyBy(Order::getUserId)                            // group by user
    .window(TumblingEventTimeWindows.of(Time.hours(1))) // 1-hour window
    .sum("amount");                                     // aggregate

// Write to Kafka
orders.addSink(new FlinkKafkaProducer<>("output-topic", new OrderSchema(), properties));

// Execute
env.execute("Order Processing Job");
```
2.3 Operator Chains and Tasks
```mermaid
graph LR
    subgraph "Logical view"
        A[Source] --> B[Map]
        B --> C[Filter]
        C --> D[KeyBy]
        D --> E[Window]
        E --> F[Sink]
    end
```
```mermaid
graph LR
    subgraph "Physical view - operator chains"
        A1["Source → Map → Filter<br/>(chain)"] --> B1["KeyBy → Window<br/>(chain)"]
        B1 --> C1["Sink"]
    end
```
Benefits of operator chaining:
- Fewer serialization/deserialization steps
- Fewer thread switches
- Higher throughput
```java
// Disable chaining for this operator
stream.map(x -> x).disableChaining();

// Start a new chain at this operator
stream.map(x -> x).startNewChain();

// Disable operator chaining globally
env.disableOperatorChaining();
```
3. Flink Architecture in Detail
3.1 Overall Architecture
```mermaid
graph TB
    subgraph "Flink cluster architecture"
        subgraph "Client"
            CLI[CLI]
            REST[REST API]
            CODE[Application code]
        end
        subgraph "JobManager"
            DISPATCHER["Dispatcher<br/>accepts jobs"]
            RM["ResourceManager<br/>resource management"]
            JM["JobMaster<br/>job scheduling"]
        end
        subgraph "TaskManager cluster"
            TM1[TaskManager 1]
            TM2[TaskManager 2]
            TM3[TaskManager N]
        end
        subgraph "Storage"
            HA[("HA storage<br/>ZK/K8s")]
            STATE_STORE[("State storage<br/>HDFS/S3")]
        end
    end
    CLI & REST & CODE --> DISPATCHER
    DISPATCHER --> JM
    RM --> TM1 & TM2 & TM3
    JM --> TM1 & TM2 & TM3
    JM --> HA
    TM1 & TM2 & TM3 --> STATE_STORE
```
3.2 Core Component Responsibilities
| Component | Responsibility |
|---|---|
| Dispatcher | Accepts job submissions and starts a JobMaster |
| ResourceManager | Manages TaskManager resources and allocates slots |
| JobMaster | Manages execution of a single job and coordinates checkpoints |
| TaskManager | Executes tasks; manages memory and network |
3.3 Task Slots and Parallelism
```mermaid
graph TB
    subgraph "TaskManager 1 (3 slots)"
        S1["Slot 1<br/>Source→Map"]
        S2["Slot 2<br/>Source→Map"]
        S3["Slot 3<br/>KeyBy→Window"]
    end
    subgraph "TaskManager 2 (3 slots)"
        S4["Slot 4<br/>Source→Map"]
        S5["Slot 5<br/>KeyBy→Window"]
        S6["Slot 6<br/>Sink"]
    end
```
Key concepts:
```mermaid
graph LR
    A[Parallelism] --> A1["Number of parallel instances of an operator"]
    B[Slot] --> B1["Resource unit of a TaskManager"]
    C[Task] --> C1["One parallel instance"]
    D[SubTask] --> D1["Subtask within an operator chain"]
```
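With default slot sharing, a job needs roughly as many slots as the maximum parallelism across its operators. A plain-Java sketch of that rule of thumb (a simplification that ignores custom slot-sharing groups; the class and method names are hypothetical):

```java
import java.util.List;

public class SlotEstimate {
    // Required slots under default slot sharing: the max operator parallelism.
    static int requiredSlots(List<Integer> operatorParallelism) {
        return operatorParallelism.stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    public static void main(String[] args) {
        // Source/map at parallelism 4, keyBy/window at 2, sink at 1
        System.out.println(requiredSlots(List.of(4, 2, 1))); // 4
    }
}
```

This is why the slot diagram above shows each slot holding one subtask of every operator: the subtasks share a slot, so the widest operator dictates the slot count.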
Configuration:
```java
// Global parallelism
env.setParallelism(4);

// Per-operator parallelism
stream.map(x -> x).setParallelism(2);
```
```yaml
# flink-conf.yaml
parallelism.default: 4
taskmanager.numberOfTaskSlots: 4
```
3.4 Job Submission Flow
```mermaid
sequenceDiagram
    participant Client
    participant Dispatcher
    participant ResourceManager
    participant JobMaster
    participant TaskManager
    Client->>Client: Build JobGraph
    Client->>Dispatcher: Submit JobGraph
    Dispatcher->>JobMaster: Start JobMaster
    JobMaster->>JobMaster: Build ExecutionGraph
    JobMaster->>ResourceManager: Request slots
    ResourceManager->>TaskManager: Allocate slots
    TaskManager-->>JobMaster: Slots available
    JobMaster->>TaskManager: Deploy tasks
    TaskManager->>TaskManager: Run tasks
```
3.5 Graph Transformation Pipeline
```mermaid
graph LR
    A["StreamGraph<br/>logical API graph"] --> B["JobGraph<br/>optimized graph"]
    B --> C["ExecutionGraph<br/>execution graph"]
    C --> D["Physical execution<br/>task deployment"]
    A --> A1["Generated from user code"]
    B --> B1["Operator chain optimization"]
    C --> C1["Expanded by parallelism"]
    D --> D1["Actual execution"]
```
4. Deployment Modes
4.1 Deployment Mode Comparison
```mermaid
graph TB
    subgraph "Session mode"
        S1[Start cluster up front] --> S2[Submit multiple jobs]
        S2 --> S3[Jobs share resources]
    end
    subgraph "Per-Job mode"
        P1[Submit job] --> P2[Start dedicated cluster]
        P2 --> P3[Job owns its resources]
        P3 --> P4[Cluster torn down when job ends]
    end
    subgraph "Application mode"
        A1[Submit application] --> A2["main() runs inside the cluster"]
        A2 --> A3[Less load on the client]
    end
```
| Mode | Resource isolation | Startup speed | Typical use |
|---|---|---|---|
| Session | Low | Fast | Development, testing, short jobs |
| Per-Job | High | Slow | Production, long-running jobs |
| Application | High | Medium | Production, large applications |
4.2 Resource Manager Integration
```mermaid
graph TB
    subgraph "Flink deployment options"
        A[Standalone] --> A1["Self-managed cluster<br/>simple but manual"]
        B[YARN] --> B1["Hadoop ecosystem<br/>shared resources"]
        C[Kubernetes] --> C1["Cloud native<br/>elastic scaling"]
        D[Mesos] --> D1["Deprecated"]
    end
```
4.3 Kubernetes 部署
# Flink Kubernetes Operator 示例
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-example
spec:
image: flink:1.17
flinkVersion: v1_17
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.checkpoints.dir: s3://bucket/checkpoints
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "4096m"
cpu: 2
replicas: 3
job:
jarURI: s3://bucket/jobs/my-job.jar
parallelism: 6
upgradeMode: savepoint五、数据流转详解
5.1 Data Exchange Modes
```mermaid
graph TB
    subgraph "Data exchange modes"
        A[Forward] --> A1["One-to-one<br/>same partition"]
        B[Rebalance] --> B1["Round-robin<br/>load balancing"]
        C[Rescale] --> C1["Local round-robin<br/>less network traffic"]
        D[Shuffle] --> D1["Random distribution"]
        E[Broadcast] --> E1["Broadcast to all<br/>downstream partitions"]
        F[KeyBy/Hash] --> F1["Hash by key<br/>same key, same partition"]
    end
```
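KeyBy routes each record by a hash of its key, so the same key always lands in the same downstream subtask. A plain-Java sketch of that idea (a deliberately simplified formula; Flink's real routing goes through key groups, and the class name here is made up):

```java
public class KeyRouting {
    // Simplified key-to-subtask routing: hash the key, map it into [0, parallelism).
    // Note: Math.abs is fine here for illustration, though it has an edge case
    // at Integer.MIN_VALUE that production code would guard against.
    static int subtaskFor(String key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int p = 4;
        // The same key is always routed to the same subtask.
        System.out.println(subtaskFor("user-42", p) == subtaskFor("user-42", p)); // true
    }
}
```

This determinism is what makes keyed state possible: all records for a key are processed by one subtask, which can therefore keep that key's state locally.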
```java
// Forward (default, one-to-one)
stream.map(x -> x);          // forwards automatically

// Rebalance (round-robin)
stream.rebalance();

// Rescale (local round-robin)
stream.rescale();

// Shuffle (random)
stream.shuffle();

// Broadcast
stream.broadcast();

// KeyBy (hash partitioning)
stream.keyBy(x -> x.getKey());
```
5.2 Backpressure
```mermaid
sequenceDiagram
    participant Source
    participant Operator
    participant Sink
    Source->>Operator: Send data
    Operator->>Sink: Send data
    Note over Sink: Processing slows down
    Sink-->>Operator: Buffers full, backpressure signal
    Operator-->>Source: Backpressure propagates upstream
    Note over Source: Send rate drops
    Note over Sink: Processing recovers
    Sink-->>Operator: Back to normal
    Operator-->>Source: Back to normal
```
Backpressure monitoring:
```mermaid
graph LR
    A[Backpressure metrics] --> B["backPressuredTimeMsPerSecond"]
    A --> C["idleTimeMsPerSecond"]
    A --> D["busyTimeMsPerSecond"]
    B --> B1["Share of time spent backpressured<br/>higher = worse"]
```
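The propagation above can be mimicked with a bounded buffer: when the downstream buffer is full, the producer cannot hand off more data and must slow down. A minimal plain-Java sketch (a toy model using a blocking queue, not Flink's credit-based network stack):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BackpressureSketch {
    public static void main(String[] args) {
        // A bounded buffer between an "operator" and a "sink", capacity 2.
        ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);

        // Non-blocking offers succeed until the buffer fills up.
        System.out.println(buffer.offer("record-1")); // true
        System.out.println(buffer.offer("record-2")); // true

        // Buffer full: the offer fails -- this is the backpressure signal.
        System.out.println(buffer.offer("record-3")); // false

        // The sink drains one record; upstream can send again.
        buffer.poll();
        System.out.println(buffer.offer("record-3")); // true
    }
}
```

In Flink the same effect cascades stage by stage until it reaches the source, which is why a slow sink shows up as high `backPressuredTimeMsPerSecond` on upstream operators.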
6. Unified Stream and Batch Processing
6.1 The Unified View
```mermaid
graph LR
    subgraph "Traditional view"
        A1["Batch = processing bounded data"]
        A2["Streaming = processing unbounded data"]
        A3["Two systems, two codebases"]
    end
    subgraph "Flink's view"
        B1["Batch is a special case of streaming"]
        B2["A bounded stream is a batch"]
        B3["One system, one codebase"]
    end
```
6.2 Execution Modes
```java
// Streaming mode (default)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// Batch mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Automatic mode (decided by the boundedness of the sources)
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
```
6.3 Unified Example
```java
// The same code can run as a streaming job or a batch job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Order> orders = env
    .fromSource(...)   // source can be Kafka (streaming) or files (batch)
    .map(Order::parse)
    .keyBy(Order::getUserId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sum("amount");

orders.sinkTo(...);
env.execute();
```
7. Introduction to Flink SQL
7.1 SQL Architecture
```mermaid
graph TB
    subgraph "Flink SQL architecture"
        SQL[SQL/Table API] --> PARSER["Parser (Calcite)"]
        PARSER --> OPTIMIZER[Optimizer]
        OPTIMIZER --> PLANNER[Plan generation]
        PLANNER --> RUNTIME[DataStream Runtime]
    end
```
7.2 SQL Quick Start
```sql
-- Create a Kafka source table
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
```
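The WATERMARK clause above declares that events may arrive up to 5 seconds out of order: the watermark trails the highest observed event timestamp by that delay. A plain-Java sketch of the bounded-out-of-orderness formula (mirroring the idea, not Flink's `WatermarkGenerator` API; class and method names are made up):

```java
public class WatermarkSketch {
    // Watermark = max event timestamp seen so far - allowed out-of-orderness.
    static long watermark(long[] timestamps, long maxOutOfOrdernessMs) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) max = Math.max(max, ts);
        return max - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long[] seen = {1_000L, 3_000L, 2_500L}; // event timestamps in ms
        // With a 5s tolerance the watermark is 3000 - 5000 = -2000:
        // no window ending at or before that point may fire yet.
        System.out.println(watermark(seen, 5_000L)); // -2000
    }
}
```

When the watermark passes a window's end timestamp, the window can fire; events older than the watermark are considered late.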
```sql
-- Create a result table
CREATE TABLE order_stats (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    order_count BIGINT,
    PRIMARY KEY (window_start, window_end, user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/analytics',
    'table-name' = 'order_stats'
);

-- Windowed aggregation
INSERT INTO order_stats
SELECT
    TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
    TUMBLE_END(order_time, INTERVAL '1' HOUR) AS window_end,
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count
FROM orders
GROUP BY
    TUMBLE(order_time, INTERVAL '1' HOUR),
    user_id;
```
8. Quick Start Guide
8.1 Development Environment
```xml
<!-- pom.xml -->
<properties>
    <flink.version>1.17.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Table API & SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
8.2 Hello World
```java
public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read data
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // 3. Process data
        DataStream<Tuple2<String, Integer>> counts = text
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split(" ")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            .sum(1);

        // 4. Print results
        counts.print();

        // 5. Execute
        env.execute("Socket Word Count");
    }
}
```
8.3 Local Runs and Debugging
```java
// Local mode (for development and debugging)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Or: local mode with a WebUI (lets you inspect the execution graph)
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 8081);
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

// Set parallelism to 1 to make debugging easier
env.setParallelism(1);
```
9. Configuration Quick Reference
9.1 Core Configuration
```yaml
# flink-conf.yaml

# JobManager
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m

# TaskManager
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4

# Parallelism
parallelism.default: 4

# Checkpointing
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints

# High availability
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha
```
9.2 Memory Configuration in Detail
```mermaid
graph TB
    subgraph "TaskManager memory layout"
        TOTAL[Total Process Memory] --> FLINK[Flink Memory]
        TOTAL --> JVM[JVM Memory]
        FLINK --> FRAMEWORK[Framework Heap]
        FLINK --> TASK[Task Heap]
        FLINK --> MANAGED[Managed Memory]
        FLINK --> NETWORK[Network Memory]
        JVM --> META[Metaspace]
        JVM --> OVERHEAD[JVM Overhead]
    end
```
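The layout above means the components must fit inside the total process size. A quick plain-Java sanity check over hypothetical sizes (the variable names mirror the config keys below; the JVM-portion numbers are made-up assumptions):

```java
public class MemoryBudget {
    public static void main(String[] args) {
        long processMb = 8192;  // taskmanager.memory.process.size: 8g
        long flinkMb = 6144;    // taskmanager.memory.flink.size: 6g
        long metaspaceMb = 256;   // assumed metaspace size
        long jvmOverheadMb = 1024; // assumed JVM overhead

        // Flink memory plus the JVM portion must not exceed the process size.
        long jvmMb = metaspaceMb + jvmOverheadMb;
        System.out.println(flinkMb + jvmMb <= processMb); // true: 6144 + 1280 <= 8192
    }
}
```

If the pieces you configure explicitly do not leave room for metaspace and JVM overhead, the TaskManager fails at startup with a memory configuration error, so it is worth doing this arithmetic before deploying.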
```yaml
# Memory configuration example
taskmanager.memory.process.size: 8g
taskmanager.memory.flink.size: 6g
taskmanager.memory.task.heap.size: 2g
taskmanager.memory.managed.size: 2g
taskmanager.memory.network.min: 256m
taskmanager.memory.network.max: 1g
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 1g
```
10. Summary
10.1 Flink's Core Strengths
```mermaid
mindmap
  root((Flink core strengths))
    True stream processing
      Event-driven
      Millisecond latency
      Not micro-batch
    Unified stream and batch
      Unified API
      Unified runtime
      One codebase
    Stateful computation
      Built-in state management
      Incremental checkpoints
      Exactly-once
    Rich ecosystem
      SQL support
      CDC integration
      Many connectors
```
10.2 Core Concepts Recap
| Concept | Description |
|---|---|
| DataStream | Abstraction over streaming data |
| Transformation | Data transformation operation |
| State | Foundation of stateful computation |
| Time | Event time, processing time |
| Window | Splits an unbounded stream into bounded chunks |
| Checkpoint | Foundation of fault tolerance |
10.3 In One Sentence
Flink = true stream processing + unified stream/batch + stateful computation + exactly-once semantics.
If you need low-latency, high-throughput, stateful stream processing, Flink is the strongest choice available today.