前言:两个流派的战争
在数据同步的江湖里,有两大门派:
批量派:讲究稳扎稳打,每天定时把数据从A搬到B,T+1看报表,岁月静好。代表工具:Sqoop(已退休)、DataX(当红炸子鸡)。
实时派:追求极致速度,数据产生的那一刻就要同步过去,延迟超过1秒就浑身难受。代表工具:Canal、Debezium、Flink CDC(新晋顶流)。
两派弟子经常在技术群里吵架:
批量派:"你们实时同步搞那么复杂,出了Bug排查到天亮!"
实时派:"都2024年了还T+1?用户等得花儿都谢了!"
批量派:"我们稳定!"
实时派:"我们快!"
架构师(和稀泥):"都别吵了,两个都要!"
没错,现实中很多公司确实是两个都要。批量用DataX,实时用Flink CDC,各司其职。
今天,让我们深入对比这两个工具,看看它们各自的绝活和短板。
一、选手介绍
1.1 DataX:阿里出品的数据搬运工
一句话概括:DataX就是一个数据搬运的瑞士军刀,从哪搬到哪、搬什么格式,配置一下就行。
1.2 Flink CDC:实时数据捕获专家
一句话概括:Flink CDC就是一个数据库的监控摄像头,任何数据变化都逃不过它的眼睛,而且实时同步。
1.3 核心区别一览
SELECT * FROM table"] end subgraph FlinkCDC方式["⚡ Flink CDC同步方式"] DB2[(MySQL)] --> |"Binlog实时流"|CDC[Flink CDC] CDC --> |"实时写入"|Target2[目标存储] Note2["监听Binlog
有变化立即同步"] end style Note1 fill:#ffe66d style Note2 fill:#4ecdc4
| 维度 | DataX | Flink CDC |
|---|---|---|
| 同步模式 | 批量(全量/增量) | 实时(CDC) |
| 数据延迟 | 分钟~小时级 | 秒~毫秒级 |
| 捕获方式 | SQL查询 | Binlog解析 |
| UPDATE/DELETE | 需特殊处理 | 原生支持 |
| 架构复杂度 | 简单 | 较复杂 |
| 资源消耗 | 按需启动 | 常驻运行 |
| 对源库影响 | 有(查询压力) | 小(读Binlog) |
二、架构深度对比
2.1 DataX架构
作业容器] end subgraph TaskGroup["Task Group(任务组)"] TG1[TaskGroup 1] TG2[TaskGroup 2] TG3[TaskGroup N...] end subgraph Task["Task(任务)"] T1[Reader Task] T2[Writer Task] Channel[Channel
内存缓冲队列] end JobContainer --> TG1 JobContainer --> TG2 JobContainer --> TG3 TG1 --> T1 T1 --> Channel Channel --> T2 end subgraph 插件["插件体系"] subgraph Readers["Reader插件"] R1[mysqlreader] R2[oraclereader] R3[hdfsreader] R4[...] end subgraph Writers["Writer插件"] W1[hdfswriter] W2[hivewriter] W3[mysqlwriter] W4[...] end end style JobContainer fill:#ff6b6b style Channel fill:#ffe66d
DataX的核心设计:
- Framework + Plugin:框架负责调度,插件负责读写
- Channel:Reader和Writer之间的内存缓冲队列
- 并发控制:通过TaskGroup和Channel数量控制并发
- 无中心化:单机运行,简单可靠
2.2 Flink CDC架构
Binlog解析] end subgraph Flink["Flink Runtime"] Source2[Source Operator
读取变更流] Transform[Transform
数据处理] Sink[Sink Operator
写入目标] Source2 --> Transform --> Sink end subgraph Target["目标存储"] Kafka[Kafka] HDFS2[HDFS/Hive] ES[Elasticsearch] Doris[Doris/StarRocks] end MySQL --> |"Binlog"|Debezium PG --> |"WAL"|Debezium Debezium --> Source2 Sink --> Target end style Debezium fill:#4ecdc4 style Flink fill:#ff6b6b
Flink CDC的核心设计:
- 基于Debezium:复用成熟的CDC引擎
- 流式处理:数据以流的形式持续处理
- Checkpoint机制:保证Exactly-Once语义
- 全增量一体:先全量快照,再增量Binlog,无缝衔接
2.3 数据流对比
三、DataX实战
3.1 安装部署
# 下载DataX
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
# 解压
tar -zxvf datax.tar.gz
# 查看支持的插件
ls datax/plugin/reader/
ls datax/plugin/writer/
# 测试运行
python datax/bin/datax.py datax/job/job.json3.2 MySQL到HDFS全量同步
{
"job": {
"setting": {
"speed": {
"channel": 4,
"byte": 104857600
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": ["id", "user_name", "age", "city", "create_time", "update_time"],
"splitPk": "id",
"connection": [{
"table": ["user_info"],
"jdbcUrl": ["jdbc:mysql://192.168.1.100:3306/mydb?useUnicode=true&characterEncoding=utf8"]
}]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://namenode:8020",
"fileType": "parquet",
"path": "/user/hive/warehouse/ods.db/ods_user_info/dt=$dt",
"fileName": "user_info",
"column": [
{"name": "id", "type": "bigint"},
{"name": "user_name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "city", "type": "string"},
{"name": "create_time", "type": "timestamp"},
{"name": "update_time", "type": "timestamp"}
],
"writeMode": "truncate",
"fieldDelimiter": "\u0001",
"compress": "snappy"
}
}
}]
}
}3.3 MySQL到Hive增量同步
{
"job": {
"setting": {
"speed": {"channel": 4}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": ["id", "user_name", "age", "city", "create_time", "update_time"],
"where": "update_time >= '${start_time}' AND update_time < '${end_time}'",
"splitPk": "id",
"connection": [{
"table": ["user_info"],
"jdbcUrl": ["jdbc:mysql://192.168.1.100:3306/mydb"]
}]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://namenode:8020",
"fileType": "parquet",
"path": "/user/hive/warehouse/ods.db/ods_user_info_inc/dt=${dt}",
"fileName": "user_info",
"column": [
{"name": "id", "type": "bigint"},
{"name": "user_name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "city", "type": "string"},
{"name": "create_time", "type": "timestamp"},
{"name": "update_time", "type": "timestamp"}
],
"writeMode": "append",
"compress": "snappy"
}
}
}]
}
}运行命令:
# 传递参数运行
python datax/bin/datax.py \
-p "-Ddt=2024-01-15 -Dstart_time='2024-01-15 00:00:00' -Dend_time='2024-01-16 00:00:00'" \
job/mysql2hive_inc.json3.4 多表同步脚本
#!/bin/bash
# datax_sync_all.sh - 批量同步多张表
DATAX_HOME=/opt/datax
JOB_DIR=/opt/datax/job
DT=$(date +%Y-%m-%d)
LOG_DIR=/var/log/datax/${DT}
mkdir -p ${LOG_DIR}
# 需要同步的表列表
TABLES=(
"user_info"
"order_info"
"product_info"
"payment_info"
)
# 并行度
PARALLEL=2
COUNT=0
for table in "${TABLES[@]}"; do
echo "$(date '+%Y-%m-%d %H:%M:%S') 开始同步表: ${table}"
# 后台运行,控制并行度
python ${DATAX_HOME}/bin/datax.py \
-p "-Ddt=${DT} -Dtable=${table}" \
${JOB_DIR}/${table}.json \
> ${LOG_DIR}/${table}.log 2>&1 &
COUNT=$((COUNT + 1))
# 控制并行度
if [ $((COUNT % PARALLEL)) -eq 0 ]; then
wait
fi
done
wait
echo "$(date '+%Y-%m-%d %H:%M:%S') 所有表同步完成!"3.5 DataX性能调优
调优参数示例:
{
"job": {
"setting": {
"speed": {
"channel": 8,
"byte": 209715200,
"record": 100000
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"splitPk": "id",
"fetchSize": 10000
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"batchSize": 4096
}
}
}]
}
}四、Flink CDC实战
4.1 环境准备
<!-- pom.xml依赖 -->
<dependencies>
<!-- Flink CDC MySQL Connector -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.18.0</version>
</dependency>
<!-- 其他Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.2-1.18</version>
</dependency>
</dependencies>MySQL开启Binlog:
-- 查看是否开启binlog
SHOW VARIABLES LIKE 'log_bin';
-- my.cnf配置
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
expire_logs_days=7
-- 创建CDC用户
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;4.2 Flink SQL方式(推荐)
-- 创建Flink SQL CLI环境
-- bin/sql-client.sh
-- 创建MySQL CDC源表
CREATE TABLE mysql_users (
id BIGINT,
user_name STRING,
age INT,
city STRING,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.100',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'user_info',
'server-time-zone' = 'Asia/Shanghai',
'scan.startup.mode' = 'initial'
);
-- 创建Kafka目标表
CREATE TABLE kafka_users (
id BIGINT,
user_name STRING,
age INT,
city STRING,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_info_cdc',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- 创建Doris/StarRocks目标表
CREATE TABLE doris_users (
id BIGINT,
user_name STRING,
age INT,
city STRING,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '192.168.1.101:8030',
'table.identifier' = 'mydb.user_info',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true'
);
-- 开始实时同步到Kafka
INSERT INTO kafka_users SELECT * FROM mysql_users;
-- 开始实时同步到Doris
INSERT INTO doris_users SELECT * FROM mysql_users;4.3 DataStream API方式
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MySqlCDCExample {
public static void main(String[] args) throws Exception {
// 创建MySQL CDC Source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("192.168.1.100")
.port(3306)
.databaseList("mydb")
.tableList("mydb.user_info", "mydb.order_info")
.username("flink_cdc")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial()) // 全量+增量
.build();
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // 60秒checkpoint
// 添加Source并处理
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.print(); // 或写入其他存储
env.execute("MySQL CDC Job");
}
}4.4 整库同步(多表)
-- Flink CDC 3.0+ 支持整库同步
-- 使用正则匹配多表
CREATE TABLE mysql_all_tables (
database_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
id BIGINT,
data STRING, -- 将所有字段序列化为JSON
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.100',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'user_.*|order_.*', -- 正则匹配多表
'scan.startup.mode' = 'initial'
);使用YAML配置整库同步(Flink CDC 3.0 Pipeline模式):
# mysql-to-doris.yaml
source:
type: mysql
hostname: 192.168.1.100
port: 3306
username: flink_cdc
password: password
tables: mydb.\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
sink:
type: doris
fenodes: 192.168.1.101:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4# 提交整库同步任务
bin/flink-cdc.sh mysql-to-doris.yaml4.5 Flink CDC性能调优
五、深度对比分析
5.1 数据一致性对比
(需配合merge逻辑)"] end subgraph FlinkCDC一致性["⚡ Flink CDC数据一致性"] A2[基于Binlog捕获] --> B2[完整捕获INSERT/UPDATE/DELETE] B2 --> C2[Checkpoint保证Exactly-Once] C2 --> D2["强一致性
(数据不丢不重)"] end style D1 fill:#ffe66d style D2 fill:#4ecdc4
关键区别:
| 操作类型 | DataX处理方式 | Flink CDC处理方式 |
|---|---|---|
| INSERT | ✅ 正常同步 | ✅ 实时捕获 |
| UPDATE | ⚠️ 需要时间戳字段 | ✅ 实时捕获,带前后值 |
| DELETE | ❌ 通常无法感知 | ✅ 实时捕获 |
| DDL变更 | ❌ 不支持 | ⚠️ 部分支持 |
5.2 性能对比
取决于数据库和网络"] CDC_Throughput["Flink CDC: 10-50MB/s
受限于Binlog速度"] end subgraph 资源["资源消耗"] DX_Resource["DataX: 按需启动
跑完释放"] CDC_Resource["Flink CDC: 常驻运行
持续消耗"] end end style DX_Latency fill:#ff6b6b style CDC_Latency fill:#4ecdc4 style DX_Resource fill:#4ecdc4 style CDC_Resource fill:#ffe66d
真实场景性能数据:
| 场景 | DataX | Flink CDC |
|---|---|---|
| 1000万行全量同步 | 5-10分钟 | 10-20分钟(首次快照) |
| 增量同步延迟 | 取决于调度间隔 | 1-5秒 |
| 每秒同步记录数 | 5-20万(批量) | 1-5万(实时) |
| CPU占用 | 高(运行时) | 低(常态) |
| 内存占用 | 4-8GB(运行时) | 2-4GB(常态) |
5.3 场景适用性对比
简单高效] Q2 --> |">100GB/天"|Q3{有Spark集群?} Q3 --> |"有"|Spark[Spark JDBC] Q3 --> |"没有"|DataX2[DataX分布式
或SeaTunnel] CDC1 --> Q4{需要捕获DELETE?} Q4 --> |"是"|CDC2[Flink CDC
必选] Q4 --> |"否"|Q5{数据库压力敏感?} Q5 --> |"是"|CDC3[Flink CDC
Binlog压力小] Q5 --> |"否"|DataX3[DataX也可以
高频调度] end style CDC1 fill:#4ecdc4 style CDC2 fill:#4ecdc4 style DataX1 fill:#ff6b6b
5.4 运维复杂度对比
Airflow/DolphinScheduler] D3 --> D4[监控简单
看日志即可] D4 --> D5["故障恢复简单
重跑即可"] end subgraph FlinkCDC运维["⚡ Flink CDC运维"] F1[部署复杂] --> F2[需要Flink集群] F2 --> F3[任务常驻运行] F3 --> F4[需要监控
Checkpoint/延迟/积压] F4 --> F5["故障恢复复杂
需要从Checkpoint恢复"] end style D1 fill:#4ecdc4 style D5 fill:#4ecdc4 style F1 fill:#ff6b6b style F5 fill:#ff6b6b
六、混合架构:两个都要!
在实际生产中,很多公司采用混合架构:批量用DataX,实时用Flink CDC。
6.1 经典混合架构
实时数仓] Flink --> Redis[Redis
实时指标] end subgraph 离线链路["📦 离线链路"] MySQL --> |"每日全量"|DataX[DataX] DataX --> HDFS[HDFS/Hive
ODS层] HDFS --> Spark[Spark
离线计算] Spark --> DW[数据仓库
DWD/DWS/ADS] end subgraph 应用层["🖥️ 应用层"] Doris --> |"实时报表"|App1[实时大屏] Redis --> |"实时指标"|App2[业务系统] DW --> |"离线报表"|App3[BI报表] end style CDC fill:#4ecdc4 style DataX fill:#ff6b6b
6.2 数据一致性保证
6.3 混合架构最佳实践
七、踩坑指南
7.1 DataX常见坑
useUnicode=true&characterEncoding=utf8"] P2["2. 时间类型错乱"] P2 --> S2["解决:时区配置
serverTimezone=Asia/Shanghai"] P3["3. 小文件问题"] P3 --> S3["解决:控制channel数
后置合并"] P4["4. 内存溢出"] P4 --> S4["解决:调整JVM参数
限制fetchSize"] P5["5. 数据库连接数爆满"] P5 --> S5["解决:控制channel数
使用连接池"] end style P1 fill:#ff6b6b style P2 fill:#ff6b6b style P3 fill:#ff6b6b style P4 fill:#ff6b6b style P5 fill:#ff6b6b
7.2 Flink CDC常见坑
增加并行度"] P2["2. Binlog被清理"] P2 --> S2["解决:增大expire_logs_days
尽快启动任务"] P3["3. Checkpoint失败"] P3 --> S3["解决:增大超时时间
检查Sink写入速度"] P4["4. 数据积压"] P4 --> S4["解决:增加并行度
优化Sink性能"] P5["5. DDL变更导致任务失败"] P5 --> S5["解决:谨慎DDL
使用兼容模式"] P6["6. Server ID冲突"] P6 --> S6["解决:多任务用不同server-id范围"] end style P1 fill:#ff6b6b style P2 fill:#ff6b6b style P3 fill:#ff6b6b style P4 fill:#ff6b6b style P5 fill:#ff6b6b style P6 fill:#ff6b6b
7.3 详细解决方案
DataX中文乱码
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [{
"jdbcUrl": ["jdbc:mysql://host:3306/db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"]
}]
}
}
}Flink CDC Checkpoint配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint配置
env.enableCheckpointing(60000); // 60秒
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10分钟超时
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 最小间隔30秒
// 重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启3次
Time.of(10, TimeUnit.SECONDS) // 每次间隔10秒
));Flink CDC Server ID配置
-- 多个CDC任务时,确保server-id范围不重叠
-- 任务1
CREATE TABLE cdc_table1 (...) WITH (
'connector' = 'mysql-cdc',
'server-id' = '5400-5404', -- 任务1用5400-5404
...
);
-- 任务2
CREATE TABLE cdc_table2 (...) WITH (
'connector' = 'mysql-cdc',
'server-id' = '5410-5414', -- 任务2用5410-5414
...
);八、监控与告警
8.1 DataX监控
#!/bin/bash
# datax_monitor.sh - DataX任务监控
LOG_FILE=$1
JOB_NAME=$2
# 检查任务是否成功
if grep -q "任务启动时刻" $LOG_FILE && grep -q "任务结束时刻" $LOG_FILE; then
# 提取关键指标
TOTAL_RECORDS=$(grep "读出记录总数" $LOG_FILE | awk -F':' '{print $2}' | tr -d ' ')
TOTAL_BYTES=$(grep "读写总字节数" $LOG_FILE | awk -F':' '{print $2}' | tr -d ' ')
SPEED=$(grep "速度" $LOG_FILE | awk -F':' '{print $2}' | tr -d ' ')
echo "✅ DataX任务成功: ${JOB_NAME}"
echo " 记录数: ${TOTAL_RECORDS}"
echo " 字节数: ${TOTAL_BYTES}"
echo " 速度: ${SPEED}"
else
echo "❌ DataX任务失败: ${JOB_NAME}"
# 发送告警
curl -X POST "https://alert.example.com/webhook" \
-H "Content-Type: application/json" \
-d "{\"job\": \"${JOB_NAME}\", \"status\": \"failed\"}"
fi8.2 Flink CDC监控
当前数据延迟] S2[sourceIdleTime
Source空闲时间] S3[numRecordsIn
输入记录数] end subgraph Checkpoint指标["Checkpoint指标"] C1[lastCheckpointDuration
上次Checkpoint耗时] C2[numberOfFailedCheckpoints
失败次数] C3[lastCheckpointSize
状态大小] end subgraph Sink指标["Sink指标"] K1[numRecordsOut
输出记录数] K2[numBytesOut
输出字节数] K3[currentSendTime
写入延迟] end end style S1 fill:#ff6b6b style C2 fill:#ff6b6b
Prometheus + Grafana监控配置:
# flink-conf.yaml
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.host: prometheus-pushgateway
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: flink-cdc
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.interval: 30 SECONDS九、总结与选型建议
9.1 一图总结
9.2 选型决策表
| 场景 | 推荐工具 | 原因 |
|---|---|---|
| T+1数据仓库 | DataX | 简单可靠,批量性能好 |
| 实时数仓 | Flink CDC | 秒级延迟,完整变更 |
| 实时大屏 | Flink CDC | 低延迟要求 |
| 历史数据迁移 | DataX | 批量迁移效率高 |
| 数据库同步复制 | Flink CDC | 完整捕获变更 |
| 简单ETL | DataX | 上手快,运维简单 |
| 需要捕获DELETE | Flink CDC | 唯一选择 |
| 资源受限 | DataX | 按需使用资源 |
| 已有Flink集群 | Flink CDC | 复用现有资源 |
9.3 终极建议
十、写在最后
写到这里,我想起了一个段子:
面试官:"你们公司用什么做数据同步?"
候选人:"用Sqoop。"
面试官:"Sqoop已经退役了,你落伍了。"
候选人:"那DataX?"
面试官:"可以,但你们有实时需求吗?"
候选人:"有,用的Canal。"
面试官:"Canal只能同步MySQL,你们有其他数据源吗?"
候选人:"有Oracle..."
面试官:"那你应该用Flink CDC。"
候选人:"我们没有Flink集群..."
面试官:"那你可以考虑SeaTunnel..."
候选人:"......"
数据同步这个领域,工具层出不穷,没有银弹。选择适合自己场景的工具,比追逐最新技术更重要。
如果你问我的建议:
- 刚起步的团队:先用DataX,简单可靠
- 有实时需求:上Flink CDC,投资未来
- 两者都要:混合架构,各取所长
最后送大家一句话:
"工具是手段,数据是目的。不管黑猫白猫,能把数据同步对的就是好猫。"
祝大家数据同步顺利,永不丢数据!🎉
本文作者:一个在DataX和Flink CDC之间反复横跳的程序员
最惨经历:Flink CDC任务跑了一周,发现Binlog被清了,从头再来
江湖再见!⚡📦
附录:面试高频题
DataX相关
DataX的架构是什么?
Framework + Plugin架构。Framework负责调度、缓冲、限流,Plugin负责具体的读写。Reader和Writer之间通过Channel(内存队列)传输数据。
DataX如何实现并发?
通过splitPk将数据分片,每个TaskGroup处理一部分数据。Channel数量决定并发度。
DataX如何处理脏数据?
通过errorLimit配置容忍的脏数据量,超过阈值任务失败。脏数据会记录到日志。
Flink CDC相关
Flink CDC的工作原理是什么?
基于Debezium捕获数据库Binlog,将变更数据转换为Flink的数据流。支持全量快照+增量Binlog无缝衔接。
Flink CDC如何保证Exactly-Once?
通过Checkpoint机制。Source记录Binlog位点,Sink支持幂等或事务写入,从Checkpoint恢复时可以精确恢复状态。
Flink CDC全量阶段为什么不锁表?
使用无锁快照算法。通过记录Binlog位点和数据chunk的高低水位,实现一致性快照而不需要锁表。
对比相关
DataX和Flink CDC的核心区别是什么?
DataX是批量同步,基于SQL查询;Flink CDC是实时同步,基于Binlog。DataX无法捕获DELETE,Flink CDC可以。
什么场景用DataX,什么场景用Flink CDC?
T+1报表、历史数据迁移用DataX;实时数仓、需要捕获完整变更用Flink CDC。两者可以混合使用。