搜 索

Hudi从入门到放弃:增量数据湖的实现原理

  • 12阅读
  • 2023年06月24日
  • 0评论
首页 / AI/大数据 / 正文

一、前言:数据湖的困境

1.1 传统数据湖的痛点

graph TD A[传统数据湖痛点] --> B[不支持更新删除] A --> C[缺乏事务支持] A --> D[小文件问题] A --> E[没有Schema演进] B --> B1["HDFS/S3 只能追加
更新一条记录要重写整个分区"] C --> C1["写入失败?
脏数据就在那里"] D --> D1["实时入湖产生大量小文件
查询性能灾难"] E --> E1["加个字段?
历史数据怎么办?"]

1.2 Hudi 的诞生

timeline title Hudi 发展历程 2016 : Uber 内部开发 2017 : Uber 开源 2019 : 进入 Apache 孵化 2020 : 成为 Apache 顶级项目 2023 : 0.14.x 版本,功能成熟

Hudi = Hadoop Upserts Deletes and Incrementals

Uber 的工程师发现:打车订单需要频繁更新状态(下单→接单→进行中→完成),传统数据湖完全搞不定!

二、Hudi 核心概念

2.1 整体架构

graph TB subgraph "写入层" W1[Spark] W2[Flink] W3[DeltaStreamer] end subgraph "Hudi 表" subgraph "元数据" M1[Timeline
时间线] M2[Index
索引] end subgraph "数据文件" D1[Base Files
基础文件 Parquet] D2[Log Files
日志文件 Avro] end end subgraph "查询层" Q1[Spark SQL] Q2[Presto/Trino] Q3[Hive] Q4[Flink] end W1 & W2 & W3 --> M1 & M2 M1 & M2 --> D1 & D2 D1 & D2 --> Q1 & Q2 & Q3 & Q4

2.2 Timeline:时间线机制

graph LR subgraph "Hudi Timeline" T1["20240115100000
commit"] --> T2["20240115110000
commit"] T2 --> T3["20240115120000
commit"] T3 --> T4["20240115130000
compaction"] T4 --> T5["20240115140000
commit"] end

Timeline 记录了表的所有操作历史:

操作类型说明
commit数据写入完成
deltacommitMOR 表增量写入
compaction日志合并到基础文件
clean清理旧版本文件
rollback回滚操作
savepoint保存点

2.3 文件布局

hudi_table/
├── .hoodie/                      # 元数据目录
│   ├── hoodie.properties        # 表属性
│   ├── 20240115100000.commit    # 提交记录
│   ├── 20240115110000.commit
│   └── ...
├── 2024/01/15/                   # 分区目录
│   ├── file_1.parquet           # 基础文件
│   ├── file_2.parquet
│   ├── .file_1.log.1            # 日志文件 (MOR)
│   └── .file_1.log.2
└── 2024/01/16/
    └── ...

2.4 File Group 与 File Slice

graph TB subgraph "File Group (文件组)" subgraph "File Slice 3 (最新)" FS3_BASE[base_file_v3.parquet] FS3_LOG1[.log.1] FS3_LOG2[.log.2] end subgraph "File Slice 2" FS2_BASE[base_file_v2.parquet] end subgraph "File Slice 1 (最旧)" FS1_BASE[base_file_v1.parquet] end end FS1_BASE -->|Compaction| FS2_BASE -->|Compaction| FS3_BASE

核心概念:

概念说明
File Group一组相关文件,共享同一个 File ID
File SliceFile Group 在某个时刻的快照
Base FileParquet 格式的基础数据文件
Log FileAvro 格式的增量日志文件

三、两种表类型:COW vs MOR

3.1 Copy-On-Write (COW)

sequenceDiagram participant Writer participant Table participant Reader Note over Table: 初始状态:file_1.parquet (100条) Writer->>Table: 更新10条记录 Table->>Table: 读取 file_1.parquet Table->>Table: 合并更新 Table->>Table: 写入新文件 file_1_v2.parquet Note over Table: file_1_v2.parquet (100条,含更新) Reader->>Table: 查询 Table-->>Reader: 直接读取 file_1_v2.parquet Note over Reader: 查询性能最优

COW 特点:

graph LR subgraph "COW优点" A1["读取性能最好"] A2["直接读Parquet"] A3["无需合并"] end subgraph "COW缺点" B1["写入放大"] B2["改1条重写整个文件"] B3["写入延迟高"] end

3.2 Merge-On-Read (MOR)

sequenceDiagram participant Writer participant Table participant Reader Note over Table: 初始状态:file_1.parquet (100条) Writer->>Table: 更新10条记录 Table->>Table: 追加到日志文件 .file_1.log.1 Note over Table: file_1.parquet + .file_1.log.1 Writer->>Table: 再更新5条记录 Table->>Table: 追加到日志文件 .file_1.log.2 Note over Table: file_1.parquet + .log.1 + .log.2 Reader->>Table: 查询(读优化视图) Table-->>Reader: 只读 file_1.parquet Reader->>Table: 查询(实时视图) Table->>Table: 合并 parquet + logs Table-->>Reader: 返回最新数据

MOR 特点:

graph LR subgraph "MOR优点" A1["写入性能好"] A2["只追加日志"] A3["写入延迟低"] end subgraph "MOR缺点" B1["读取时需合并"] B2["实时视图性能略差"] B3["需要定期Compaction"] end

3.3 COW vs MOR 对比

xychart-beta title "COW vs MOR 性能对比" x-axis ["写入延迟", "写入吞吐", "查询延迟", "存储效率"] y-axis "得分" 0 --> 10 bar [3, 4, 9, 6] bar [9, 9, 6, 8]
维度COWMOR
写入延迟高(需重写文件)低(只追加日志)
写入吞吐
读取性能最优需合并,略慢
存储效率一般(多版本)较好
适用场景读多写少写多读少

3.4 选型建议

flowchart TD A[选择表类型] --> B{写入频率?} B -->|低频批量写入
每天几次| C[COW] B -->|高频实时写入
每分钟/每秒| D[MOR] C --> C1["适合:
离线数仓
T+1 ETL"] D --> D1["适合:
实时数仓
CDC 场景"] style C fill:#4ecdc4 style D fill:#ff6b6b

四、索引机制

4.1 为什么需要索引?

graph TD A["问题:如何快速定位
一条记录在哪个文件?"] --> B{没有索引} B --> C["扫描所有文件
查找 record_key"] C --> D["性能灾难 💀"] A --> E{有索引} E --> F["索引直接告诉你
记录在哪个 File Group"] F --> G["直接定位,高效 🚀"]

4.2 索引类型

graph TB subgraph "Hudi索引类型" A[Bloom Index] --> A1["基于布隆过滤器
适合点查"] B[Simple Index] --> B1["基于 Join
适合批量更新"] C[HBase Index] --> C1["外部存储
适合大规模"] D[Bucket Index] --> D1["基于分桶
适合高吞吐"] E[Record Index] --> E1["全局索引
最精确"] end

4.3 Bloom Index 原理

graph LR subgraph "写入时" W1[新记录] --> W2[计算 Bloom Filter] W2 --> W3[写入文件 Footer] end subgraph "更新时" U1[待更新记录] --> U2[检查各文件的 Bloom] U2 --> U3{可能存在?} U3 -->|是| U4[精确查找] U3 -->|否| U5[跳过该文件] end

Bloom Index 配置:

# Bloom Filter 配置
hoodie.bloom.index.filter.type=DYNAMIC_V0
hoodie.bloom.index.filter.dynamic.max.entries=100000
hoodie.index.bloom.num_entries=60000
hoodie.index.bloom.fpp=0.000000001

4.4 索引选择建议

索引类型适用场景优点缺点
Bloom默认选择,点查更新无外部依赖大数据量时变慢
Simple批量更新简单可靠需要 Shuffle
HBase超大规模扩展性好需维护 HBase
Bucket高吞吐写入无需查找需预先分桶
Record精确去重最准确存储开销大

五、增量查询:Hudi 的杀手锏

5.1 三种查询模式

graph TB subgraph "Hudi查询模式" A[Snapshot Query
快照查询] --> A1["读取最新完整数据
类似普通表查询"] B[Incremental Query
增量查询] --> B1["只读取变化的数据
支持指定时间范围"] C[Read Optimized Query
读优化查询] --> C1["只读Base文件
MOR表专用,更快"] end

5.2 增量查询示例

sequenceDiagram participant Table as Hudi表 participant Consumer as 下游消费者 Note over Table: Commit 1: 10:00 Note over Table: Commit 2: 11:00 Note over Table: Commit 3: 12:00 Consumer->>Table: 增量查询 (10:00 - 11:00) Table-->>Consumer: 返回 Commit 2 的变化 Consumer->>Consumer: 处理数据 Consumer->>Consumer: 记录位点 = 11:00 Consumer->>Table: 增量查询 (11:00 - 12:00) Table-->>Consumer: 返回 Commit 3 的变化

Spark 增量查询代码:

// 增量读取 Hudi 表
val incrementalDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20240115100000")
  .option("hoodie.datasource.read.end.instanttime", "20240115120000")
  .load("/path/to/hudi_table")

// 处理增量数据
incrementalDF.show()

// 获取本次消费的最新位点
val latestCommit = incrementalDF
  .select("_hoodie_commit_time")
  .agg(max("_hoodie_commit_time"))
  .first().getString(0)

Flink 增量消费:

-- Flink SQL 流式读取 Hudi
CREATE TABLE hudi_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hudi',
    'path' = 's3://bucket/orders',
    'table.type' = 'MERGE_ON_READ',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '10'  -- 10秒检查一次新数据
);

-- 实时消费变化数据
SELECT * FROM hudi_orders;

5.3 增量 ETL 架构

graph LR subgraph "增量ETL" A[ODS层 Hudi表] -->|增量查询| B[Flink/Spark] B -->|增量写入| C[DWD层 Hudi表] C -->|增量查询| D[Flink/Spark] D -->|增量写入| E[DWS层 Hudi表] end subgraph "传统ETL" F[ODS层] -->|全量读取| G[计算] G -->|全量覆盖| H[DWD层] end

六、Compaction 与 Clustering

6.1 Compaction(压缩合并)

graph LR subgraph "Compaction前" A1[base_file.parquet
100条] A2[.log.1
10条更新] A3[.log.2
5条更新] end subgraph "Compaction后" B1[new_base_file.parquet
100条最新数据] end A1 & A2 & A3 -->|Compaction| B1

Compaction 策略:

# 同步 Compaction(写入时执行)
hoodie.compact.inline=true
hoodie.compact.inline.max.delta.commits=5

# 异步 Compaction(独立任务执行)
hoodie.compact.inline=false
# 需要单独运行 Compaction 任务

6.2 Clustering(聚簇优化)

graph TB subgraph "Clustering前" A1[file_1: user_id 1,5,9,3] A2[file_2: user_id 2,8,4,7] A3[file_3: user_id 6,10,11,12] end subgraph "Clustering后 (按user_id排序)" B1[file_1: user_id 1,2,3,4] B2[file_2: user_id 5,6,7,8] B3[file_3: user_id 9,10,11,12] end A1 & A2 & A3 -->|Clustering| B1 & B2 & B3

Clustering 的好处:

graph LR A[Clustering优化] --> B[数据局部性] A --> C[查询性能] A --> D[压缩效果] B --> B1["相似数据在一起"] C --> C1["范围查询更快
跳过无关文件"] D --> D1["同类数据压缩率高"]

6.3 Clean(清理旧版本)

sequenceDiagram participant Table participant Clean Note over Table: Commit 1, 2, 3, 4, 5 (保留3个版本) Clean->>Table: 执行清理 Table->>Table: 删除 Commit 1, 2 的文件 Note over Table: 只保留 Commit 3, 4, 5
# 清理配置
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=3

# 或者按时间保留
hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS
hoodie.cleaner.hours.retained=24

七、实战:Hudi 表操作

7.1 创建 Hudi 表

// Spark 创建 Hudi 表
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_orders"
val basePath = "s3://bucket/hudi/orders"

// 初始数据
val initialDF = spark.createDataFrame(Seq(
  (1L, 1001L, 100.00, "2024-01-15 10:00:00"),
  (2L, 1002L, 200.00, "2024-01-15 10:01:00"),
  (3L, 1003L, 150.00, "2024-01-15 10:02:00")
)).toDF("order_id", "user_id", "amount", "order_time")

// 写入 Hudi 表
initialDF.write
  .format("hudi")
  .options(Map(
    TABLE_NAME.key -> tableName,
    RECORDKEY_FIELD.key -> "order_id",           // 主键
    PRECOMBINE_FIELD.key -> "order_time",        // 预合并字段
    PARTITIONPATH_FIELD.key -> "",               // 分区字段(空=不分区)
    TABLE_TYPE.key -> "MERGE_ON_READ",           // 表类型
    "hoodie.datasource.write.operation" -> "upsert"
  ))
  .mode("overwrite")
  .save(basePath)

7.2 Upsert 操作

// 更新数据(Upsert)
val updateDF = spark.createDataFrame(Seq(
  (1L, 1001L, 120.00, "2024-01-15 11:00:00"),  // 更新
  (4L, 1004L, 300.00, "2024-01-15 11:01:00")   // 新增
)).toDF("order_id", "user_id", "amount", "order_time")

updateDF.write
  .format("hudi")
  .options(Map(
    TABLE_NAME.key -> tableName,
    RECORDKEY_FIELD.key -> "order_id",
    PRECOMBINE_FIELD.key -> "order_time",
    "hoodie.datasource.write.operation" -> "upsert"
  ))
  .mode("append")
  .save(basePath)

7.3 Delete 操作

// 删除数据
val deleteDF = spark.createDataFrame(Seq(
  (2L, 1002L, 0.00, "2024-01-15 12:00:00")  // 要删除的记录
)).toDF("order_id", "user_id", "amount", "order_time")

deleteDF.write
  .format("hudi")
  .options(Map(
    TABLE_NAME.key -> tableName,
    RECORDKEY_FIELD.key -> "order_id",
    PRECOMBINE_FIELD.key -> "order_time",
    "hoodie.datasource.write.operation" -> "delete"
  ))
  .mode("append")
  .save(basePath)

7.4 时间旅行查询

// 查询某个时间点的快照
val historicalDF = spark.read
  .format("hudi")
  .option("as.of.instant", "20240115100000")  // 指定时间点
  .load(basePath)

historicalDF.show()

// 查询两个时间点之间的变化
val changesDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20240115100000")
  .option("hoodie.datasource.read.end.instanttime", "20240115120000")
  .load(basePath)

八、Hudi + Flink 实时入湖

8.1 架构图

graph LR subgraph "数据源" A[MySQL] -->|CDC| B[Debezium] B --> C[Kafka] end subgraph "实时处理" C --> D[Flink] end subgraph "数据湖" D --> E[Hudi MOR表] E --> F[(S3/HDFS)] end subgraph "查询" F --> G[Trino/Spark] end

8.2 Flink SQL 示例

-- 1. 创建 Kafka 源表(CDC 数据)
CREATE TABLE kafka_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_status STRING,
    order_time TIMESTAMP(3),
    op_type STRING,  -- CDC 操作类型
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders_cdc',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json'
);

-- 2. 创建 Hudi Sink 表
CREATE TABLE hudi_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_status STRING,
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hudi',
    'path' = 's3://bucket/hudi/orders',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'order_id',
    'hoodie.datasource.write.precombine.field' = 'order_time',
    'write.operation' = 'upsert',
    'write.tasks' = '4',
    'compaction.tasks' = '2',
    'compaction.async.enabled' = 'true',
    'compaction.delta_commits' = '5'
);

-- 3. 实时写入
INSERT INTO hudi_orders
SELECT order_id, user_id, amount, order_status, order_time
FROM kafka_orders;

九、最佳实践

9.1 配置优化

mindmap root((Hudi优化)) 写入优化 合理设置并行度 批量写入大小 索引类型选择 存储优化 文件大小控制 分区策略 压缩算法 查询优化 Compaction策略 Clustering 元数据表

9.2 关键配置

# ===== 写入配置 =====
# 目标文件大小
hoodie.parquet.max.file.size=134217728  # 128MB
hoodie.parquet.small.file.limit=104857600  # 100MB

# 写入并行度
hoodie.insert.shuffle.parallelism=200
hoodie.upsert.shuffle.parallelism=200

# ===== 索引配置 =====
hoodie.index.type=BLOOM
hoodie.bloom.index.parallelism=200

# ===== Compaction 配置 =====
hoodie.compact.inline=false
hoodie.compact.inline.max.delta.commits=5
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy

# ===== 清理配置 =====
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=10

9.3 常见问题

flowchart TD A[常见问题] --> B{写入慢?} A --> C{小文件多?} A --> D{查询慢?} B --> B1["检查索引类型
考虑Bucket Index"] B --> B2["增加并行度"] B --> B3["使用MOR表类型"] C --> C1["增大文件大小阈值"] C --> C2["开启Clustering"] C --> C3["合理设置Compaction"] D --> D1["检查是否需要Compaction"] D --> D2["使用读优化视图"] D --> D3["开启元数据表"]

十、总结

10.1 Hudi 核心价值

graph LR A[Hudi核心价值] --> B[增量处理] A --> C[ACID事务] A --> D[时间旅行] A --> E[Schema演进] B --> B1["告别全量重刷"] C --> C1["写入失败可回滚"] D --> D1["历史数据可追溯"] E --> E1["加字段不怕"]

10.2 适用场景

场景推荐度说明
CDC 实时入湖⭐⭐⭐⭐⭐Hudi 的主战场
增量 ETL⭐⭐⭐⭐⭐增量查询能力强
数据更正/删除⭐⭐⭐⭐⭐GDPR 合规必备
时间旅行/审计⭐⭐⭐⭐Timeline 支持
纯追加场景⭐⭐⭐可以,但非必须

10.3 一句话总结

Hudi = 让数据湖支持增删改 + 增量处理 + 时间旅行

如果你需要在数据湖上实现实时更新和增量消费,Hudi 是目前最成熟的选择之一。

评论区
暂无评论
avatar