一、前言:数据湖的困境
1.1 传统数据湖的痛点
graph TD
A[传统数据湖痛点] --> B[不支持更新删除]
A --> C[缺乏事务支持]
A --> D[小文件问题]
A --> E[没有Schema演进]
B --> B1["HDFS/S3 只能追加
更新一条记录要重写整个分区"] C --> C1["写入失败?
脏数据就在那里"] D --> D1["实时入湖产生大量小文件
查询性能灾难"] E --> E1["加个字段?
历史数据怎么办?"]
更新一条记录要重写整个分区"] C --> C1["写入失败?
脏数据就在那里"] D --> D1["实时入湖产生大量小文件
查询性能灾难"] E --> E1["加个字段?
历史数据怎么办?"]
1.2 Hudi 的诞生
timeline
title Hudi 发展历程
2016 : Uber 内部开发
2017 : Uber 开源
2019 : 进入 Apache 孵化
2020 : 成为 Apache 顶级项目
2023 : 0.14.x 版本,功能成熟
Hudi = Hadoop Upserts Deletes and Incrementals
Uber 的工程师发现:打车订单需要频繁更新状态(下单→接单→进行中→完成),传统数据湖完全搞不定!
二、Hudi 核心概念
2.1 整体架构
graph TB
subgraph "写入层"
W1[Spark]
W2[Flink]
W3[DeltaStreamer]
end
subgraph "Hudi 表"
subgraph "元数据"
M1[Timeline
时间线] M2[Index
索引] end subgraph "数据文件" D1[Base Files
基础文件 Parquet] D2[Log Files
日志文件 Avro] end end subgraph "查询层" Q1[Spark SQL] Q2[Presto/Trino] Q3[Hive] Q4[Flink] end W1 & W2 & W3 --> M1 & M2 M1 & M2 --> D1 & D2 D1 & D2 --> Q1 & Q2 & Q3 & Q4
时间线] M2[Index
索引] end subgraph "数据文件" D1[Base Files
基础文件 Parquet] D2[Log Files
日志文件 Avro] end end subgraph "查询层" Q1[Spark SQL] Q2[Presto/Trino] Q3[Hive] Q4[Flink] end W1 & W2 & W3 --> M1 & M2 M1 & M2 --> D1 & D2 D1 & D2 --> Q1 & Q2 & Q3 & Q4
2.2 Timeline:时间线机制
graph LR
subgraph "Hudi Timeline"
T1["20240115100000
commit"] --> T2["20240115110000
commit"] T2 --> T3["20240115120000
commit"] T3 --> T4["20240115130000
compaction"] T4 --> T5["20240115140000
commit"] end
commit"] --> T2["20240115110000
commit"] T2 --> T3["20240115120000
commit"] T3 --> T4["20240115130000
compaction"] T4 --> T5["20240115140000
commit"] end
Timeline 记录了表的所有操作历史:
| 操作类型 | 说明 |
|---|---|
| commit | 数据写入完成 |
| deltacommit | MOR 表增量写入 |
| compaction | 日志合并到基础文件 |
| clean | 清理旧版本文件 |
| rollback | 回滚操作 |
| savepoint | 保存点 |
2.3 文件布局
hudi_table/
├── .hoodie/ # 元数据目录
│ ├── hoodie.properties # 表属性
│ ├── 20240115100000.commit # 提交记录
│ ├── 20240115110000.commit
│ └── ...
├── 2024/01/15/ # 分区目录
│ ├── file_1.parquet # 基础文件
│ ├── file_2.parquet
│ ├── .file_1.log.1 # 日志文件 (MOR)
│ └── .file_1.log.2
└── 2024/01/16/
└── ...2.4 File Group 与 File Slice
graph TB
subgraph "File Group (文件组)"
subgraph "File Slice 3 (最新)"
FS3_BASE[base_file_v3.parquet]
FS3_LOG1[.log.1]
FS3_LOG2[.log.2]
end
subgraph "File Slice 2"
FS2_BASE[base_file_v2.parquet]
end
subgraph "File Slice 1 (最旧)"
FS1_BASE[base_file_v1.parquet]
end
end
FS1_BASE -->|Compaction| FS2_BASE -->|Compaction| FS3_BASE
核心概念:
| 概念 | 说明 |
|---|---|
| File Group | 一组相关文件,共享同一个 File ID |
| File Slice | File Group 在某个时刻的快照 |
| Base File | Parquet 格式的基础数据文件 |
| Log File | Avro 格式的增量日志文件 |
三、两种表类型:COW vs MOR
3.1 Copy-On-Write (COW)
sequenceDiagram
participant Writer
participant Table
participant Reader
Note over Table: 初始状态:file_1.parquet (100条)
Writer->>Table: 更新10条记录
Table->>Table: 读取 file_1.parquet
Table->>Table: 合并更新
Table->>Table: 写入新文件 file_1_v2.parquet
Note over Table: file_1_v2.parquet (100条,含更新)
Reader->>Table: 查询
Table-->>Reader: 直接读取 file_1_v2.parquet
Note over Reader: 查询性能最优
COW 特点:
graph LR
subgraph "COW优点"
A1["读取性能最好"]
A2["直接读Parquet"]
A3["无需合并"]
end
subgraph "COW缺点"
B1["写入放大"]
B2["改1条重写整个文件"]
B3["写入延迟高"]
end
3.2 Merge-On-Read (MOR)
sequenceDiagram
participant Writer
participant Table
participant Reader
Note over Table: 初始状态:file_1.parquet (100条)
Writer->>Table: 更新10条记录
Table->>Table: 追加到日志文件 .file_1.log.1
Note over Table: file_1.parquet + .file_1.log.1
Writer->>Table: 再更新5条记录
Table->>Table: 追加到日志文件 .file_1.log.2
Note over Table: file_1.parquet + .log.1 + .log.2
Reader->>Table: 查询(读优化视图)
Table-->>Reader: 只读 file_1.parquet
Reader->>Table: 查询(实时视图)
Table->>Table: 合并 parquet + logs
Table-->>Reader: 返回最新数据
MOR 特点:
graph LR
subgraph "MOR优点"
A1["写入性能好"]
A2["只追加日志"]
A3["写入延迟低"]
end
subgraph "MOR缺点"
B1["读取时需合并"]
B2["实时视图性能略差"]
B3["需要定期Compaction"]
end
3.3 COW vs MOR 对比
xychart-beta
title "COW vs MOR 性能对比"
x-axis ["写入延迟", "写入吞吐", "查询延迟", "存储效率"]
y-axis "得分" 0 --> 10
bar [3, 4, 9, 6]
bar [9, 9, 6, 8]
| 维度 | COW | MOR |
|---|---|---|
| 写入延迟 | 高(需重写文件) | 低(只追加日志) |
| 写入吞吐 | 低 | 高 |
| 读取性能 | 最优 | 需合并,略慢 |
| 存储效率 | 一般(多版本) | 较好 |
| 适用场景 | 读多写少 | 写多读少 |
3.4 选型建议
flowchart TD
A[选择表类型] --> B{写入频率?}
B -->|低频批量写入
每天几次| C[COW] B -->|高频实时写入
每分钟/每秒| D[MOR] C --> C1["适合:
离线数仓
T+1 ETL"] D --> D1["适合:
实时数仓
CDC 场景"] style C fill:#4ecdc4 style D fill:#ff6b6b
每天几次| C[COW] B -->|高频实时写入
每分钟/每秒| D[MOR] C --> C1["适合:
离线数仓
T+1 ETL"] D --> D1["适合:
实时数仓
CDC 场景"] style C fill:#4ecdc4 style D fill:#ff6b6b
四、索引机制
4.1 为什么需要索引?
graph TD
A["问题:如何快速定位
一条记录在哪个文件?"] --> B{没有索引} B --> C["扫描所有文件
查找 record_key"] C --> D["性能灾难 💀"] A --> E{有索引} E --> F["索引直接告诉你
记录在哪个 File Group"] F --> G["直接定位,高效 🚀"]
一条记录在哪个文件?"] --> B{没有索引} B --> C["扫描所有文件
查找 record_key"] C --> D["性能灾难 💀"] A --> E{有索引} E --> F["索引直接告诉你
记录在哪个 File Group"] F --> G["直接定位,高效 🚀"]
4.2 索引类型
graph TB
subgraph "Hudi索引类型"
A[Bloom Index] --> A1["基于布隆过滤器
适合点查"] B[Simple Index] --> B1["基于 Join
适合批量更新"] C[HBase Index] --> C1["外部存储
适合大规模"] D[Bucket Index] --> D1["基于分桶
适合高吞吐"] E[Record Index] --> E1["全局索引
最精确"] end
适合点查"] B[Simple Index] --> B1["基于 Join
适合批量更新"] C[HBase Index] --> C1["外部存储
适合大规模"] D[Bucket Index] --> D1["基于分桶
适合高吞吐"] E[Record Index] --> E1["全局索引
最精确"] end
4.3 Bloom Index 原理
graph LR
subgraph "写入时"
W1[新记录] --> W2[计算 Bloom Filter]
W2 --> W3[写入文件 Footer]
end
subgraph "更新时"
U1[待更新记录] --> U2[检查各文件的 Bloom]
U2 --> U3{可能存在?}
U3 -->|是| U4[精确查找]
U3 -->|否| U5[跳过该文件]
end
Bloom Index 配置:
# Bloom Filter 配置
hoodie.bloom.index.filter.type=DYNAMIC_V0
hoodie.bloom.index.filter.dynamic.max.entries=100000
hoodie.index.bloom.num_entries=60000
hoodie.index.bloom.fpp=0.0000000014.4 索引选择建议
| 索引类型 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Bloom | 默认选择,点查更新 | 无外部依赖 | 大数据量时变慢 |
| Simple | 批量更新 | 简单可靠 | 需要 Shuffle |
| HBase | 超大规模 | 扩展性好 | 需维护 HBase |
| Bucket | 高吞吐写入 | 无需查找 | 需预先分桶 |
| Record | 精确去重 | 最准确 | 存储开销大 |
五、增量查询:Hudi 的杀手锏
5.1 三种查询模式
graph TB
subgraph "Hudi查询模式"
A[Snapshot Query
快照查询] --> A1["读取最新完整数据
类似普通表查询"] B[Incremental Query
增量查询] --> B1["只读取变化的数据
支持指定时间范围"] C[Read Optimized Query
读优化查询] --> C1["只读Base文件
MOR表专用,更快"] end
快照查询] --> A1["读取最新完整数据
类似普通表查询"] B[Incremental Query
增量查询] --> B1["只读取变化的数据
支持指定时间范围"] C[Read Optimized Query
读优化查询] --> C1["只读Base文件
MOR表专用,更快"] end
5.2 增量查询示例
sequenceDiagram
participant Table as Hudi表
participant Consumer as 下游消费者
Note over Table: Commit 1: 10:00
Note over Table: Commit 2: 11:00
Note over Table: Commit 3: 12:00
Consumer->>Table: 增量查询 (10:00 - 11:00)
Table-->>Consumer: 返回 Commit 2 的变化
Consumer->>Consumer: 处理数据
Consumer->>Consumer: 记录位点 = 11:00
Consumer->>Table: 增量查询 (11:00 - 12:00)
Table-->>Consumer: 返回 Commit 3 的变化
Spark 增量查询代码:
// 增量读取 Hudi 表
val incrementalDF = spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20240115100000")
.option("hoodie.datasource.read.end.instanttime", "20240115120000")
.load("/path/to/hudi_table")
// 处理增量数据
incrementalDF.show()
// 获取本次消费的最新位点
val latestCommit = incrementalDF
.select("_hoodie_commit_time")
.agg(max("_hoodie_commit_time"))
.first().getString(0)Flink 增量消费:
-- Flink SQL 流式读取 Hudi
CREATE TABLE hudi_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3://bucket/orders',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '10' -- 10秒检查一次新数据
);
-- 实时消费变化数据
SELECT * FROM hudi_orders;5.3 增量 ETL 架构
graph LR
subgraph "增量ETL"
A[ODS层 Hudi表] -->|增量查询| B[Flink/Spark]
B -->|增量写入| C[DWD层 Hudi表]
C -->|增量查询| D[Flink/Spark]
D -->|增量写入| E[DWS层 Hudi表]
end
subgraph "传统ETL"
F[ODS层] -->|全量读取| G[计算]
G -->|全量覆盖| H[DWD层]
end
六、Compaction 与 Clustering
6.1 Compaction(压缩合并)
graph LR
subgraph "Compaction前"
A1[base_file.parquet
100条] A2[.log.1
10条更新] A3[.log.2
5条更新] end subgraph "Compaction后" B1[new_base_file.parquet
100条最新数据] end A1 & A2 & A3 -->|Compaction| B1
100条] A2[.log.1
10条更新] A3[.log.2
5条更新] end subgraph "Compaction后" B1[new_base_file.parquet
100条最新数据] end A1 & A2 & A3 -->|Compaction| B1
Compaction 策略:
# 同步 Compaction(写入时执行)
hoodie.compact.inline=true
hoodie.compact.inline.max.delta.commits=5
# 异步 Compaction(独立任务执行)
hoodie.compact.inline=false
# 需要单独运行 Compaction 任务6.2 Clustering(聚簇优化)
graph TB
subgraph "Clustering前"
A1[file_1: user_id 1,5,9,3]
A2[file_2: user_id 2,8,4,7]
A3[file_3: user_id 6,10,11,12]
end
subgraph "Clustering后 (按user_id排序)"
B1[file_1: user_id 1,2,3,4]
B2[file_2: user_id 5,6,7,8]
B3[file_3: user_id 9,10,11,12]
end
A1 & A2 & A3 -->|Clustering| B1 & B2 & B3
Clustering 的好处:
graph LR
A[Clustering优化] --> B[数据局部性]
A --> C[查询性能]
A --> D[压缩效果]
B --> B1["相似数据在一起"]
C --> C1["范围查询更快
跳过无关文件"] D --> D1["同类数据压缩率高"]
跳过无关文件"] D --> D1["同类数据压缩率高"]
6.3 Clean(清理旧版本)
sequenceDiagram
participant Table
participant Clean
Note over Table: Commit 1, 2, 3, 4, 5 (保留3个版本)
Clean->>Table: 执行清理
Table->>Table: 删除 Commit 1, 2 的文件
Note over Table: 只保留 Commit 3, 4, 5
# 清理配置
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=3
# 或者按时间保留
hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS
hoodie.cleaner.hours.retained=24七、实战:Hudi 表操作
7.1 创建 Hudi 表
// Spark 创建 Hudi 表
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_orders"
val basePath = "s3://bucket/hudi/orders"
// 初始数据
val initialDF = spark.createDataFrame(Seq(
(1L, 1001L, 100.00, "2024-01-15 10:00:00"),
(2L, 1002L, 200.00, "2024-01-15 10:01:00"),
(3L, 1003L, 150.00, "2024-01-15 10:02:00")
)).toDF("order_id", "user_id", "amount", "order_time")
// 写入 Hudi 表
initialDF.write
.format("hudi")
.options(Map(
TABLE_NAME.key -> tableName,
RECORDKEY_FIELD.key -> "order_id", // 主键
PRECOMBINE_FIELD.key -> "order_time", // 预合并字段
PARTITIONPATH_FIELD.key -> "", // 分区字段(空=不分区)
TABLE_TYPE.key -> "MERGE_ON_READ", // 表类型
"hoodie.datasource.write.operation" -> "upsert"
))
.mode("overwrite")
.save(basePath)7.2 Upsert 操作
// 更新数据(Upsert)
val updateDF = spark.createDataFrame(Seq(
(1L, 1001L, 120.00, "2024-01-15 11:00:00"), // 更新
(4L, 1004L, 300.00, "2024-01-15 11:01:00") // 新增
)).toDF("order_id", "user_id", "amount", "order_time")
updateDF.write
.format("hudi")
.options(Map(
TABLE_NAME.key -> tableName,
RECORDKEY_FIELD.key -> "order_id",
PRECOMBINE_FIELD.key -> "order_time",
"hoodie.datasource.write.operation" -> "upsert"
))
.mode("append")
.save(basePath)7.3 Delete 操作
// 删除数据
val deleteDF = spark.createDataFrame(Seq(
(2L, 1002L, 0.00, "2024-01-15 12:00:00") // 要删除的记录
)).toDF("order_id", "user_id", "amount", "order_time")
deleteDF.write
.format("hudi")
.options(Map(
TABLE_NAME.key -> tableName,
RECORDKEY_FIELD.key -> "order_id",
PRECOMBINE_FIELD.key -> "order_time",
"hoodie.datasource.write.operation" -> "delete"
))
.mode("append")
.save(basePath)7.4 时间旅行查询
// 查询某个时间点的快照
val historicalDF = spark.read
.format("hudi")
.option("as.of.instant", "20240115100000") // 指定时间点
.load(basePath)
historicalDF.show()
// 查询两个时间点之间的变化
val changesDF = spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20240115100000")
.option("hoodie.datasource.read.end.instanttime", "20240115120000")
.load(basePath)八、Hudi + Flink 实时入湖
8.1 架构图
graph LR
subgraph "数据源"
A[MySQL] -->|CDC| B[Debezium]
B --> C[Kafka]
end
subgraph "实时处理"
C --> D[Flink]
end
subgraph "数据湖"
D --> E[Hudi MOR表]
E --> F[(S3/HDFS)]
end
subgraph "查询"
F --> G[Trino/Spark]
end
8.2 Flink SQL 示例
-- 1. 创建 Kafka 源表(CDC 数据)
CREATE TABLE kafka_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
order_status STRING,
order_time TIMESTAMP(3),
op_type STRING, -- CDC 操作类型
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders_cdc',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- 2. 创建 Hudi Sink 表
CREATE TABLE hudi_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
order_status STRING,
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3://bucket/hudi/orders',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'order_id',
'hoodie.datasource.write.precombine.field' = 'order_time',
'write.operation' = 'upsert',
'write.tasks' = '4',
'compaction.tasks' = '2',
'compaction.async.enabled' = 'true',
'compaction.delta_commits' = '5'
);
-- 3. 实时写入
INSERT INTO hudi_orders
SELECT order_id, user_id, amount, order_status, order_time
FROM kafka_orders;九、最佳实践
9.1 配置优化
mindmap
root((Hudi优化))
写入优化
合理设置并行度
批量写入大小
索引类型选择
存储优化
文件大小控制
分区策略
压缩算法
查询优化
Compaction策略
Clustering
元数据表
9.2 关键配置
# ===== 写入配置 =====
# 目标文件大小
hoodie.parquet.max.file.size=134217728 # 128MB
hoodie.parquet.small.file.limit=104857600 # 100MB
# 写入并行度
hoodie.insert.shuffle.parallelism=200
hoodie.upsert.shuffle.parallelism=200
# ===== 索引配置 =====
hoodie.index.type=BLOOM
hoodie.bloom.index.parallelism=200
# ===== Compaction 配置 =====
hoodie.compact.inline=false
hoodie.compact.inline.max.delta.commits=5
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
# ===== 清理配置 =====
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=109.3 常见问题
flowchart TD
A[常见问题] --> B{写入慢?}
A --> C{小文件多?}
A --> D{查询慢?}
B --> B1["检查索引类型
考虑Bucket Index"] B --> B2["增加并行度"] B --> B3["使用MOR表类型"] C --> C1["增大文件大小阈值"] C --> C2["开启Clustering"] C --> C3["合理设置Compaction"] D --> D1["检查是否需要Compaction"] D --> D2["使用读优化视图"] D --> D3["开启元数据表"]
考虑Bucket Index"] B --> B2["增加并行度"] B --> B3["使用MOR表类型"] C --> C1["增大文件大小阈值"] C --> C2["开启Clustering"] C --> C3["合理设置Compaction"] D --> D1["检查是否需要Compaction"] D --> D2["使用读优化视图"] D --> D3["开启元数据表"]
十、总结
10.1 Hudi 核心价值
graph LR
A[Hudi核心价值] --> B[增量处理]
A --> C[ACID事务]
A --> D[时间旅行]
A --> E[Schema演进]
B --> B1["告别全量重刷"]
C --> C1["写入失败可回滚"]
D --> D1["历史数据可追溯"]
E --> E1["加字段不怕"]
10.2 适用场景
| 场景 | 推荐度 | 说明 |
|---|---|---|
| CDC 实时入湖 | ⭐⭐⭐⭐⭐ | Hudi 的主战场 |
| 增量 ETL | ⭐⭐⭐⭐⭐ | 增量查询能力强 |
| 数据更正/删除 | ⭐⭐⭐⭐⭐ | GDPR 合规必备 |
| 时间旅行/审计 | ⭐⭐⭐⭐ | Timeline 支持 |
| 纯追加场景 | ⭐⭐⭐ | 可以,但非必须 |
10.3 一句话总结
Hudi = 让数据湖支持增删改 + 增量处理 + 时间旅行
如果你需要在数据湖上实现实时更新和增量消费,Hudi 是目前最成熟的选择之一。