一、前言:为什么需要 Iceberg?
1.1 Hive 的历史包袱
graph TD
A[Hive表的问题] --> B[元数据瓶颈]
A --> C[分区爆炸]
A --> D[Schema演进困难]
A --> E[无法安全并发写入]
B --> B1["Hive Metastore
百万分区就崩溃"] C --> C1["按天+按小时分区
一年就几十万分区"] D --> D1["改个字段类型?
全部重刷吧"] E --> E1["两个Job同时写?
数据可能乱"]
百万分区就崩溃"] C --> C1["按天+按小时分区
一年就几十万分区"] D --> D1["改个字段类型?
全部重刷吧"] E --> E1["两个Job同时写?
数据可能乱"]
1.2 Netflix 的挑战
graph LR
A[Netflix数据规模] --> B["PB级数据"]
A --> C["数十万张表"]
A --> D["每天数百万查询"]
B --> E["Hive搞不定"]
C --> E
D --> E
E --> F["自研 Iceberg"]
1.3 Iceberg 发展历程
timeline
title Iceberg 发展史
2017 : Netflix 内部开发
2018 : Netflix 开源
2018 : 进入 Apache 孵化
2020 : 成为 Apache 顶级项目
2023 : 1.3+ 版本,成为行业标准
二、Iceberg 核心设计
2.1 表格式 vs 存储格式
graph TB
subgraph "存储格式 (File Format)"
F1[Parquet]
F2[ORC]
F3[Avro]
end
subgraph "表格式 (Table Format)"
T1[Iceberg]
T2[Hudi]
T3[Delta Lake]
end
subgraph "关系"
T1 & T2 & T3 -->|使用| F1 & F2 & F3
end
表格式的职责:
| 职责 | 说明 |
|---|---|
| 元数据管理 | 追踪哪些文件属于这张表 |
| Schema 管理 | 定义和演进表结构 |
| 分区管理 | 分区策略和裁剪 |
| 事务管理 | ACID 保证 |
| 时间旅行 | 历史版本访问 |
2.2 Iceberg 架构全景
graph TB
subgraph "Catalog层"
CAT[Catalog
Hive/Glue/REST/Nessie] end subgraph "元数据层" META[Metadata File
metadata.json] SNAP[Snapshot
快照] MAN[Manifest List
清单列表] MANF[Manifest File
清单文件] end subgraph "数据层" DATA[Data Files
Parquet/ORC] end CAT --> META META --> SNAP SNAP --> MAN MAN --> MANF MANF --> DATA
Hive/Glue/REST/Nessie] end subgraph "元数据层" META[Metadata File
metadata.json] SNAP[Snapshot
快照] MAN[Manifest List
清单列表] MANF[Manifest File
清单文件] end subgraph "数据层" DATA[Data Files
Parquet/ORC] end CAT --> META META --> SNAP SNAP --> MAN MAN --> MANF MANF --> DATA
2.3 元数据三层架构
graph TB
subgraph "Metadata File (元数据文件)"
M1["当前Schema"]
M2["分区规范"]
M3["快照历史"]
M4["属性配置"]
end
subgraph "Manifest List (清单列表)"
ML1["指向多个Manifest"]
ML2["分区范围统计"]
ML3["文件数量统计"]
end
subgraph "Manifest File (清单文件)"
MF1["数据文件路径"]
MF2["分区值"]
MF3["列级统计信息"]
MF4["文件大小/行数"]
end
M1 & M2 & M3 & M4 --> ML1 & ML2 & ML3
ML1 & ML2 & ML3 --> MF1 & MF2 & MF3 & MF4
三、元数据详解
3.1 文件布局
iceberg_table/
├── metadata/
│ ├── v1.metadata.json # 版本1元数据
│ ├── v2.metadata.json # 版本2元数据
│ ├── v3.metadata.json # 当前版本
│ ├── snap-1234567890.avro # 快照文件
│ ├── snap-1234567891.avro
│ └── manifest-list-xxxx.avro # 清单列表
│ └── manifest-xxxx.avro # 清单文件
└── data/
├── date=2024-01-15/
│ ├── 00000-0-xxx.parquet
│ └── 00001-0-xxx.parquet
└── date=2024-01-16/
└── 00000-0-xxx.parquet3.2 Metadata File 结构
{
"format-version": 2,
"table-uuid": "xxx-xxx-xxx",
"location": "s3://bucket/iceberg_table",
"last-sequence-number": 5,
"last-updated-ms": 1705312800000,
"last-column-id": 10,
"current-schema-id": 1,
"schemas": [
{
"schema-id": 1,
"fields": [
{"id": 1, "name": "order_id", "type": "long", "required": true},
{"id": 2, "name": "user_id", "type": "long"},
{"id": 3, "name": "amount", "type": "decimal(10,2)"},
{"id": 4, "name": "order_date", "type": "date"}
]
}
],
"partition-specs": [
{
"spec-id": 0,
"fields": [
{"source-id": 4, "field-id": 1000, "name": "order_date", "transform": "day"}
]
}
],
"current-snapshot-id": 1234567891,
"snapshots": [
{"snapshot-id": 1234567890, "timestamp-ms": 1705312700000, "manifest-list": "..."},
{"snapshot-id": 1234567891, "timestamp-ms": 1705312800000, "manifest-list": "..."}
]
}3.3 快照与继承
graph LR
subgraph "快照链"
S1[Snapshot 1
初始写入] --> S2[Snapshot 2
追加数据] S2 --> S3[Snapshot 3
更新数据] S3 --> S4[Snapshot 4
删除数据] end subgraph "文件复用" S2 -.->|继承| S1 S3 -.->|继承| S2 S4 -.->|继承| S3 end
初始写入] --> S2[Snapshot 2
追加数据] S2 --> S3[Snapshot 3
更新数据] S3 --> S4[Snapshot 4
删除数据] end subgraph "文件复用" S2 -.->|继承| S1 S3 -.->|继承| S2 S4 -.->|继承| S3 end
快照继承的好处:
- 只有变化的文件需要记录
- 大量元数据可以复用
- 时间旅行成本低
四、分区进化:Iceberg 的杀手锏
4.1 Hidden Partitioning(隐藏分区)
graph LR
subgraph "传统Hive分区"
A["用户需要知道分区列"]
B["查询必须显式指定分区"]
C["WHERE dt = '2024-01-15'"]
end
graph LR
subgraph "Iceberg隐藏分区"
D["按order_time自动分区"]
E["用户无感知分区结构"]
F["WHERE order_time > '2024-01-15'
自动分区裁剪"] end
自动分区裁剪"] end
配置示例:
-- 创建表时定义分区转换
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
order_time TIMESTAMP
) USING iceberg
PARTITIONED BY (
days(order_time), -- 按天分区
bucket(16, user_id) -- user_id 分桶
);
-- 查询时无需关心分区
SELECT * FROM orders
WHERE order_time >= '2024-01-15 00:00:00'
AND order_time < '2024-01-16 00:00:00';
-- Iceberg 自动进行分区裁剪!4.2 分区转换函数
graph TB
subgraph "Iceberg分区转换"
A[identity] --> A1["原值分区"]
B[bucket] --> B1["哈希分桶"]
C[truncate] --> C1["截断分区"]
D[year/month/day/hour] --> D1["时间分区"]
end
| 转换函数 | 说明 | 示例 |
|---|---|---|
identity(col) | 原值分区 | identity(country) |
bucket(n, col) | 哈希分成 n 桶 | bucket(16, user_id) |
truncate(w, col) | 截断到 w 宽度 | truncate(10, order_id) |
year(ts) | 按年 | year(order_time) |
month(ts) | 按月 | month(order_time) |
day(ts) | 按天 | day(order_time) |
hour(ts) | 按小时 | hour(order_time) |
4.3 分区演进(Partition Evolution)
sequenceDiagram
participant Table
participant User
Note over Table: 初始:按月分区 month(order_time)
User->>Table: 业务增长,需要按天分区
Table->>Table: 添加新分区规范 day(order_time)
Note over Table: 新数据按天分区
历史数据仍按月分区 User->>Table: 查询 2024 年数据 Table-->>User: 自动处理两种分区
历史数据仍按月分区 User->>Table: 查询 2024 年数据 Table-->>User: 自动处理两种分区
-- 分区演进:从月分区改为天分区
ALTER TABLE orders
SET PARTITION SPEC (
day(order_time),
bucket(16, user_id)
);
-- 旧数据仍然是月分区,新数据是天分区
-- 查询时 Iceberg 自动处理!五、Schema 演进
5.1 支持的演进操作
graph TB
subgraph "Iceberg Schema演进"
A[Add Column
添加列] --> A1["任意位置
可设默认值"] B[Drop Column
删除列] --> B1["软删除
不影响历史"] C[Rename Column
重命名] --> C1["元数据变更
数据不动"] D[Update Type
类型变更] --> D1["支持安全变更
int→long"] E[Reorder Column
列重排] --> E1["元数据变更"] end
添加列] --> A1["任意位置
可设默认值"] B[Drop Column
删除列] --> B1["软删除
不影响历史"] C[Rename Column
重命名] --> C1["元数据变更
数据不动"] D[Update Type
类型变更] --> D1["支持安全变更
int→long"] E[Reorder Column
列重排] --> E1["元数据变更"] end
5.2 Schema 演进原理
graph LR
subgraph "Column ID机制"
A["每个列有唯一ID"]
B["ID从不复用"]
C["通过ID关联数据"]
end
subgraph "好处"
D["重命名不影响数据"]
E["删除列后重新添加
是新列"] F["历史数据自动兼容"] end A & B & C --> D & E & F
是新列"] F["历史数据自动兼容"] end A & B & C --> D & E & F
示例:
-- 添加列
ALTER TABLE orders ADD COLUMN discount DECIMAL(10,2);
-- 重命名列
ALTER TABLE orders RENAME COLUMN discount TO discount_amount;
-- 删除列
ALTER TABLE orders DROP COLUMN discount_amount;
-- 类型变更(安全变更)
ALTER TABLE orders ALTER COLUMN user_id TYPE BIGINT;5.3 类型演进规则
| 原类型 | 可变更为 |
|---|---|
| int | long |
| float | double |
| decimal(P, S) | decimal(P', S) where P' > P |
六、时间旅行
6.1 基于快照的时间旅行
graph LR
subgraph "快照时间线"
S1["Snapshot 1
10:00
100条"] --> S2["Snapshot 2
11:00
150条"] S2 --> S3["Snapshot 3
12:00
200条"] S3 --> S4["Snapshot 4
13:00
当前"] end Q1["查询10:00的数据"] -.-> S1 Q2["查询11:30的数据"] -.-> S2
10:00
100条"] --> S2["Snapshot 2
11:00
150条"] S2 --> S3["Snapshot 3
12:00
200条"] S3 --> S4["Snapshot 4
13:00
当前"] end Q1["查询10:00的数据"] -.-> S1 Q2["查询11:30的数据"] -.-> S2
6.2 时间旅行查询
-- 方式1:按快照ID查询
SELECT * FROM orders VERSION AS OF 1234567890;
-- 方式2:按时间戳查询
SELECT * FROM orders TIMESTAMP AS OF '2024-01-15 10:00:00';
-- 方式3:按时间表达式
SELECT * FROM orders TIMESTAMP AS OF current_timestamp() - INTERVAL 1 HOUR;
-- Spark SQL 语法
SELECT * FROM orders.snapshots; -- 查看所有快照
SELECT * FROM orders.history; -- 查看表历史6.3 回滚操作
sequenceDiagram
participant User
participant Iceberg
participant Storage
Note over Iceberg: 当前: Snapshot 4
User->>Iceberg: 发现数据问题,需要回滚
Iceberg->>Iceberg: 创建新 Snapshot 5
Note over Iceberg: Snapshot 5 指向 Snapshot 2 的文件
User->>Iceberg: 查询当前数据
Iceberg-->>User: 返回 Snapshot 2 时的数据
Note over Storage: 文件没有被删除
只是元数据变了
只是元数据变了
-- 回滚到指定快照
CALL system.rollback_to_snapshot('db.orders', 1234567890);
-- 回滚到指定时间点
CALL system.rollback_to_timestamp('db.orders', TIMESTAMP '2024-01-15 10:00:00');七、ACID 事务与并发控制
7.1 乐观并发控制
sequenceDiagram
participant Writer1
participant Writer2
participant Iceberg
Note over Iceberg: 当前版本: v1
Writer1->>Iceberg: 开始写入
Writer2->>Iceberg: 开始写入
Writer1->>Writer1: 写入数据文件
Writer2->>Writer2: 写入数据文件
Writer1->>Iceberg: 提交 (基于v1)
Iceberg->>Iceberg: v1 → v2 成功
Writer2->>Iceberg: 提交 (基于v1)
Iceberg-->>Writer2: 冲突!当前是v2
Writer2->>Writer2: 重试:重新生成元数据
Writer2->>Iceberg: 提交 (基于v2)
Iceberg->>Iceberg: v2 → v3 成功
7.2 行级更新与删除
graph TB
subgraph "Copy-On-Write (COW)"
A1["更新时重写整个文件"]
A2["读取性能最优"]
A3["写入放大"]
end
subgraph "Merge-On-Read (MOR)"
B1["写入删除文件 (Delete File)"]
B2["读取时合并"]
B3["写入性能好"]
end
Delete File 类型:
| 类型 | 说明 | 适用场景 |
|---|---|---|
| Position Delete | 记录文件路径+行号 | 精确删除少量行 |
| Equality Delete | 记录主键值 | 批量删除 |
-- 行级删除
DELETE FROM orders WHERE order_id = 12345;
-- 行级更新
UPDATE orders SET status = 'completed' WHERE order_id = 12345;
-- Merge 操作
MERGE INTO orders t
USING updates s ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.status = s.status
WHEN NOT MATCHED THEN INSERT *;八、查询优化
8.1 元数据过滤
graph TB
subgraph "查询优化流程"
A[SQL查询] --> B[解析WHERE条件]
B --> C{Manifest过滤}
C -->|不相关| D[跳过Manifest]
C -->|可能相关| E{Data File过滤}
E -->|不相关| F[跳过文件]
E -->|相关| G[读取文件]
end
8.2 列级统计信息
graph LR
subgraph "Manifest File中的统计"
A["每个数据文件记录:"]
B["- 行数"]
C["- 列的min/max值"]
D["- null计数"]
E["- distinct计数"]
end
subgraph "查询优化"
F["WHERE amount > 1000"]
G["检查max(amount)"]
H["max < 1000 → 跳过文件"]
end
A & B & C & D & E --> F --> G --> H
8.3 Data Skipping 示例
-- 假设有1000个数据文件
SELECT * FROM orders
WHERE order_date = '2024-01-15'
AND amount > 1000;
-- Iceberg 优化流程:
-- 1. 通过 Manifest List 的分区统计,定位到 order_date='2024-01-15' 的 Manifest
-- 2. 通过 Manifest File 的列统计,跳过 max(amount) < 1000 的文件
-- 3. 最终可能只需要读取 10 个文件九、实战操作
9.1 创建 Iceberg 表
-- Spark SQL 创建表
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
order_time TIMESTAMP
) USING iceberg
PARTITIONED BY (days(order_time))
LOCATION 's3://bucket/iceberg/orders'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '100'
);9.2 数据写入
// Spark DataFrame 写入
val df = spark.createDataFrame(Seq(
(1L, 1001L, BigDecimal("100.00"), "pending", Timestamp.valueOf("2024-01-15 10:00:00")),
(2L, 1002L, BigDecimal("200.00"), "completed", Timestamp.valueOf("2024-01-15 11:00:00"))
)).toDF("order_id", "user_id", "amount", "status", "order_time")
// 追加写入
df.writeTo("orders").append()
// 覆盖写入
df.writeTo("orders").overwritePartitions()9.3 Flink 流式写入
-- Flink SQL 创建 Iceberg Sink
CREATE TABLE iceberg_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'iceberg',
'catalog-type' = 'hive',
'catalog-name' = 'hive_catalog',
'warehouse' = 's3://bucket/iceberg',
'format-version' = '2',
'write.upsert.enabled' = 'true'
);
-- 流式 Upsert
INSERT INTO iceberg_orders
SELECT * FROM kafka_orders;9.4 表维护操作
-- 查看快照
SELECT * FROM orders.snapshots;
-- 查看文件
SELECT * FROM orders.files;
-- 查看历史
SELECT * FROM orders.history;
-- 过期快照清理
CALL system.expire_snapshots('db.orders', TIMESTAMP '2024-01-01 00:00:00', 100);
-- 删除孤儿文件
CALL system.remove_orphan_files('db.orders');
-- 重写小文件
CALL system.rewrite_data_files('db.orders');
-- 重写 Manifest
CALL system.rewrite_manifests('db.orders');十、与引擎集成
10.1 多引擎支持
graph TB
subgraph "计算引擎"
E1[Spark]
E2[Flink]
E3[Trino/Presto]
E4[Hive]
E5[Dremio]
end
subgraph "Iceberg"
I[统一表格式]
end
subgraph "存储"
S1[S3]
S2[HDFS]
S3[Azure Blob]
S4[GCS]
end
E1 & E2 & E3 & E4 & E5 --> I --> S1 & S2 & S3 & S4
10.2 Catalog 选择
| Catalog | 说明 | 适用场景 |
|---|---|---|
| Hive Catalog | 使用 Hive Metastore | 已有 Hive 生态 |
| Hadoop Catalog | 文件系统元数据 | 简单场景 |
| Glue Catalog | AWS Glue | AWS 环境 |
| REST Catalog | REST API | 跨语言访问 |
| Nessie Catalog | Git-like 分支 | 多版本管理 |
十一、最佳实践
11.1 配置建议
# 写入配置
write.format.default=parquet
write.parquet.compression-codec=zstd
write.target-file-size-bytes=536870912 # 512MB
# 分区配置
write.distribution-mode=hash # 写入时hash分布
# 元数据配置
write.metadata.delete-after-commit.enabled=true
write.metadata.previous-versions-max=100
# 读取配置
read.split.target-size=134217728 # 128MB11.2 性能优化
mindmap
root((Iceberg优化))
写入优化
合理的文件大小
分区策略选择
排序优化
读取优化
利用列裁剪
谓词下推
Data Skipping
维护优化
定期Compaction
清理过期快照
重写小文件
十二、总结
12.1 Iceberg 核心优势
graph LR
A[Iceberg优势] --> B[隐藏分区]
A --> C[Schema演进]
A --> D[时间旅行]
A --> E[ACID事务]
A --> F[多引擎支持]
B --> B1["用户无感
自动分区裁剪"] C --> C1["列ID机制
安全演进"] D --> D1["快照机制
任意回溯"] E --> E1["乐观并发
安全写入"] F --> F1["Spark/Flink/Trino
随便换"]
自动分区裁剪"] C --> C1["列ID机制
安全演进"] D --> D1["快照机制
任意回溯"] E --> E1["乐观并发
安全写入"] F --> F1["Spark/Flink/Trino
随便换"]
12.2 适用场景
| 场景 | 推荐度 | 说明 |
|---|---|---|
| 数据湖分析 | ⭐⭐⭐⭐⭐ | 核心场景 |
| 多引擎访问 | ⭐⭐⭐⭐⭐ | Iceberg 强项 |
| 历史审计 | ⭐⭐⭐⭐⭐ | 时间旅行 |
| Schema 频繁变更 | ⭐⭐⭐⭐⭐ | 演进能力强 |
| 实时更新 | ⭐⭐⭐⭐ | MOR 模式支持 |
12.3 一句话总结
Iceberg = 开放表格式 + 隐藏分区 + 无缝 Schema 演进 + 时间旅行
如果你需要一个开放、灵活、多引擎支持的数据湖表格式,Iceberg 是当前最佳选择之一。