1. Introduction: Why CDC?
1.1 Pain Points of Traditional Data Synchronization
graph TD
A[Traditional offline sync] --> B[T+1 latency]
A --> C[Slow full syncs]
A --> D[Deletes are not captured]
A --> E[Heavy load on the business DB]
B --> B1["Today's data can only be analyzed tomorrow"]
C --> C1["Each sync takes hours; tight batch windows"]
D --> D1["Only snapshots are synced; change history is lost"]
E --> E1["Large queries impact online workloads"]
1.2 The Value of CDC
graph LR
subgraph "CDC data flow"
A[Business database] -->|Real-time changes| B[CDC]
B -->|Incremental data| C[Data lake / warehouse]
end
subgraph "Benefits"
D["Second-level latency"]
E["Incremental sync"]
F["Complete change history"]
G["Low intrusiveness"]
end
1.3 What Is CDC?
CDC = Change Data Capture: capturing row-level inserts, updates, and deletes from a database as they happen and streaming them to downstream systems.
sequenceDiagram
participant App as Application
participant DB as Database
participant CDC as CDC tool
participant Target as Target system
App->>DB: INSERT (id=1, name='Zhang San')
DB->>DB: Write to binlog
CDC->>DB: Read binlog
CDC->>Target: +I (id=1, name='Zhang San')
App->>DB: UPDATE (id=1, name='Li Si')
DB->>DB: Write to binlog
CDC->>DB: Read binlog
CDC->>Target: -U (id=1, name='Zhang San')
CDC->>Target: +U (id=1, name='Li Si')
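The +I / -U / +U markers above are changelog row kinds: an update is represented as a retraction of the old row (-U) followed by the new row (+U). With the `JsonDebeziumDeserializationSchema` used later in this post, each change arrives as a Debezium-style JSON envelope with `before`, `after`, and `op` fields. The sketch below is a minimal, hypothetical helper (using fastjson, as in the multi-table example later) that renders the `op` code in this notation; it is illustrative and not part of the connector API.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Illustrative only: render a Debezium-style change event in +I / -U / +U / -D notation.
public class ChangelogLabel {
    public static String describe(String changeEventJson) {
        JSONObject event = JSON.parseObject(changeEventJson);
        String op = event.getString("op"); // c = insert, u = update, d = delete, r = snapshot read
        JSONObject before = event.getJSONObject("before");
        JSONObject after = event.getJSONObject("after");
        switch (op) {
            case "c":
            case "r":
                return "+I " + after;                    // new row (insert or snapshot read)
            case "u":
                return "-U " + before + "  +U " + after; // update = retract old row, emit new row
            case "d":
                return "-D " + before;                   // deleted row
            default:
                return "? " + event;
        }
    }
}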
2. Comparing CDC Approaches
2.1 CDC Implementation Approaches
graph TB
subgraph "CDC implementation approaches"
A[Query-based] --> A1["Periodic scans that diff the table"]
B[Log-based] --> B1["Parses the database log, e.g. MySQL binlog"]
C[Trigger-based] --> C1["Database triggers write to a change table"]
end
A1 --> A2["High latency, heavy load"]
B1 --> B2["Low latency, non-intrusive (recommended)"]
C1 --> C2["Significant performance impact (not recommended)"]
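To make the query-based approach concrete, here is a minimal, hypothetical polling loop over JDBC (the `orders` table and its `updated_at` column are assumptions for illustration). It exposes the two weaknesses named above: latency can never be lower than the polling interval, and deleted rows simply stop matching the query, so deletes are never observed.
import java.sql.*;
import java.time.Instant;

// Hypothetical query-based CDC: poll rows changed since the last watermark.
// Latency >= poll interval, and DELETEs are invisible to this query.
public class PollingSync {
    public static void main(String[] args) throws Exception {
        Timestamp watermark = Timestamp.from(Instant.EPOCH);
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "flink_cdc", "password")) {
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT order_id, status, updated_at FROM orders WHERE updated_at > ?")) {
                    ps.setTimestamp(1, watermark);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            watermark = rs.getTimestamp("updated_at");
                            System.out.println("changed: " + rs.getLong("order_id"));
                        }
                    }
                }
                Thread.sleep(60_000); // poll once a minute -> at least a minute of latency
            }
        }
    }
}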
2.2 CDC Tool Comparison
| Tool | Type | Pros | Cons |
|---|---|---|---|
| Flink CDC | Stream processing framework | Powerful, rich ecosystem | Requires a Flink cluster |
| Debezium | CDC platform | Mature and stable, tight Kafka integration | Depends on Kafka |
| Canal | MySQL CDC | Open-sourced by Alibaba, popular in China | MySQL only |
| Maxwell | MySQL CDC | Lightweight and simple | Limited feature set |
| DataX | Batch sync | Open-sourced by Alibaba, supports many data sources | Not real-time |
2.3 Advantages of Flink CDC
graph TB
A[Flink CDC advantages] --> B[Unified full + incremental sync]
A --> C[Exactly-Once]
A --> D[No Kafka required]
A --> E[Rich downstream support]
B --> B1["Full snapshot first, then incremental, switching seamlessly"]
C --> C1["Exactly-once semantics: no loss, no duplicates"]
D --> D1["Connects to the database directly, simplifying the architecture"]
E --> E1["Writes straight into data lakes / OLAP systems"]
3. Flink CDC Quick Start
3.1 Architecture
graph LR
subgraph "数据源"
MySQL[(MySQL)]
PostgreSQL[(PostgreSQL)]
MongoDB[(MongoDB)]
Oracle[(Oracle)]
end
subgraph "Flink CDC"
CDC[Flink CDC Connector]
end
subgraph "目标"
Kafka[(Kafka)]
ES[(Elasticsearch)]
Hudi[(Hudi)]
Iceberg[(Iceberg)]
Doris[(Doris)]
end
MySQL & PostgreSQL & MongoDB & Oracle --> CDC
CDC --> Kafka & ES & Hudi & Iceberg & Doris
3.2 Maven Dependencies
<dependencies>
<!-- Flink CDC MySQL Connector -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
<!-- Flink CDC PostgreSQL Connector -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>2.4.0</version>
</dependency>
<!-- Flink CDC MongoDB Connector -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
3.3 MySQL Configuration
# Enable binlog (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
-- Create a CDC user
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
3.4 Your First CDC Job
// DataStream API
public class MySQLCDCExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("mydb")
.tableList("mydb.orders", "mydb.users")
.username("flink_cdc")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial()) // full snapshot + incremental
.build();
DataStreamSource<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"MySQL CDC Source"
);
stream.print();
env.execute("MySQL CDC Job");
}
}
-- Flink SQL
-- Create the MySQL CDC source table
CREATE TABLE mysql_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'orders',
'scan.startup.mode' = 'initial'
);
-- Query the change stream
SELECT * FROM mysql_orders;
4. Unified Full and Incremental Synchronization
4.1 Startup Modes
graph TB
subgraph "Startup modes"
A[initial] --> A1["Full snapshot first, then binlog (most common)"]
B[earliest] --> B1["Start from the earliest binlog, no snapshot"]
C[latest] --> C1["Start from the latest offset, new changes only"]
D[specific-offset] --> D1["Start from a given binlog offset"]
E[timestamp] --> E1["Start from a given timestamp"]
end
// Startup modes (pick exactly one per source)
MySqlSource.<String>builder()
// full snapshot + incremental (default, recommended)
.startupOptions(StartupOptions.initial())
// from the earliest available binlog, no snapshot
.startupOptions(StartupOptions.earliest())
// from the latest offset, new changes only
.startupOptions(StartupOptions.latest())
// from a specific binlog offset
.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L))
// from a specific timestamp
.startupOptions(StartupOptions.timestamp(1609430400000L))
.build();
4.2 How the Snapshot Phase Works
sequenceDiagram
participant Flink
participant MySQL
participant Checkpoint
Note over Flink: Snapshot phase starts
Flink->>MySQL: Record the current binlog position
Flink->>MySQL: SELECT * FROM table (split into chunks)
MySQL-->>Flink: Chunk 1 data
Flink->>Checkpoint: Save chunk 1 progress
MySQL-->>Flink: Chunk 2 data
Flink->>Checkpoint: Save chunk 2 progress
Note over Flink: Snapshot complete
Note over Flink: Incremental phase starts
Flink->>MySQL: Consume from the recorded binlog position
loop Continuous consumption
MySQL-->>Flink: Binlog events
Flink->>Flink: Process changes
end
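The key idea in the snapshot phase is that the table is read in primary-key chunks rather than in one long-running query, so each chunk can be checkpointed and, as shown in 7.1, read in parallel. The sketch below is a simplified, hypothetical illustration of chunk splitting over JDBC against the `orders` table; the real connector computes split boundaries itself and also reconciles each chunk with the surrounding binlog window, so treat this as intuition only.
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of snapshot "chunking": split a table into primary-key
// ranges so each range can be read (and checkpointed) independently.
public class ChunkSplitter {
    static final int CHUNK_SIZE = 8096; // mirrors splitSize in the connector config

    public static List<long[]> splitByPrimaryKey(Connection conn) throws SQLException {
        List<long[]> chunks = new ArrayList<>();
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT MIN(order_id), MAX(order_id) FROM orders")) {
            rs.next();
            long min = rs.getLong(1);
            long max = rs.getLong(2);
            for (long start = min; start <= max; start += CHUNK_SIZE) {
                chunks.add(new long[]{start, Math.min(start + CHUNK_SIZE - 1, max)});
            }
        }
        return chunks; // each [start, end] range becomes one snapshot split
    }
}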
4.3 Lock-Free Snapshots (2.0+)
graph TB
subgraph "Legacy snapshot (global read lock)"
A1["FLUSH TABLES WITH READ LOCK"]
A2["Impacts online traffic"]
end
subgraph "Flink CDC 2.0+ lock-free snapshot"
B1["Chunk-based reads"]
B2["Relies on database MVCC"]
B3["No impact on online traffic"]
end
5. Hands-On: Syncing MySQL to Multiple Targets
5.1 Sync to Kafka
-- Source table
CREATE TABLE mysql_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'orders'
);
-- Kafka sink table
CREATE TABLE kafka_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders_cdc',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json' -- upsert-kafka requires an insert-only value format
);
-- Sync
INSERT INTO kafka_orders SELECT * FROM mysql_orders;
5.2 Sync to Elasticsearch
-- Elasticsearch sink table
CREATE TABLE es_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'orders'
);
-- Sync
INSERT INTO es_orders SELECT * FROM mysql_orders;
5.3 Sync to Hudi
-- Hudi sink table
CREATE TABLE hudi_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3://bucket/hudi/orders',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'order_id',
'hoodie.datasource.write.precombine.field' = 'create_time',
'write.operation' = 'upsert'
);
-- Sync
INSERT INTO hudi_orders SELECT * FROM mysql_orders;
5.4 Sync to Doris/StarRocks
-- Doris sink table
CREATE TABLE doris_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = 'doris-fe:8030',
'table.identifier' = 'mydb.orders',
'username' = 'root',
'password' = '',
'sink.enable-2pc' = 'true',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
);
-- Sync
INSERT INTO doris_orders SELECT * FROM mysql_orders;
6. Multi-Table and Whole-Database Synchronization
6.1 Multi-Table Sync
// Sync multiple tables
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("mydb")
.tableList("mydb.orders", "mydb.users", "mydb.products") // multiple tables
.username("flink_cdc")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// Split the stream by table name
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC");
// Split with side outputs
OutputTag<String> ordersTag = new OutputTag<String>("orders"){};
OutputTag<String> usersTag = new OutputTag<String>("users"){};
SingleOutputStreamOperator<String> mainStream = stream.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
JSONObject json = JSON.parseObject(value);
String tableName = json.getJSONObject("source").getString("table");
if ("orders".equals(tableName)) {
ctx.output(ordersTag, value);
} else if ("users".equals(tableName)) {
ctx.output(usersTag, value);
}
}
});
// Retrieve the side-output streams
DataStream<String> ordersStream = mainStream.getSideOutput(ordersTag);
DataStream<String> usersStream = mainStream.getSideOutput(usersTag);
6.2 Whole-Database Sync (YAML Pipeline)
# Flink CDC YAML pipeline definition (Flink CDC 3.0+)
source:
type: mysql
hostname: localhost
port: 3306
username: flink_cdc
password: password
database-list: mydb
table-list: mydb.* # whole database
sink:
type: doris
fenodes: doris-fe:8030
username: root
password: ""
table.identifier: ${databaseName}.${tableName} # dynamic table name
route:
- source-table: mydb.orders
sink-table: analytics.orders
- source-table: mydb.users
sink-table: analytics.users
- source-table: mydb.products
sink-table: analytics.products
pipeline:
name: MySQL to Doris CDC Pipeline
parallelism: 4
6.3 Handling Schema Changes
sequenceDiagram
participant MySQL
participant CDC
participant Target
MySQL->>MySQL: ALTER TABLE ADD COLUMN
MySQL->>CDC: DDL event
Note over CDC: Capture the schema change
CDC->>Target: Propagate the schema change
Note over Target: Column added automatically
-- Enable DDL synchronization (Flink CDC 3.0+)
CREATE TABLE mysql_orders (
...
) WITH (
'connector' = 'mysql-cdc',
...
'scan.incremental.snapshot.enabled' = 'true',
'include.schema.changes' = 'true' -- enable schema change events
);
7. Advanced Features
7.1 Parallel Snapshots
graph TB
subgraph "Parallel snapshot reads"
TABLE[Large table] --> C1["Chunk 1: id 1-10000"]
TABLE --> C2["Chunk 2: id 10001-20000"]
TABLE --> C3["Chunk 3: id 20001-30000"]
TABLE --> C4[Chunk N]
C1 --> P1[Parallel task 1]
C2 --> P2[Parallel task 2]
C3 --> P3[Parallel task 3]
C4 --> P4[Parallel task N]
end
MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("mydb")
.tableList("mydb.orders")
.username("flink_cdc")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
// Snapshot tuning
.splitSize(8096) // records per snapshot split
.fetchSize(1024) // records fetched per round trip
.build();
// Set the job parallelism
env.setParallelism(4);
7.2 Incremental Snapshot Checkpoints
// Enable checkpointing
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
// The CDC source stores its binlog offsets in each checkpoint
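// A minimal sketch (assumption: Flink 1.15+ CheckpointConfig API): retain externalized
// checkpoints so the stored binlog offsets survive a job cancellation and the job can be
// resumed later with `flink run -s <checkpointPath>`.
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // breathing room between checkpoints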
// On failure recovery, consumption resumes from the saved offsets
7.3 Data Filtering and Transformation
-- Only sync rows that match a filter condition
CREATE TABLE mysql_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'orders'
);
-- Filter and transform, then sync
INSERT INTO target_orders
SELECT
order_id,
user_id,
amount,
CASE status
WHEN '1' THEN 'CREATED'
WHEN '2' THEN 'PAID'
WHEN '3' THEN 'COMPLETED'
ELSE 'UNKNOWN'
END as status,
create_time
FROM mysql_orders
WHERE amount > 0; -- filter condition
8. CDC for Other Databases
8.1 PostgreSQL CDC
# PostgreSQL configuration
# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
-- Create a replication slot
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput');
-- Flink SQL
CREATE TABLE pg_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'password',
'database-name' = 'mydb',
'schema-name' = 'public',
'table-name' = 'orders',
'slot.name' = 'flink_cdc_slot',
'decoding.plugin.name' = 'pgoutput'
);
8.2 MongoDB CDC
CREATE TABLE mongo_orders (
_id STRING,
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'admin',
'password' = 'password',
'database' = 'mydb',
'collection' = 'orders'
);
8.3 Oracle CDC
CREATE TABLE oracle_orders (
ORDER_ID NUMBER,
USER_ID NUMBER,
AMOUNT NUMBER(10, 2),
PRIMARY KEY (ORDER_ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'system',
'password' = 'oracle',
'database-name' = 'ORCLCDB',
'schema-name' = 'MYSCHEMA',
'table-name' = 'ORDERS',
'debezium.log.mining.strategy' = 'online_catalog'
);
9. Monitoring and Operations
9.1 Key Monitoring Metrics
graph TB
subgraph "CDC monitoring metrics"
A[Binlog lag] --> A1["currentBinlogPosition vs. the latest MySQL position"]
B[Throughput] --> B1["numRecordsInPerSecond"]
C[Snapshot progress] --> C1["sourceSnapshotCompleted"]
D[Error count] --> D1["numRecordsErroredPerSecond"]
end
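These metrics are visible in the Flink web UI, and they can also be pulled from the JobManager REST API for custom alerting. The sketch below is a minimal, hypothetical poller: the REST port, job/vertex IDs, and the metric name (`currentFetchEventTimeLag`) are placeholders and assumptions, since the exact metric names exposed by the CDC source depend on the connector version.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical sketch: poll one source metric from the Flink REST API
// (endpoint: /jobs/<jobId>/vertices/<vertexId>/metrics?get=<metricName>).
public class CdcMetricPoller {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081";   // assumption: default REST port
        String jobId = args[0];                         // placeholder
        String vertexId = args[1];                      // placeholder: the CDC source vertex
        String metric = "currentFetchEventTimeLag";     // assumption: name varies by version

        URL url = new URL(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/metrics?get=" + metric);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // e.g. [{"id":"currentFetchEventTimeLag","value":"1234"}]
            }
        }
    }
}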
9.2 Troubleshooting Common Issues
flowchart TD
A[CDC issue] --> B{Problem type}
B -->|Connection failure| C["Check network / privileges and the binlog configuration"]
B -->|Data lag| D["Increase parallelism; look for downstream bottlenecks"]
B -->|Data loss| E["Check checkpoints and binlog retention"]
B -->|OOM| F["Reduce the number of tables; increase memory"]
9.3 Binlog Retention Configuration
-- Binlog retention in days
SET GLOBAL expire_logs_days = 7;
-- Or, on MySQL 8.0+ (in seconds)
SET GLOBAL binlog_expire_logs_seconds = 604800;
-- Check binlog status
SHOW BINARY LOGS;
SHOW MASTER STATUS;
10. Best Practices
10.1 Production Architecture
graph TB
subgraph "数据源"
M1[(MySQL 主库)]
M2[(MySQL 从库)]
end
subgraph "Flink CDC 集群"
F1[Flink CDC Job 1]
F2[Flink CDC Job 2]
end
subgraph "消息队列"
K[(Kafka)]
end
subgraph "数据湖/仓"
H[(Hudi)]
D[(Doris)]
end
M2 --> F1 --> K
M2 --> F2 --> H & D
NOTE["建议从从库读取
避免影响主库"]
10.2 Configuration Recommendations
// Recommended production configuration
MySqlSource.<String>builder()
.hostname("mysql-slave") // read from a replica, not the primary
.port(3306)
.databaseList("mydb")
.tableList("mydb.orders")
.username("flink_cdc")
.password("password")
.serverTimeZone("Asia/Shanghai") // time zone of the MySQL server
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
// Performance tuning
.splitSize(8096)
.fetchSize(1024)
.connectTimeout(Duration.ofSeconds(30))
// Heartbeat
.heartbeatInterval(Duration.ofSeconds(30))
.build();
10.3 Notes and Caveats
mindmap
  root((CDC best practices))
    Data source
      Read from a replica
      Configure binlog retention sensibly
      Grant only the minimum privileges
    Flink configuration
      Enable checkpointing
      Set parallelism appropriately
      Provision enough memory
    Monitoring
      Monitor binlog lag
      Monitor throughput
      Set up alerts
    Fault tolerance
      Configure automatic restart on failure
      Retain checkpoints
      Validate data regularly
11. Summary
11.1 Key Takeaways
mindmap
  root((Flink CDC))
    Core capabilities
      Unified full + incremental sync
      Lock-free snapshots
      Exactly-Once
    Supported sources
      MySQL
      PostgreSQL
      MongoDB
      Oracle
    Target systems
      Kafka
      Data lakes
      OLAP
    Advanced features
      Parallel snapshots
      Schema changes
      Whole-database sync
11.2 In One Sentence
Flink CDC = a go-to choice for real-time data synchronization.
Unified full and incremental sync, lock-free snapshots, and exactly-once semantics keep data flowing in real time.