
Flink from Getting Started to Giving Up (5): Real-Time Data Integration with CDC, and Saying Goodbye to Offline Sync


1. Introduction: Why Do We Need CDC?

1.1 Pain Points of Traditional Data Synchronization

graph TD
  A[Traditional offline sync] --> B[T+1 latency]
  A --> C[Time-consuming full sync]
  A --> D[Cannot capture deletes]
  A --> E[Heavy load on the business database]
  B --> B1["Today's data can only be analyzed tomorrow"]
  C --> C1["Each sync takes hours; the batch window is tight"]
  D --> D1["Only snapshots are synced; change history is lost"]
  E --> E1["Large queries impact online traffic"]

1.2 The Value of CDC

graph LR
  subgraph "CDC data flow"
    A[Business database] -->|real-time changes| B[CDC]
    B -->|incremental data| C[Data lake / warehouse]
  end
  subgraph "Benefits"
    D["Second-level latency"]
    E["Incremental sync"]
    F["Complete change history"]
    G["Low intrusiveness"]
  end

1.3 What Is CDC?

CDC = Change Data Capture

sequenceDiagram
  participant App as Application
  participant DB as Database
  participant CDC as CDC tool
  participant Target as Target system
  App->>DB: INSERT (id=1, name='Zhang San')
  DB->>DB: Write to binlog
  CDC->>DB: Read binlog
  CDC->>Target: +I (id=1, name='Zhang San')
  App->>DB: UPDATE (id=1, name='Li Si')
  DB->>DB: Write to binlog
  CDC->>DB: Read binlog
  CDC->>Target: -U (id=1, name='Zhang San') / +U (id=1, name='Li Si')
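
With JsonDebeziumDeserializationSchema (used in the examples below), each change arrives as a Debezium-style JSON envelope containing before, after, and op fields. The following is a minimal sketch, assuming the fastjson library that the later multi-table example also relies on, of how that envelope maps to the changelog kinds in the diagram above:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class ChangeKindDemo {
    // Map a Debezium JSON change record to the +I / -U / +U / -D notation above.
    public static String describe(String record) {
        JSONObject json = JSON.parseObject(record);
        String op = json.getString("op");     // "c"=insert, "u"=update, "d"=delete, "r"=snapshot read
        JSONObject before = json.getJSONObject("before");
        JSONObject after = json.getJSONObject("after");
        switch (op) {
            case "c":
            case "r":
                return "+I " + after;                     // new (or snapshot) row image
            case "u":
                return "-U " + before + "  +U " + after;  // retract old image, emit new image
            case "d":
                return "-D " + before;                    // deleted row image
            default:
                return "unknown op: " + op;
        }
    }
}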

2. Comparing CDC Solutions

2.1 CDC Implementation Approaches

graph TB
  subgraph "CDC implementation approaches"
    A[Query-based] --> A1["Periodic scans and diffing"]
    B[Log-based] --> B1["Parse database logs, e.g. MySQL binlog"]
    C[Trigger-based] --> C1["Database triggers write to a change table"]
  end
  A1 --> A2["High latency, heavy load"]
  B1 --> B2["Low latency, non-intrusive; recommended"]
  C1 --> C2["Large performance impact; not recommended"]

2.2 CDC Tool Comparison

| Tool | Type | Pros | Cons |
| --- | --- | --- | --- |
| Flink CDC | Stream processing framework | Powerful, rich ecosystem | Requires a Flink cluster |
| Debezium | CDC platform | Mature and stable, Kafka integration | Depends on Kafka |
| Canal | MySQL CDC | Open-sourced by Alibaba, popular in China | MySQL only |
| Maxwell | MySQL CDC | Lightweight and simple | Limited features |
| DataX | Batch sync | Open-sourced by Alibaba, many data sources | Not real-time |

2.3 Advantages of Flink CDC

graph TB
  A[Flink CDC advantages] --> B[Unified full + incremental]
  A --> C[Exactly-Once]
  A --> D[No Kafka required]
  A --> E[Rich set of sinks]
  B --> B1["Full snapshot first, then incremental, with a seamless switch"]
  C --> C1["Exactly-once semantics: no loss, no duplicates"]
  D --> D1["Connects directly to the database; simpler architecture"]
  E --> E1["Writes directly to data lakes / OLAP systems"]

3. Flink CDC Quick Start

3.1 Architecture Diagram

graph LR
  subgraph "Sources"
    MySQL[(MySQL)]
    PostgreSQL[(PostgreSQL)]
    MongoDB[(MongoDB)]
    Oracle[(Oracle)]
  end
  subgraph "Flink CDC"
    CDC[Flink CDC Connector]
  end
  subgraph "Targets"
    Kafka[(Kafka)]
    ES[(Elasticsearch)]
    Hudi[(Hudi)]
    Iceberg[(Iceberg)]
    Doris[(Doris)]
  end
  MySQL & PostgreSQL & MongoDB & Oracle --> CDC
  CDC --> Kafka & ES & Hudi & Iceberg & Doris

3.2 Maven Dependencies

<dependencies>
    <!-- Flink CDC MySQL Connector -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
    
    <!-- Flink CDC PostgreSQL Connector -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
    
    <!-- Flink CDC MongoDB Connector -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mongodb-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>

3.3 MySQL Configuration

-- Enable binlog (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7

-- Create the CDC user
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
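
Before launching a CDC job, it is worth verifying that these settings are actually in effect. Below is a minimal sanity-check sketch using plain JDBC; it assumes the MySQL JDBC driver is on the classpath, and the host and credentials are the same placeholders as above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // Connect with the CDC user and print the binlog-related variables.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://localhost:3306/mydb", "flink_cdc", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SHOW VARIABLES WHERE Variable_name IN "
                 + "('log_bin', 'binlog_format', 'binlog_row_image')")) {
            while (rs.next()) {
                // Expect log_bin = ON, binlog_format = ROW, binlog_row_image = FULL
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}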

3.4 Your First CDC Job

// DataStream API
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySQLCDCExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("mydb")
            .tableList("mydb.orders", "mydb.users")
            .username("flink_cdc")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())  // full snapshot + incremental
            .build();
        
        DataStreamSource<String> stream = env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "MySQL CDC Source"
        );
        
        stream.print();
        
        env.execute("MySQL CDC Job");
    }
}
-- Flink SQL
-- Create the MySQL CDC source table
CREATE TABLE mysql_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_cdc',
    'password' = 'password',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'scan.startup.mode' = 'initial'
);

-- Query the change stream
SELECT * FROM mysql_orders;
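
The same SQL can also be driven from a Java program through the Table API. A minimal sketch follows; the connection properties are the same placeholders as above, and the Flink Table planner dependencies are assumed to be on the classpath:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MySQLCDCSQLExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the MySQL CDC source table (same DDL as above)
        tableEnv.executeSql(
            "CREATE TABLE mysql_orders (" +
            "  order_id BIGINT," +
            "  user_id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  status STRING," +
            "  create_time TIMESTAMP(3)," +
            "  PRIMARY KEY (order_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost'," +
            "  'port' = '3306'," +
            "  'username' = 'flink_cdc'," +
            "  'password' = 'password'," +
            "  'database-name' = 'mydb'," +
            "  'table-name' = 'orders'," +
            "  'scan.startup.mode' = 'initial'" +
            ")");

        // Continuously print the changelog produced by the CDC source
        tableEnv.executeSql("SELECT * FROM mysql_orders").print();
    }
}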

4. Unified Full + Incremental Synchronization

4.1 Startup Modes

graph TB
  subgraph "Startup modes"
    A[initial] --> A1["Full snapshot first, then incremental binlog; most common"]
    B[earliest] --> B1["Start from the earliest binlog; no snapshot"]
    C[latest] --> C1["Start from the latest position; new changes only"]
    D[specific-offset] --> D1["Start from a given offset"]
    E[timestamp] --> E1["Start from a given timestamp"]
  end
// Startup modes (illustrative; in a real job, pick exactly one)
MySqlSource.<String>builder()
    // Full snapshot + incremental (default, recommended)
    .startupOptions(StartupOptions.initial())
    
    // Start from the earliest available binlog
    .startupOptions(StartupOptions.earliest())
    
    // Start from the latest binlog position
    .startupOptions(StartupOptions.latest())
    
    // Start from a specific binlog offset
    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L))
    
    // Start from a specific timestamp
    .startupOptions(StartupOptions.timestamp(1609430400000L))
    .build();

4.2 How the Snapshot Phase Works

sequenceDiagram
  participant Flink
  participant MySQL
  participant Checkpoint
  Note over Flink: Snapshot phase starts
  Flink->>MySQL: Record the current binlog position
  Flink->>MySQL: SELECT * FROM table (split into chunks)
  MySQL-->>Flink: Chunk 1 data
  Flink->>Checkpoint: Save chunk 1 progress
  MySQL-->>Flink: Chunk 2 data
  Flink->>Checkpoint: Save chunk 2 progress
  Note over Flink: Snapshot complete
  Note over Flink: Incremental phase starts
  Flink->>MySQL: Consume from the recorded binlog position
  loop Continuous consumption
    MySQL-->>Flink: Binlog events
    Flink->>Flink: Process changes
  end

4.3 Lock-Free Snapshots (2.0+)

graph TB
  subgraph "Traditional snapshot (with lock)"
    A1["FLUSH TABLES WITH READ LOCK"]
    A2["Impacts online traffic"]
  end
  subgraph "Flink CDC 2.0+ lock-free snapshot"
    B1["Chunk-based split reads"]
    B2["Relies on database MVCC"]
    B3["No impact on online traffic"]
  end

5. Hands-On: Syncing MySQL to Multiple Targets

5.1 Syncing to Kafka

-- Source table
CREATE TABLE mysql_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_cdc',
    'password' = 'password',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- Kafka sink table
CREATE TABLE kafka_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'orders_cdc',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Run the sync
INSERT INTO kafka_orders SELECT * FROM mysql_orders;

5.2 Syncing to Elasticsearch

-- Elasticsearch sink table
CREATE TABLE es_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'orders'
);

-- Run the sync
INSERT INTO es_orders SELECT * FROM mysql_orders;

5.3 Syncing to Hudi

-- Hudi sink table
CREATE TABLE hudi_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hudi',
    'path' = 's3://bucket/hudi/orders',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'order_id',
    'hoodie.datasource.write.precombine.field' = 'create_time',
    'write.operation' = 'upsert'
);

-- Run the sync
INSERT INTO hudi_orders SELECT * FROM mysql_orders;

5.4 Syncing to Doris/StarRocks

-- Doris sink table
CREATE TABLE doris_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = 'doris-fe:8030',
    'table.identifier' = 'mydb.orders',
    'username' = 'root',
    'password' = '',
    'sink.enable-2pc' = 'true',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true'
);

-- Run the sync
INSERT INTO doris_orders SELECT * FROM mysql_orders;

6. Multi-Table and Whole-Database Synchronization

6.1 Multi-Table Sync

// Sync multiple tables
MySqlSource<String> source = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("mydb")
    .tableList("mydb.orders", "mydb.users", "mydb.products")  // multiple tables
    .username("flink_cdc")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

// Split the stream by table name
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC");

// Use side outputs to split the stream
OutputTag<String> ordersTag = new OutputTag<String>("orders"){};
OutputTag<String> usersTag = new OutputTag<String>("users"){};

SingleOutputStreamOperator<String> mainStream = stream.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Parse the Debezium-style JSON record (JSON / JSONObject are from com.alibaba.fastjson)
        JSONObject json = JSON.parseObject(value);
        String tableName = json.getJSONObject("source").getString("table");
        
        if ("orders".equals(tableName)) {
            ctx.output(ordersTag, value);
        } else if ("users".equals(tableName)) {
            ctx.output(usersTag, value);
        }
    }
});

// Grab the split streams
DataStream<String> ordersStream = mainStream.getSideOutput(ordersTag);
DataStream<String> usersStream = mainStream.getSideOutput(usersTag);

6.2 Whole-Database Sync (YAML Job)

# Flink CDC YAML job definition (Flink CDC 3.0+)
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: flink_cdc
  password: password
  database-list: mydb
  table-list: mydb.*  # whole-database sync

sink:
  type: doris
  fenodes: doris-fe:8030
  username: root
  password: ""
  table.identifier: ${databaseName}.${tableName}  # dynamic table name

route:
  - source-table: mydb.orders
    sink-table: analytics.orders
  - source-table: mydb.users
    sink-table: analytics.users
  - source-table: mydb.products
    sink-table: analytics.products

pipeline:
  name: MySQL to Doris CDC Pipeline
  parallelism: 4

6.3 Handling Schema Changes

sequenceDiagram
  participant MySQL
  participant CDC
  participant Target
  MySQL->>MySQL: ALTER TABLE ADD COLUMN
  MySQL->>CDC: DDL event
  Note over CDC: Capture the schema change
  CDC->>Target: Propagate the schema change
  Note over Target: Column added automatically
-- Enable DDL synchronization (Flink CDC 3.0+)
CREATE TABLE mysql_orders (
    ...
) WITH (
    'connector' = 'mysql-cdc',
    ...
    'scan.incremental.snapshot.enabled' = 'true',
    'include.schema.changes' = 'true'  -- enable schema change events
);

7. Advanced Features

7.1 Parallel Snapshots

graph TB
  subgraph "Parallel snapshot reads"
    TABLE[Large table] --> C1["Chunk 1: id 1-10000"]
    TABLE --> C2["Chunk 2: id 10001-20000"]
    TABLE --> C3["Chunk 3: id 20001-30000"]
    TABLE --> C4[Chunk N]
    C1 --> P1[Parallel task 1]
    C2 --> P2[Parallel task 2]
    C3 --> P3[Parallel task 3]
    C4 --> P4[Parallel task N]
  end
MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("mydb")
    .tableList("mydb.orders")
    .username("flink_cdc")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    // Snapshot performance settings
    .splitSize(8096)  // records per snapshot split
    .fetchSize(1024)  // records fetched per round trip
    .build();

// Set the job parallelism
env.setParallelism(4);

7.2 Incremental Snapshots and Checkpoints

// Enable checkpointing
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

// The CDC source stores the binlog offset in each checkpoint,
// so after a failure the job resumes from the saved offset
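
Checkpoints are usually paired with a restart strategy so that transient failures do not terminate the job. A hedged sketch follows; the retry count and delay are illustrative, using org.apache.flink.api.common.restartstrategy.RestartStrategies and org.apache.flink.api.common.time.Time:

// Retry the job a few times with a fixed delay before failing it permanently
// (values are illustrative; tune them to your environment)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                                 // at most 3 restart attempts
    Time.of(10L, TimeUnit.SECONDS)));  // wait 10 s between attempts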

7.3 Filtering and Transforming Data

-- Sync only rows that match certain conditions
CREATE TABLE mysql_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    status STRING,
    create_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_cdc',
    'password' = 'password',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- Filter and transform before syncing
INSERT INTO target_orders
SELECT 
    order_id,
    user_id,
    amount,
    CASE status 
        WHEN '1' THEN 'CREATED'
        WHEN '2' THEN 'PAID'
        WHEN '3' THEN 'COMPLETED'
        ELSE 'UNKNOWN'
    END as status,
    create_time
FROM mysql_orders
WHERE amount > 0;  -- filter condition
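
If the job uses the DataStream API instead, roughly the same filter can be applied to the Debezium JSON stream. A minimal sketch, assuming the fastjson parsing shown in section 6.1 and the `stream` variable from the first CDC job:

// Keep only records whose new row image has amount > 0
// (deletes have no "after" image and are dropped here for simplicity)
DataStream<String> filtered = stream.filter(value -> {
    JSONObject json = JSON.parseObject(value);
    JSONObject after = json.getJSONObject("after");
    return after != null
        && after.getBigDecimal("amount") != null
        && after.getBigDecimal("amount").compareTo(java.math.BigDecimal.ZERO) > 0;
});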

8. CDC for Other Databases

8.1 PostgreSQL CDC

-- PostgreSQL configuration
-- postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10

-- Create a replication slot
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput');

-- Flink SQL
CREATE TABLE pg_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'password',
    'database-name' = 'mydb',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_cdc_slot',
    'decoding.plugin.name' = 'pgoutput'
);

8.2 MongoDB CDC

CREATE TABLE mongo_orders (
    _id STRING,
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'admin',
    'password' = 'password',
    'database' = 'mydb',
    'collection' = 'orders'
);

8.3 Oracle CDC

CREATE TABLE oracle_orders (
    ORDER_ID NUMBER,
    USER_ID NUMBER,
    AMOUNT NUMBER(10, 2),
    PRIMARY KEY (ORDER_ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'system',
    'password' = 'oracle',
    'database-name' = 'ORCLCDB',
    'schema-name' = 'MYSCHEMA',
    'table-name' = 'ORDERS',
    'debezium.log.mining.strategy' = 'online_catalog'
);

9. Monitoring and Operations

9.1 Key Monitoring Metrics

graph TB
  subgraph "CDC monitoring metrics"
    A[Binlog lag] --> A1["currentBinlogPosition vs. the latest MySQL position"]
    B[Throughput] --> B1["numRecordsInPerSecond"]
    C[Snapshot progress] --> C1["sourceSnapshotCompleted"]
    D[Error count] --> D1["numRecordsErroredPerSecond"]
  end

9.2 Troubleshooting Common Issues

flowchart TD
  A[CDC issue] --> B{Type of problem}
  B -->|Connection failure| C["Check network / privileges and the binlog configuration"]
  B -->|Data lag| D["Increase parallelism; check for downstream bottlenecks"]
  B -->|Data loss| E["Check checkpoints and binlog retention"]
  B -->|OOM| F["Reduce the number of tables; increase memory"]

9.3 Binlog Retention Configuration

-- Binlog retention in days
SET GLOBAL expire_logs_days = 7;

-- Or, on MySQL 8.0+
SET GLOBAL binlog_expire_logs_seconds = 604800;

-- Check binlog status
SHOW BINARY LOGS;
SHOW MASTER STATUS;

10. Best Practices

10.1 Production Architecture

graph TB
  subgraph "Sources"
    M1[(MySQL primary)]
    M2[(MySQL replica)]
  end
  subgraph "Flink CDC cluster"
    F1[Flink CDC Job 1]
    F2[Flink CDC Job 2]
  end
  subgraph "Message queue"
    K[(Kafka)]
  end
  subgraph "Data lake / warehouse"
    H[(Hudi)]
    D[(Doris)]
  end
  M2 --> F1 --> K
  M2 --> F2 --> H & D
  NOTE["Read from a replica to avoid impacting the primary"]

10.2 Configuration Recommendations

// Recommended production configuration
MySqlSource.<String>builder()
    .hostname("mysql-slave")  // read from a replica
    .port(3306)
    .databaseList("mydb")
    .tableList("mydb.orders")
    .username("flink_cdc")
    .password("password")
    .serverTimeZone("Asia/Shanghai")  // server time zone
    .deserializer(new JsonDebeziumDeserializationSchema())
    .startupOptions(StartupOptions.initial())
    // Performance settings
    .splitSize(8096)
    .fetchSize(1024)
    .connectTimeout(Duration.ofSeconds(30))
    // Heartbeat settings
    .heartbeatInterval(Duration.ofSeconds(30))
    .build();

10.3 Things to Watch Out For

mindmap
  root((CDC best practices))
    Data source
      Read from a replica
      Configure binlog retention sensibly
      Grant only the minimum privileges
    Flink configuration
      Enable checkpointing
      Set parallelism appropriately
      Allocate enough memory
    Monitoring
      Monitor binlog lag
      Monitor throughput
      Set up alerts
    Fault tolerance
      Configure failure retries
      Keep checkpoints
      Verify data regularly

11. Summary

11.1 Key Takeaways

mindmap
  root((Flink CDC))
    Core capabilities
      Unified full + incremental
      Lock-free snapshots
      Exactly-Once
    Supported sources
      MySQL
      PostgreSQL
      MongoDB
      Oracle
    Targets
      Kafka
      Data lakes
      OLAP
    Advanced features
      Parallel snapshots
      Schema changes
      Whole-database sync

11.2 One-Sentence Summary

Flink CDC = the best choice for real-time data synchronization

Unified full + incremental sync, lock-free snapshots, and exactly-once semantics keep your data flowing in real time.

