Flink from Getting Started to Giving Up ①: The Secret of Unified Stream and Batch Processing

1. Introduction: Why Flink?

1.1 The Evolution of Stream Processing Frameworks

timeline
    title Evolution of stream processing frameworks
    2011 : Storm (Twitter), first generation of stream processing
    2013 : Spark Streaming, micro-batching
    2014 : Flink is born, true stream processing
    2016 : Kafka Streams, lightweight stream processing
    2019 : Flink 1.9 merges Blink
    2023 : Flink becomes the de facto standard

1.2 Why Choose Flink?

graph TD
    A[Why Flink?] --> B[True stream processing]
    A --> C[Unified stream and batch]
    A --> D[Exactly-once semantics]
    A --> E[Low latency, high throughput]
    A --> F[Rich APIs]
    B --> B1["Event-driven, not micro-batch"]
    C --> C1["One codebase, two modes"]
    D --> D1["Exactly-once state consistency"]
    E --> E1["Millisecond latency, millions of records per second"]
    F --> F1["DataStream / SQL / Table, CEP / ML"]

1.3 Flink vs Spark Streaming

graph LR
    subgraph "Spark Streaming: micro-batching"
        A1[Data stream] --> A2[Split into micro-batches]
        A2 --> A3[Process as batches]
        A3 --> A4[Output]
        A5["Latency: seconds"]
    end

graph LR
    subgraph "Flink: true stream processing"
        B1[Data stream] --> B2[Process record by record]
        B2 --> B3[Output]
        B4["Latency: milliseconds"]
    end

| Dimension | Spark Streaming | Flink |
| --- | --- | --- |
| Processing model | Micro-batch | True streaming |
| Latency | Seconds | Milliseconds |
| Windows | Based on batch intervals | Flexible event-time windows |
| State management | Needs external storage | Built-in state management |
| Exactly-once | Requires configuration | Native support |
| SQL support | Spark SQL | Flink SQL |

2. Flink Core Concepts

2.1 Concept Overview

graph TB
    subgraph "Flink core concepts"
        subgraph "Data abstractions"
            DS["DataStream - stream data"]
            DS2["DataSet - batch data (deprecated)"]
            TABLE["Table - table abstraction"]
        end
        subgraph "Notions of time"
            ET[Event Time]
            PT[Processing Time]
            IT[Ingestion Time]
        end
        subgraph "State and fault tolerance"
            STATE[State]
            CP[Checkpoint]
            SP[Savepoint]
        end
        subgraph "Windows"
            TW[Tumbling Window]
            SW[Sliding Window]
            SEW[Session Window]
        end
    end
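
Most of these concepts only click once you see them in code. Here is a minimal sketch of managed state (the class and field names are illustrative, not from this article): a KeyedProcessFunction keeps a per-key ValueState, which Flink snapshots on every checkpoint and restores after a failure.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Per-key running count kept in Flink-managed state. The state participates
// in checkpoints automatically and is restored on recovery.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {
    private transient ValueState<Long> count;  // one value per key

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        out.collect(next);  // emit the updated count for this key
    }
}

It would be wired in with stream.keyBy(...).process(new CountPerKey()).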

2.2 DataStream: The Stream Abstraction

graph LR
    subgraph "DataStream transformations"
        SOURCE[Source] --> MAP[map]
        MAP --> FILTER[filter]
        FILTER --> KEYBY[keyBy]
        KEYBY --> WINDOW[window]
        WINDOW --> AGG[aggregate]
        AGG --> SINK[Sink]
    end

Code example:

// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read from Kafka (FlinkKafkaConsumer is deprecated; KafkaSource is its successor)
DataStream<String> source = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties)
);

// Process the data
DataStream<Order> orders = source
    .map(json -> JSON.parseObject(json, Order.class))  // parse JSON
    .filter(order -> order.getAmount() > 0)            // drop invalid orders
    // Event-time windows need watermarks, otherwise the window below never fires.
    // getEventTime() is an assumed accessor on Order.
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, ts) -> order.getEventTime()))
    .keyBy(Order::getUserId)                            // partition by user
    .window(TumblingEventTimeWindows.of(Time.hours(1))) // 1-hour tumbling window
    .sum("amount");                                     // sum the amounts

// Write the results back to Kafka
orders.addSink(new FlinkKafkaProducer<>("output-topic", new OrderSchema(), properties));

// Execute
env.execute("Order Processing Job");

2.3 Operator Chains and Tasks

graph LR
    subgraph "Logical view"
        A[Source] --> B[Map]
        B --> C[Filter]
        C --> D[KeyBy]
        D --> E[Window]
        E --> F[Sink]
    end

graph LR
    subgraph "Physical view: operator chains"
        A1["Source → Map → Filter (chain)"] --> B1["KeyBy → Window (chain)"]
        B1 --> C1[Sink]
    end

What operator chaining buys you:

  • less serialization/deserialization
  • fewer thread handoffs
  • higher throughput

// Disable chaining for this operator
stream.map(x -> x).disableChaining();

// Start a new chain at this operator
stream.map(x -> x).startNewChain();

// Disable operator chaining globally
env.disableOperatorChaining();

3. Flink Architecture in Depth

3.1 Overall Architecture

graph TB
    subgraph "Flink cluster architecture"
        subgraph "Client"
            CLI[Command line]
            REST[REST API]
            CODE[Application code]
        end
        subgraph "JobManager"
            DISPATCHER["Dispatcher - accepts jobs"]
            RM["ResourceManager - manages resources"]
            JM["JobMaster - schedules the job"]
        end
        subgraph "TaskManager cluster"
            TM1[TaskManager 1]
            TM2[TaskManager 2]
            TM3[TaskManager N]
        end
        subgraph "Storage"
            HA[("HA storage - ZK/K8s")]
            STATE_STORE[("State storage - HDFS/S3")]
        end
    end
    CLI & REST & CODE --> DISPATCHER
    DISPATCHER --> JM
    RM --> TM1 & TM2 & TM3
    JM --> TM1 & TM2 & TM3
    JM --> HA
    TM1 & TM2 & TM3 --> STATE_STORE

3.2 Core Component Responsibilities

| Component | Responsibility |
| --- | --- |
| Dispatcher | Accepts job submissions and starts a JobMaster |
| ResourceManager | Manages TaskManager resources and allocates slots |
| JobMaster | Manages the execution of one job and coordinates checkpoints |
| TaskManager | Runs the actual tasks and manages memory and network |

3.3 Task Slots and Parallelism

graph TB
    subgraph "TaskManager 1 (3 slots)"
        S1["Slot 1 - Source→Map"]
        S2["Slot 2 - Source→Map"]
        S3["Slot 3 - KeyBy→Window"]
    end
    subgraph "TaskManager 2 (3 slots)"
        S4["Slot 4 - Source→Map"]
        S5["Slot 5 - KeyBy→Window"]
        S6["Slot 6 - Sink"]
    end

Key concepts:

graph LR
    A[Parallelism] --> A1["Number of parallel instances of an operator"]
    B[Slot] --> B1["Unit of resources in a TaskManager"]
    C[Task] --> C1["One parallel instance"]
    D[SubTask] --> D1["A subtask inside an operator chain"]

How to configure:

// Global default parallelism
env.setParallelism(4);

// Per-operator parallelism
stream.map(x -> x).setParallelism(2);

// flink-conf.yaml
parallelism.default: 4
taskmanager.numberOfTaskSlots: 4
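
One more slot-related knob: by default, subtasks of different operators from the same job may share a slot, which is why a single slot can hold a whole pipeline. If one stage is much heavier than the rest, you can isolate it in its own slot sharing group. A minimal sketch, where HeavyMapFunction is a hypothetical stand-in:

// Operators default to the "default" slot sharing group; giving this stage
// its own group forces it into slots separate from the rest of the pipeline.
stream
    .map(new HeavyMapFunction())   // HeavyMapFunction is illustrative
    .slotSharingGroup("heavy")
    .print();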

3.4 Job Submission Flow

sequenceDiagram
    participant Client
    participant Dispatcher
    participant ResourceManager
    participant JobMaster
    participant TaskManager
    Client->>Client: Build the JobGraph
    Client->>Dispatcher: Submit the JobGraph
    Dispatcher->>JobMaster: Start a JobMaster
    JobMaster->>JobMaster: Build the ExecutionGraph
    JobMaster->>ResourceManager: Request slots
    ResourceManager->>TaskManager: Allocate slots
    TaskManager-->>JobMaster: Slots available
    JobMaster->>TaskManager: Deploy tasks
    TaskManager->>TaskManager: Run tasks
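
The same flow can be driven from code by pointing an execution environment at a remote JobManager. A minimal sketch, with host, port, and jar path as placeholders:

// Submit against a remote cluster; the jar is shipped with the submission.
// "jobmanager-host", 6123 and the path below are placeholders for your setup.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 6123, "/path/to/my-job.jar");

env.fromElements(1, 2, 3).map(i -> i * 2).print();
env.execute("Remote Submission Demo");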

3.5 Graph Translation Process

graph LR
    A["StreamGraph - logical API graph"] --> B["JobGraph - optimized graph"]
    B --> C["ExecutionGraph - execution graph"]
    C --> D["Physical execution - task deployment"]
    A --> A1["Built from user code"]
    B --> B1["Operator chaining applied"]
    C --> C1["Expanded per parallelism"]
    D --> D1["Actually runs"]
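
You can inspect the first stage of this pipeline yourself: getExecutionPlan() dumps the StreamGraph as JSON, which the Flink plan visualizer can render. A small sketch:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b").map(String::toUpperCase).print();

// Print the StreamGraph as JSON before submitting anything
System.out.println(env.getExecutionPlan());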

4. Deployment Modes

4.1 Deployment Mode Comparison

graph TB
    subgraph "Session mode"
        S1[Start a cluster up front] --> S2[Submit multiple jobs]
        S2 --> S3[Jobs share resources]
    end
    subgraph "Per-Job mode"
        P1[Submit a job] --> P2[Start a dedicated cluster]
        P2 --> P3[The job owns the resources]
        P3 --> P4[Cluster torn down when the job ends]
    end
    subgraph "Application mode"
        A1[Submit an application] --> A2["main() runs inside the cluster"]
        A2 --> A3[Less load on the client]
    end

| Mode | Resource isolation | Startup speed | Typical use |
| --- | --- | --- | --- |
| Session | Weak (jobs share one cluster) | Fast | Development, testing, short jobs |
| Per-Job | Strong (dedicated cluster) | Slower | Production, long-running jobs |
| Application | Strong (dedicated cluster) | Slower | Production, large jobs |

4.2 Resource Manager Integration

graph TB
    subgraph "Flink deployment options"
        A[Standalone] --> A1["Self-contained cluster - simple, but self-managed"]
        B[YARN] --> B1["Hadoop ecosystem - shared resources"]
        C[Kubernetes] --> C1["Cloud native - elastic scaling"]
        D[Mesos] --> D1["Deprecated"]
    end

4.3 Kubernetes Deployment

# Example FlinkDeployment for the Flink Kubernetes Operator
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3://bucket/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 3
  job:
    jarURI: s3://bucket/jobs/my-job.jar
    parallelism: 6
    upgradeMode: savepoint

5. Data Flow in Detail

5.1 Data Exchange Patterns

graph TB
    subgraph "Data exchange patterns"
        A[Forward] --> A1["One-to-one, same partition"]
        B[Rebalance] --> B1["Round-robin, load balancing"]
        C[Rescale] --> C1["Local round-robin, less network traffic"]
        D[Shuffle] --> D1["Random distribution"]
        E[Broadcast] --> E1["Broadcast to every downstream partition"]
        F[KeyBy/Hash] --> F1["Hash by key: same key, same partition"]
    end

// Forward (default, one-to-one)
stream.map(x -> x)  // forwards automatically

// Rebalance (round-robin)
stream.rebalance()

// Rescale (local round-robin)
stream.rescale()

// Shuffle (random)
stream.shuffle()

// Broadcast
stream.broadcast()

// KeyBy (hash partitioning)
stream.keyBy(x -> x.getKey())

5.2 The Backpressure Mechanism

sequenceDiagram
    participant Source
    participant Operator
    participant Sink
    Source->>Operator: Send data
    Operator->>Sink: Send data
    Note over Sink: Processing slows down
    Sink-->>Operator: Buffers full, backpressure signal
    Operator-->>Source: Backpressure propagates upstream
    Note over Source: Send rate drops
    Note over Sink: Processing recovers
    Sink-->>Operator: Back to normal
    Operator-->>Source: Back to normal

Monitoring backpressure:

graph LR
    A[Backpressure metrics] --> B["backPressuredTimeMsPerSecond"]
    A --> C["idleTimeMsPerSecond"]
    A --> D["busyTimeMsPerSecond"]
    B --> B1["Fraction of time spent backpressured: higher is worse"]
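
These metrics are shown per task in the Web UI. A related knob on the sending side is the network buffer timeout, which trades latency against throughput. A hedged sketch (the value is illustrative, and this tunes buffering rather than fixing backpressure itself):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Network buffers are flushed when full or when this timeout expires.
// Larger values batch more records per buffer (throughput); smaller values
// cut latency. -1 means flush only when full; avoid 0 in practice.
env.setBufferTimeout(100);  // milliseconds, illustrative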

6. Unified Stream and Batch Processing

6.1 The Philosophy

graph LR
    subgraph "The traditional view"
        A1["Batch = processing bounded data"]
        A2["Streaming = processing unbounded data"]
        A3["Two systems, two codebases"]
    end

graph LR
    subgraph "Flink's view"
        B1["Batch is a special case of streaming"]
        B2["A bounded stream is a batch"]
        B3["One system, one codebase"]
    end

6.2 Execution Modes

// Streaming mode (default)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// Batch mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Automatic (decided by whether the sources are bounded)
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

6.3 A Unified Example

// The same code runs as a streaming job or a batch job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Order> orders = env
    .fromSource(...)  // e.g. a Kafka source (streaming) or a file source (batch)
    .map(Order::parse)
    .keyBy(Order::getUserId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sum("amount");

orders.sinkTo(...);

env.execute();

7. Flink SQL at a Glance

7.1 SQL Architecture

graph TB
    subgraph "Flink SQL architecture"
        SQL["SQL / Table API"] --> PARSER["Parser (Apache Calcite)"]
        PARSER --> OPTIMIZER[Optimizer]
        OPTIMIZER --> PLANNER[Physical plan generation]
        PLANNER --> RUNTIME[DataStream runtime]
    end
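
To drive SQL from a Java program, wrap the streaming environment in a StreamTableEnvironment. A minimal sketch; the DDL and query bodies are elided here:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// DDL and DML are plain strings; executeSql submits them
tEnv.executeSql("CREATE TABLE orders (...) WITH ('connector' = 'kafka', ...)");
tEnv.executeSql("INSERT INTO order_stats SELECT ... FROM orders").await();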

7.2 SQL Quick Start

-- Create the Kafka source table
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Create the result (sink) table
CREATE TABLE order_stats (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    order_count BIGINT,
    PRIMARY KEY (window_start, window_end, user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/analytics',
    'table-name' = 'order_stats'
);

-- Windowed aggregation
INSERT INTO order_stats
SELECT 
    TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
    user_id,
    SUM(amount) as total_amount,
    COUNT(*) as order_count
FROM orders
GROUP BY 
    TUMBLE(order_time, INTERVAL '1' HOUR),
    user_id;

8. Getting Started

8.1 Development Environment Setup

<!-- pom.xml -->
<properties>
    <flink.version>1.17.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <!-- Flink Clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <!-- Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <!-- Table API & SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

8.2 Hello World

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. Read the input
        DataStream<String> text = env.socketTextStream("localhost", 9999);
        
        // 3. Process the data
        DataStream<Tuple2<String, Integer>> counts = text
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split(" ")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            .sum(1);
        
        // 4. Print the results
        counts.print();
        
        // 5. Execute
        env.execute("Socket Word Count");
    }
}

8.3 Running and Debugging Locally

// Local mode (development and debugging)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Local mode with the Web UI (lets you inspect the execution graph)
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

// A parallelism of 1 makes debugging easier
env.setParallelism(1);

9. Configuration Quick Reference

9.1 Core Configuration

# flink-conf.yaml

# JobManager
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m

# TaskManager
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4

# Parallelism
parallelism.default: 4

# Checkpointing
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints

# High availability
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha
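
The checkpoint settings can equally be set per job in code. A sketch mirroring the YAML values above (EmbeddedRocksDBStateBackend needs the flink-statebackend-rocksdb dependency):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 s with exactly-once guarantees, mirroring the YAML above
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
env.setStateBackend(new EmbeddedRocksDBStateBackend());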

9.2 Memory Configuration in Detail

graph TB
    subgraph "TaskManager memory layout"
        TOTAL[Total Process Memory] --> FLINK[Total Flink Memory]
        TOTAL --> JVM[JVM-specific Memory]
        FLINK --> FRAMEWORK[Framework Heap]
        FLINK --> TASK[Task Heap]
        FLINK --> MANAGED[Managed Memory]
        FLINK --> NETWORK[Network Memory]
        JVM --> META[Metaspace]
        JVM --> OVERHEAD[JVM Overhead]
    end

# Memory configuration example
taskmanager.memory.process.size: 8g
taskmanager.memory.flink.size: 6g
taskmanager.memory.task.heap.size: 2g
taskmanager.memory.managed.size: 2g
taskmanager.memory.network.min: 256m
taskmanager.memory.network.max: 1g
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 1g

10. Summary

10.1 Flink's Core Strengths

mindmap
  root((Flink core strengths))
    True stream processing
      Event-driven
      Millisecond latency
      Not micro-batch
    Unified stream and batch
      Unified API
      Unified runtime
      One codebase
    Stateful computation
      Built-in state management
      Incremental checkpoints
      Exactly-once
    Rich ecosystem
      SQL support
      CDC integration
      Many connectors

10.2 Core Concepts Recap

| Concept | Description |
| --- | --- |
| DataStream | Abstraction over streaming data |
| Transformation | A data transformation operation |
| State | Foundation of stateful computation |
| Time | Event time and processing time |
| Window | Slices an unbounded stream into bounded chunks |
| Checkpoint | Foundation of fault tolerance |

10.3 One-Sentence Summary

Flink = true stream processing + unified stream and batch + stateful computation + exactly-once semantics

If you need low-latency, high-throughput, stateful stream processing, Flink is currently the best choice.

