搜 索

Iceberg 从入门到放弃:表格式演进与时间旅行

  • 10阅读
  • 2023年07月01日
  • 0评论
首页 / AI/大数据 / 正文

一、前言:为什么需要 Iceberg?

1.1 Hive 的历史包袱

graph TD A[Hive表的问题] --> B[元数据瓶颈] A --> C[分区爆炸] A --> D[Schema演进困难] A --> E[无法安全并发写入] B --> B1["Hive Metastore
百万分区就崩溃"] C --> C1["按天+按小时分区
一年就几十万分区"] D --> D1["改个字段类型?
全部重刷吧"] E --> E1["两个Job同时写?
数据可能乱"]

1.2 Netflix 的挑战

graph LR A[Netflix数据规模] --> B["PB级数据"] A --> C["数十万张表"] A --> D["每天数百万查询"] B --> E["Hive搞不定"] C --> E D --> E E --> F["自研 Iceberg"]

1.3 Iceberg 发展历程

timeline title Iceberg 发展史 2017 : Netflix 内部开发 2018 : Netflix 开源 2018 : 进入 Apache 孵化 2020 : 成为 Apache 顶级项目 2023 : 1.3+ 版本,成为行业标准

二、Iceberg 核心设计

2.1 表格式 vs 存储格式

graph TB subgraph "存储格式 (File Format)" F1[Parquet] F2[ORC] F3[Avro] end subgraph "表格式 (Table Format)" T1[Iceberg] T2[Hudi] T3[Delta Lake] end subgraph "关系" T1 & T2 & T3 -->|使用| F1 & F2 & F3 end

表格式的职责:

职责说明
元数据管理追踪哪些文件属于这张表
Schema 管理定义和演进表结构
分区管理分区策略和裁剪
事务管理ACID 保证
时间旅行历史版本访问

2.2 Iceberg 架构全景

graph TB subgraph "Catalog层" CAT[Catalog
Hive/Glue/REST/Nessie] end subgraph "元数据层" META[Metadata File
metadata.json] SNAP[Snapshot
快照] MAN[Manifest List
清单列表] MANF[Manifest File
清单文件] end subgraph "数据层" DATA[Data Files
Parquet/ORC] end CAT --> META META --> SNAP SNAP --> MAN MAN --> MANF MANF --> DATA

2.3 元数据三层架构

graph TB subgraph "Metadata File (元数据文件)" M1["当前Schema"] M2["分区规范"] M3["快照历史"] M4["属性配置"] end subgraph "Manifest List (清单列表)" ML1["指向多个Manifest"] ML2["分区范围统计"] ML3["文件数量统计"] end subgraph "Manifest File (清单文件)" MF1["数据文件路径"] MF2["分区值"] MF3["列级统计信息"] MF4["文件大小/行数"] end M1 & M2 & M3 & M4 --> ML1 & ML2 & ML3 ML1 & ML2 & ML3 --> MF1 & MF2 & MF3 & MF4

三、元数据详解

3.1 文件布局

iceberg_table/
├── metadata/
│   ├── v1.metadata.json          # 版本1元数据
│   ├── v2.metadata.json          # 版本2元数据
│   ├── v3.metadata.json          # 当前版本
│   ├── snap-1234567890.avro      # 快照文件
│   ├── snap-1234567891.avro
│   └── manifest-list-xxxx.avro   # 清单列表
│   └── manifest-xxxx.avro        # 清单文件
└── data/
    ├── date=2024-01-15/
    │   ├── 00000-0-xxx.parquet
    │   └── 00001-0-xxx.parquet
    └── date=2024-01-16/
        └── 00000-0-xxx.parquet

3.2 Metadata File 结构

{
  "format-version": 2,
  "table-uuid": "xxx-xxx-xxx",
  "location": "s3://bucket/iceberg_table",
  "last-sequence-number": 5,
  "last-updated-ms": 1705312800000,
  "last-column-id": 10,
  "current-schema-id": 1,
  "schemas": [
    {
      "schema-id": 1,
      "fields": [
        {"id": 1, "name": "order_id", "type": "long", "required": true},
        {"id": 2, "name": "user_id", "type": "long"},
        {"id": 3, "name": "amount", "type": "decimal(10,2)"},
        {"id": 4, "name": "order_date", "type": "date"}
      ]
    }
  ],
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": [
        {"source-id": 4, "field-id": 1000, "name": "order_date", "transform": "day"}
      ]
    }
  ],
  "current-snapshot-id": 1234567891,
  "snapshots": [
    {"snapshot-id": 1234567890, "timestamp-ms": 1705312700000, "manifest-list": "..."},
    {"snapshot-id": 1234567891, "timestamp-ms": 1705312800000, "manifest-list": "..."}
  ]
}

3.3 快照与继承

graph LR subgraph "快照链" S1[Snapshot 1
初始写入] --> S2[Snapshot 2
追加数据] S2 --> S3[Snapshot 3
更新数据] S3 --> S4[Snapshot 4
删除数据] end subgraph "文件复用" S2 -.->|继承| S1 S3 -.->|继承| S2 S4 -.->|继承| S3 end

快照继承的好处:

  • 只有变化的文件需要记录
  • 大量元数据可以复用
  • 时间旅行成本低

四、分区进化:Iceberg 的杀手锏

4.1 Hidden Partitioning(隐藏分区)

graph LR subgraph "传统Hive分区" A["用户需要知道分区列"] B["查询必须显式指定分区"] C["WHERE dt = '2024-01-15'"] end
graph LR subgraph "Iceberg隐藏分区" D["按order_time自动分区"] E["用户无感知分区结构"] F["WHERE order_time > '2024-01-15'
自动分区裁剪"] end

配置示例:

-- 创建表时定义分区转换
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP
) USING iceberg
PARTITIONED BY (
    days(order_time),    -- 按天分区
    bucket(16, user_id)  -- user_id 分桶
);

-- 查询时无需关心分区
SELECT * FROM orders 
WHERE order_time >= '2024-01-15 00:00:00'
  AND order_time < '2024-01-16 00:00:00';
-- Iceberg 自动进行分区裁剪!

4.2 分区转换函数

graph TB subgraph "Iceberg分区转换" A[identity] --> A1["原值分区"] B[bucket] --> B1["哈希分桶"] C[truncate] --> C1["截断分区"] D[year/month/day/hour] --> D1["时间分区"] end
转换函数说明示例
identity(col)原值分区identity(country)
bucket(n, col)哈希分成 n 桶bucket(16, user_id)
truncate(w, col)截断到 w 宽度truncate(10, order_id)
year(ts)按年year(order_time)
month(ts)按月month(order_time)
day(ts)按天day(order_time)
hour(ts)按小时hour(order_time)

4.3 分区演进(Partition Evolution)

sequenceDiagram participant Table participant User Note over Table: 初始:按月分区 month(order_time) User->>Table: 业务增长,需要按天分区 Table->>Table: 添加新分区规范 day(order_time) Note over Table: 新数据按天分区
历史数据仍按月分区 User->>Table: 查询 2024 年数据 Table-->>User: 自动处理两种分区
-- 分区演进:从月分区改为天分区
ALTER TABLE orders 
SET PARTITION SPEC (
    day(order_time),
    bucket(16, user_id)
);

-- 旧数据仍然是月分区,新数据是天分区
-- 查询时 Iceberg 自动处理!

五、Schema 演进

5.1 支持的演进操作

graph TB subgraph "Iceberg Schema演进" A[Add Column
添加列] --> A1["任意位置
可设默认值"] B[Drop Column
删除列] --> B1["软删除
不影响历史"] C[Rename Column
重命名] --> C1["元数据变更
数据不动"] D[Update Type
类型变更] --> D1["支持安全变更
int→long"] E[Reorder Column
列重排] --> E1["元数据变更"] end

5.2 Schema 演进原理

graph LR subgraph "Column ID机制" A["每个列有唯一ID"] B["ID从不复用"] C["通过ID关联数据"] end subgraph "好处" D["重命名不影响数据"] E["删除列后重新添加
是新列"] F["历史数据自动兼容"] end A & B & C --> D & E & F

示例:

-- 添加列
ALTER TABLE orders ADD COLUMN discount DECIMAL(10,2);

-- 重命名列
ALTER TABLE orders RENAME COLUMN discount TO discount_amount;

-- 删除列
ALTER TABLE orders DROP COLUMN discount_amount;

-- 类型变更(安全变更)
ALTER TABLE orders ALTER COLUMN user_id TYPE BIGINT;

5.3 类型演进规则

原类型可变更为
intlong
floatdouble
decimal(P, S)decimal(P', S) where P' > P

六、时间旅行

6.1 基于快照的时间旅行

graph LR subgraph "快照时间线" S1["Snapshot 1
10:00
100条"] --> S2["Snapshot 2
11:00
150条"] S2 --> S3["Snapshot 3
12:00
200条"] S3 --> S4["Snapshot 4
13:00
当前"] end Q1["查询10:00的数据"] -.-> S1 Q2["查询11:30的数据"] -.-> S2

6.2 时间旅行查询

-- 方式1:按快照ID查询
SELECT * FROM orders VERSION AS OF 1234567890;

-- 方式2:按时间戳查询
SELECT * FROM orders TIMESTAMP AS OF '2024-01-15 10:00:00';

-- 方式3:按时间表达式
SELECT * FROM orders TIMESTAMP AS OF current_timestamp() - INTERVAL 1 HOUR;

-- Spark SQL 语法
SELECT * FROM orders.snapshots;  -- 查看所有快照
SELECT * FROM orders.history;    -- 查看表历史

6.3 回滚操作

sequenceDiagram participant User participant Iceberg participant Storage Note over Iceberg: 当前: Snapshot 4 User->>Iceberg: 发现数据问题,需要回滚 Iceberg->>Iceberg: 创建新 Snapshot 5 Note over Iceberg: Snapshot 5 指向 Snapshot 2 的文件 User->>Iceberg: 查询当前数据 Iceberg-->>User: 返回 Snapshot 2 时的数据 Note over Storage: 文件没有被删除
只是元数据变了
-- 回滚到指定快照
CALL system.rollback_to_snapshot('db.orders', 1234567890);

-- 回滚到指定时间点
CALL system.rollback_to_timestamp('db.orders', TIMESTAMP '2024-01-15 10:00:00');

七、ACID 事务与并发控制

7.1 乐观并发控制

sequenceDiagram participant Writer1 participant Writer2 participant Iceberg Note over Iceberg: 当前版本: v1 Writer1->>Iceberg: 开始写入 Writer2->>Iceberg: 开始写入 Writer1->>Writer1: 写入数据文件 Writer2->>Writer2: 写入数据文件 Writer1->>Iceberg: 提交 (基于v1) Iceberg->>Iceberg: v1 → v2 成功 Writer2->>Iceberg: 提交 (基于v1) Iceberg-->>Writer2: 冲突!当前是v2 Writer2->>Writer2: 重试:重新生成元数据 Writer2->>Iceberg: 提交 (基于v2) Iceberg->>Iceberg: v2 → v3 成功

7.2 行级更新与删除

graph TB subgraph "Copy-On-Write (COW)" A1["更新时重写整个文件"] A2["读取性能最优"] A3["写入放大"] end subgraph "Merge-On-Read (MOR)" B1["写入删除文件 (Delete File)"] B2["读取时合并"] B3["写入性能好"] end

Delete File 类型:

类型说明适用场景
Position Delete记录文件路径+行号精确删除少量行
Equality Delete记录主键值批量删除
-- 行级删除
DELETE FROM orders WHERE order_id = 12345;

-- 行级更新
UPDATE orders SET status = 'completed' WHERE order_id = 12345;

-- Merge 操作
MERGE INTO orders t
USING updates s ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.status = s.status
WHEN NOT MATCHED THEN INSERT *;

八、查询优化

8.1 元数据过滤

graph TB subgraph "查询优化流程" A[SQL查询] --> B[解析WHERE条件] B --> C{Manifest过滤} C -->|不相关| D[跳过Manifest] C -->|可能相关| E{Data File过滤} E -->|不相关| F[跳过文件] E -->|相关| G[读取文件] end

8.2 列级统计信息

graph LR subgraph "Manifest File中的统计" A["每个数据文件记录:"] B["- 行数"] C["- 列的min/max值"] D["- null计数"] E["- distinct计数"] end subgraph "查询优化" F["WHERE amount > 1000"] G["检查max(amount)"] H["max < 1000 → 跳过文件"] end A & B & C & D & E --> F --> G --> H

8.3 Data Skipping 示例

-- 假设有1000个数据文件
SELECT * FROM orders 
WHERE order_date = '2024-01-15' 
  AND amount > 1000;

-- Iceberg 优化流程:
-- 1. 通过 Manifest List 的分区统计,定位到 order_date='2024-01-15' 的 Manifest
-- 2. 通过 Manifest File 的列统计,跳过 max(amount) < 1000 的文件
-- 3. 最终可能只需要读取 10 个文件

九、实战操作

9.1 创建 Iceberg 表

-- Spark SQL 创建表
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status STRING,
    order_time TIMESTAMP
) USING iceberg
PARTITIONED BY (days(order_time))
LOCATION 's3://bucket/iceberg/orders'
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'zstd',
    'write.metadata.delete-after-commit.enabled' = 'true',
    'write.metadata.previous-versions-max' = '100'
);

9.2 数据写入

// Spark DataFrame 写入
val df = spark.createDataFrame(Seq(
  (1L, 1001L, BigDecimal("100.00"), "pending", Timestamp.valueOf("2024-01-15 10:00:00")),
  (2L, 1002L, BigDecimal("200.00"), "completed", Timestamp.valueOf("2024-01-15 11:00:00"))
)).toDF("order_id", "user_id", "amount", "status", "order_time")

// 追加写入
df.writeTo("orders").append()

// 覆盖写入
df.writeTo("orders").overwritePartitions()

9.3 Flink 流式写入

-- Flink SQL 创建 Iceberg Sink
CREATE TABLE iceberg_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status STRING,
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'iceberg',
    'catalog-type' = 'hive',
    'catalog-name' = 'hive_catalog',
    'warehouse' = 's3://bucket/iceberg',
    'format-version' = '2',
    'write.upsert.enabled' = 'true'
);

-- 流式 Upsert
INSERT INTO iceberg_orders
SELECT * FROM kafka_orders;

9.4 表维护操作

-- 查看快照
SELECT * FROM orders.snapshots;

-- 查看文件
SELECT * FROM orders.files;

-- 查看历史
SELECT * FROM orders.history;

-- 过期快照清理
CALL system.expire_snapshots('db.orders', TIMESTAMP '2024-01-01 00:00:00', 100);

-- 删除孤儿文件
CALL system.remove_orphan_files('db.orders');

-- 重写小文件
CALL system.rewrite_data_files('db.orders');

-- 重写 Manifest
CALL system.rewrite_manifests('db.orders');

十、与引擎集成

10.1 多引擎支持

graph TB subgraph "计算引擎" E1[Spark] E2[Flink] E3[Trino/Presto] E4[Hive] E5[Dremio] end subgraph "Iceberg" I[统一表格式] end subgraph "存储" S1[S3] S2[HDFS] S3[Azure Blob] S4[GCS] end E1 & E2 & E3 & E4 & E5 --> I --> S1 & S2 & S3 & S4

10.2 Catalog 选择

Catalog说明适用场景
Hive Catalog使用 Hive Metastore已有 Hive 生态
Hadoop Catalog文件系统元数据简单场景
Glue CatalogAWS GlueAWS 环境
REST CatalogREST API跨语言访问
Nessie CatalogGit-like 分支多版本管理

十一、最佳实践

11.1 配置建议

# 写入配置
write.format.default=parquet
write.parquet.compression-codec=zstd
write.target-file-size-bytes=536870912  # 512MB

# 分区配置
write.distribution-mode=hash  # 写入时hash分布

# 元数据配置
write.metadata.delete-after-commit.enabled=true
write.metadata.previous-versions-max=100

# 读取配置
read.split.target-size=134217728  # 128MB

11.2 性能优化

mindmap root((Iceberg优化)) 写入优化 合理的文件大小 分区策略选择 排序优化 读取优化 利用列裁剪 谓词下推 Data Skipping 维护优化 定期Compaction 清理过期快照 重写小文件

十二、总结

12.1 Iceberg 核心优势

graph LR A[Iceberg优势] --> B[隐藏分区] A --> C[Schema演进] A --> D[时间旅行] A --> E[ACID事务] A --> F[多引擎支持] B --> B1["用户无感
自动分区裁剪"] C --> C1["列ID机制
安全演进"] D --> D1["快照机制
任意回溯"] E --> E1["乐观并发
安全写入"] F --> F1["Spark/Flink/Trino
随便换"]

12.2 适用场景

场景推荐度说明
数据湖分析⭐⭐⭐⭐⭐核心场景
多引擎访问⭐⭐⭐⭐⭐Iceberg 强项
历史审计⭐⭐⭐⭐⭐时间旅行
Schema 频繁变更⭐⭐⭐⭐⭐演进能力强
实时更新⭐⭐⭐⭐MOR 模式支持

12.3 一句话总结

Iceberg = 开放表格式 + 隐藏分区 + 无缝 Schema 演进 + 时间旅行

如果你需要一个开放、灵活、多引擎支持的数据湖表格式,Iceberg 是当前最佳选择之一。

评论区
暂无评论
avatar