一、前言:OOM 的噩梦
graph TD
A[凌晨3点] --> B[手机响了]
B --> C["告警:Spark任务失败
java.lang.OutOfMemoryError"] C --> D["你:😱"] D --> E["重启试试?"] E --> F["又挂了"] F --> G["开始怀疑人生..."]
java.lang.OutOfMemoryError"] C --> D["你:😱"] D --> E["重启试试?"] E --> F["又挂了"] F --> G["开始怀疑人生..."]
本文将覆盖的 OOM 场景:
| 组件 | 常见 OOM 原因 |
|---|---|
| Spark | Shuffle、广播变量、数据倾斜 |
| Flink | 状态过大、背压、序列化 |
| Hive/Presto | Map/Reduce 内存、查询复杂度 |
| JVM | 堆内存、元空间、直接内存 |
二、JVM 内存模型基础
2.1 JVM 内存结构
graph TB
subgraph "JVM内存结构"
subgraph "堆内存 Heap"
YOUNG[年轻代 Young]
OLD[老年代 Old]
YOUNG --> |晋升| OLD
end
subgraph "非堆内存"
META[元空间 Metaspace]
DIRECT[直接内存 Direct]
STACK[线程栈 Stack]
end
end
2.2 常见 OOM 类型
graph TB
A[OOM类型] --> B["Java heap space
堆内存不足"] A --> C["Metaspace
元空间不足"] A --> D["Direct buffer memory
直接内存不足"] A --> E["GC overhead limit exceeded
GC时间过长"] A --> F["Unable to create native thread
线程数过多"] B --> B1["对象太多/太大"] C --> C1["类加载太多"] D --> D1["NIO Buffer泄漏"] E --> E1["内存不足频繁GC"] F --> F1["线程泄漏"]
堆内存不足"] A --> C["Metaspace
元空间不足"] A --> D["Direct buffer memory
直接内存不足"] A --> E["GC overhead limit exceeded
GC时间过长"] A --> F["Unable to create native thread
线程数过多"] B --> B1["对象太多/太大"] C --> C1["类加载太多"] D --> D1["NIO Buffer泄漏"] E --> E1["内存不足频繁GC"] F --> F1["线程泄漏"]
2.3 JVM 参数速查
# 堆内存设置
-Xms4g # 初始堆大小
-Xmx8g # 最大堆大小
-Xmn2g # 年轻代大小
# 元空间设置
-XX:MetaspaceSize=256m # 初始元空间
-XX:MaxMetaspaceSize=512m # 最大元空间
# 直接内存
-XX:MaxDirectMemorySize=1g
# GC 相关
-XX:+UseG1GC # 使用 G1 GC
-XX:MaxGCPauseMillis=200 # 目标暂停时间
# OOM 时 dump 堆
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/heapdump.hprof三、Spark OOM 全解析
3.1 Spark 内存模型
graph TB
subgraph "Spark Executor 内存"
subgraph "堆内存 Heap"
EXEC[Execution Memory
执行内存] STORAGE[Storage Memory
存储内存] USER[User Memory
用户内存] RESERVED[Reserved
保留内存] end subgraph "堆外内存 Off-Heap" OFF_EXEC[Off-Heap Execution] OFF_STORAGE[Off-Heap Storage] end end
执行内存] STORAGE[Storage Memory
存储内存] USER[User Memory
用户内存] RESERVED[Reserved
保留内存] end subgraph "堆外内存 Off-Heap" OFF_EXEC[Off-Heap Execution] OFF_STORAGE[Off-Heap Storage] end end
内存配置:
# Executor 内存
spark.executor.memory=8g
spark.executor.memoryOverhead=2g # 堆外内存
# 内存比例
spark.memory.fraction=0.6 # 执行+存储 占比
spark.memory.storageFraction=0.5 # 存储在上面的占比
# 堆外内存
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2g3.2 案例一:Shuffle OOM
sequenceDiagram
participant Map
participant Shuffle
participant Reduce
Map->>Shuffle: 输出数据
Note over Shuffle: 数据量太大
内存放不下 Shuffle->>Shuffle: Spill to Disk Note over Shuffle: 磁盘IO变慢 Shuffle-->>Reduce: OOM 💥
内存放不下 Shuffle->>Shuffle: Spill to Disk Note over Shuffle: 磁盘IO变慢 Shuffle-->>Reduce: OOM 💥
问题代码:
// ❌ 大表 JOIN 大表,Shuffle 数据量爆炸
val result = tableA.join(tableB, "key") // 两个表各10亿行解决方案:
// ✅ 方案1:增加 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", 2000) // 默认200太小
// ✅ 方案2:增加 Executor 内存
spark.conf.set("spark.executor.memory", "16g")
spark.conf.set("spark.executor.memoryOverhead", "4g")
// ✅ 方案3:如果小表可以广播
val result = tableA.join(broadcast(smallTable), "key")
// ✅ 方案4:分批处理
val dates = (1 to 30).map(i => s"2024-01-$i")
dates.foreach { date =>
val dailyResult = tableA.filter(col("dt") === date)
.join(tableB.filter(col("dt") === date), "key")
dailyResult.write.mode("append").save("/output")
}3.3 案例二:数据倾斜导致 OOM
graph TB
subgraph "数据倾斜"
A[100个分区] --> B[99个分区正常]
A --> C[1个分区数据量巨大]
C --> D["单个Task OOM 💥"]
end
问题场景:
// ❌ 某个 user_id 有1000万条记录,其他用户只有几百条
df.groupBy("user_id").agg(sum("amount"))解决方案:
// ✅ 方案1:加盐打散
val saltedDF = df.withColumn("salted_key",
concat(col("user_id"), lit("_"), (rand() * 10).cast("int")))
val step1 = saltedDF.groupBy("salted_key", "user_id")
.agg(sum("amount").as("partial_sum"))
val result = step1.groupBy("user_id")
.agg(sum("partial_sum").as("total_amount"))
// ✅ 方案2:单独处理热点 Key
val hotKeys = Set("hot_user_1", "hot_user_2")
val hotData = df.filter(col("user_id").isin(hotKeys.toSeq: _*))
val normalData = df.filter(!col("user_id").isin(hotKeys.toSeq: _*))
val hotResult = hotData.repartition(100, col("user_id"))
.groupBy("user_id").agg(sum("amount"))
val normalResult = normalData.groupBy("user_id").agg(sum("amount"))
val result = hotResult.union(normalResult)
// ✅ 方案3:使用 AQE (Adaptive Query Execution)
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)3.4 案例三:广播变量 OOM
graph LR
A[Driver] -->|广播| B[Executor 1]
A -->|广播| C[Executor 2]
A -->|广播| D[Executor N]
subgraph "问题"
E["广播变量太大
Driver OOM"] end
Driver OOM"] end
问题代码:
// ❌ 广播了一个超大的 Map
val bigMap = hugeDF.collect().map(r => r.getAs[String]("key") -> r).toMap
val broadcastMap = spark.sparkContext.broadcast(bigMap) // Driver OOM!解决方案:
// ✅ 方案1:限制广播大小
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024) // 100MB
// ✅ 方案2:使用 MapSide Join(适合维表)
// 将维表预先分发到每个分区
val dimensionDF = spark.read.parquet("/dimension_table")
val result = factDF.join(dimensionDF, Seq("dim_key"), "left")
// ✅ 方案3:如果必须用 Map,使用 mapPartitions
df.mapPartitions { iter =>
// 每个分区独立加载需要的数据
val localMap = loadLocalData()
iter.map(row => processWithMap(row, localMap))
}3.5 案例四:Driver OOM
graph TD
A[Driver OOM 原因] --> B["collect() 收集太多数据"]
A --> C["广播变量太大"]
A --> D["创建太多分区元数据"]
A --> E["DAG 太复杂"]
问题代码:
// ❌ 收集大量数据到 Driver
val allData = hugeDF.collect() // Driver OOM!
// ❌ 保存过程中产生太多小文件的元数据
df.repartition(100000).write.save("/output") // 10万个文件的元数据解决方案:
// ✅ 方案1:增加 Driver 内存
spark.conf.set("spark.driver.memory", "8g")
spark.conf.set("spark.driver.maxResultSize", "4g")
// ✅ 方案2:避免 collect,使用 take 或 limit
val sample = hugeDF.take(1000) // 只取部分
// ✅ 方案3:直接写入存储,不要收集
hugeDF.write.parquet("/output") // 不经过 Driver
// ✅ 方案4:合理控制输出文件数
df.repartition(200).write.save("/output") // 控制文件数量3.6 Spark OOM 排查流程
flowchart TD
A[Spark OOM] --> B{哪里 OOM?}
B -->|Driver| C{原因}
C -->|collect()| C1["减少收集数据量
增加 Driver 内存"] C -->|广播变量| C2["减小广播大小
换用 Join"] B -->|Executor| D{阶段} D -->|Shuffle Write| D1["增加分区数
增加 Executor 内存"] D -->|Shuffle Read| D2["检查数据倾斜
加盐打散"] D -->|Task 执行| D3["检查数据量
检查用户代码"] B -->|Container| E["增加 memoryOverhead
检查堆外内存使用"]
增加 Driver 内存"] C -->|广播变量| C2["减小广播大小
换用 Join"] B -->|Executor| D{阶段} D -->|Shuffle Write| D1["增加分区数
增加 Executor 内存"] D -->|Shuffle Read| D2["检查数据倾斜
加盐打散"] D -->|Task 执行| D3["检查数据量
检查用户代码"] B -->|Container| E["增加 memoryOverhead
检查堆外内存使用"]
四、Flink OOM 全解析
4.1 Flink 内存模型
graph TB
subgraph "Flink TaskManager 内存"
subgraph "JVM Heap"
FRAMEWORK[Framework Heap]
TASK[Task Heap]
end
subgraph "Off-Heap"
MANAGED[Managed Memory
RocksDB/批处理] DIRECT[Direct Memory] NETWORK[Network Buffers] end subgraph "JVM Overhead" META[Metaspace] OVERHEAD[JVM Overhead] end end
RocksDB/批处理] DIRECT[Direct Memory] NETWORK[Network Buffers] end subgraph "JVM Overhead" META[Metaspace] OVERHEAD[JVM Overhead] end end
内存配置:
# flink-conf.yaml
taskmanager.memory.process.size: 8g
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.task.heap.size: 4g
taskmanager.memory.managed.size: 2g
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.jvm-overhead.fraction: 0.14.2 案例一:State 过大 OOM
graph TD
A[State OOM] --> B["ValueState 存储了大对象"]
A --> C["ListState 无限增长"]
A --> D["MapState 键值对太多"]
A --> E["窗口状态积压"]
问题代码:
// ❌ 在 State 中存储了整个列表,没有清理
public class BadStatefulFunction extends KeyedProcessFunction<String, Event, Result> {
private ListState<Event> historyState;
@Override
public void processElement(Event event, Context ctx, Collector<Result> out) {
historyState.add(event); // 无限增长!
// 没有清理逻辑...
}
}解决方案:
// ✅ 方案1:设置 State TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) // RocksDB 后台清理
.build();
ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>("history", Event.class);
descriptor.enableTimeToLive(ttlConfig);
// ✅ 方案2:使用 RocksDB State Backend(状态可以溢出到磁盘)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// ✅ 方案3:定期清理 State
@Override
public void processElement(Event event, Context ctx, Collector<Result> out) {
historyState.add(event);
// 只保留最近 1000 条
List<Event> history = Lists.newArrayList(historyState.get());
if (history.size() > 1000) {
historyState.clear();
historyState.addAll(history.subList(history.size() - 1000, history.size()));
}
}4.3 案例二:窗口积压 OOM
sequenceDiagram
participant Source
participant Window
participant Sink
Source->>Window: 持续发送数据
Note over Window: 窗口未触发
数据持续积压 Window->>Window: 内存增长 Window-->>Window: OOM 💥
数据持续积压 Window->>Window: 内存增长 Window-->>Window: OOM 💥
问题场景:
// ❌ 会话窗口 gap 设置过大,窗口迟迟不关闭
dataStream
.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withGap(Time.hours(24))) // 24小时不活动才关闭
.aggregate(new MyAggregator()); // 窗口内数据持续增长解决方案:
// ✅ 方案1:限制窗口大小
dataStream
.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30))) // 缩短 gap
.trigger(CountTrigger.of(10000)) // 或者达到数量就触发
.aggregate(new MyAggregator());
// ✅ 方案2:使用增量聚合,不存储原始数据
dataStream
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(
new AggregateFunction<Event, Accumulator, Result>() {
// 只存储聚合结果,不存储原始数据
}
);
// ✅ 方案3:设置允许延迟并清理
dataStream
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(10)) // 允许延迟
.sideOutputLateData(lateOutputTag) // 延迟数据旁路输出
.aggregate(new MyAggregator());4.4 案例三:Checkpoint 超时导致内存积压
graph TD
A[Checkpoint 慢] --> B[Barrier 对齐等待]
B --> C[数据在 Buffer 中积压]
C --> D[内存占用增加]
D --> E[OOM 或 背压]
解决方案:
// ✅ 方案1:使用 Unaligned Checkpoint (Flink 1.11+)
env.getCheckpointConfig().enableUnalignedCheckpoints();
// ✅ 方案2:调整 Checkpoint 参数
env.enableCheckpointing(60000); // 60秒间隔
env.getCheckpointConfig().setCheckpointTimeout(300000); // 5分钟超时
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 最小间隔
// ✅ 方案3:增加 Network Buffer
// flink-conf.yaml
taskmanager.memory.network.min: 256mb
taskmanager.memory.network.max: 1gb4.5 Flink OOM 排查流程
flowchart TD
A[Flink OOM] --> B{错误类型}
B -->|Heap OOM| C{检查}
C --> C1["State 是否过大?"]
C --> C2["窗口是否积压?"]
C --> C3["用户代码是否有泄漏?"]
B -->|Direct Memory| D{检查}
D --> D1["Network Buffer 是否不足?"]
D --> D2["RocksDB 配置是否合理?"]
B -->|Metaspace| E{检查}
E --> E1["是否加载了太多类?"]
E --> E2["是否有 ClassLoader 泄漏?"]
C1 -->|是| F["开启 State TTL
使用 RocksDB"] C2 -->|是| G["缩小窗口
增量聚合"]
使用 RocksDB"] C2 -->|是| G["缩小窗口
增量聚合"]
五、Hive/Presto OOM
5.1 Hive OOM 场景
graph TB
A[Hive OOM] --> B[Map 端 OOM]
A --> C[Reduce 端 OOM]
A --> D[Driver 端 OOM]
B --> B1["读取大文件
复杂 UDF"] C --> C1["数据倾斜
聚合结果大"] D --> D1["元数据太多
分区太多"]
复杂 UDF"] C --> C1["数据倾斜
聚合结果大"] D --> D1["元数据太多
分区太多"]
5.2 Hive 内存配置
-- Map 端内存
SET mapreduce.map.memory.mb=4096;
SET mapreduce.map.java.opts=-Xmx3276m;
-- Reduce 端内存
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.reduce.java.opts=-Xmx6553m;
-- 容器内存
SET yarn.nodemanager.resource.memory-mb=65536;
-- 处理数据倾斜
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=100000;
-- Map Join 优化
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=50000000;5.3 Presto OOM 处理
-- 查询内存限制
SET query_max_memory = '50GB';
SET query_max_memory_per_node = '10GB';
-- 聚合内存
SET aggregation_operator_unspill_memory_limit = '4GB';
-- 分页处理
SELECT * FROM huge_table
WHERE dt = '2024-01-15'
LIMIT 1000000 OFFSET 0; -- 分批查询六、通用排查方法
6.1 获取内存 Dump
# JVM 堆 Dump
jmap -dump:format=b,file=heap.hprof <pid>
# 在 OOM 时自动 Dump(启动参数)
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/heapdump.hprof
# Spark 配置
spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/6.2 分析工具
graph LR
A[内存分析工具] --> B[MAT
Eclipse Memory Analyzer] A --> C[VisualVM] A --> D[JProfiler] A --> E[Arthas] B --> B1["分析 Heap Dump
查找内存泄漏"] C --> C1["实时监控
采样分析"] D --> D1["商业工具
功能强大"] E --> E1["在线诊断
阿里开源"]
Eclipse Memory Analyzer] A --> C[VisualVM] A --> D[JProfiler] A --> E[Arthas] B --> B1["分析 Heap Dump
查找内存泄漏"] C --> C1["实时监控
采样分析"] D --> D1["商业工具
功能强大"] E --> E1["在线诊断
阿里开源"]
6.3 Arthas 常用命令
# 安装
curl -O https://arthas.aliyun.com/arthas-boot.jar
java -jar arthas-boot.jar
# 常用命令
dashboard # 总览
thread -n 5 # 最忙的5个线程
heapdump /tmp/dump.hprof # 堆 Dump
memory # 内存信息
jvm # JVM 信息七、OOM 预防清单
7.1 Spark 预防清单
mindmap
root((Spark OOM预防))
配置优化
合理设置executor内存
调整shuffle分区数
控制广播变量大小
代码优化
避免collect大数据
使用增量聚合
处理数据倾斜
监控告警
监控GC时间
监控内存使用率
设置OOM dump
7.2 Flink 预防清单
mindmap
root((Flink OOM预防))
State管理
设置State TTL
使用RocksDB后端
定期清理状态
窗口管理
控制窗口大小
使用增量聚合
设置允许延迟
Checkpoint
开启Unaligned CP
合理设置间隔
监控CP耗时
7.3 通用预防措施
| 措施 | 说明 |
|---|---|
| 内存监控 | Prometheus + Grafana 监控内存使用 |
| GC 监控 | 监控 GC 时间和频率 |
| 压力测试 | 上线前进行压力测试 |
| 熔断机制 | 设置内存阈值告警 |
| 优雅降级 | OOM 前能自动扩容或降级 |
| 日志分析 | 保留足够的日志用于分析 |
八、OOM 案例速查表
8.1 错误信息速查
| 错误信息 | 可能原因 | 解决方向 |
|---|---|---|
Java heap space | 堆内存不足 | 增加内存/优化代码 |
GC overhead limit exceeded | GC 时间过长 | 增加内存/检查泄漏 |
Metaspace | 类加载过多 | 增加元空间/检查 ClassLoader |
Direct buffer memory | 堆外内存不足 | 增加直接内存配置 |
Cannot allocate memory | 系统内存不足 | 检查系统内存/进程数 |
Container killed by YARN | 超出容器限制 | 增加 memoryOverhead |
8.2 快速定位公式
OOM 定位 = 确定组件 + 确定阶段 + 分析堆栈 + 重现问题flowchart LR
A[OOM发生] --> B[确定组件
Spark/Flink/Hive] B --> C[确定阶段
Map/Shuffle/Reduce] C --> D[分析堆栈
找到关键代码] D --> E[重现问题
本地调试] E --> F[解决问题]
Spark/Flink/Hive] B --> C[确定阶段
Map/Shuffle/Reduce] C --> D[分析堆栈
找到关键代码] D --> E[重现问题
本地调试] E --> F[解决问题]
九、总结
9.1 核心要点
graph LR
A[OOM处理核心] --> B[预防为主]
A --> C[快速定位]
A --> D[根因解决]
B --> B1["合理配置
压力测试
监控告警"] C --> C1["错误类型
组件阶段
堆栈分析"] D --> D1["代码优化
配置调整
架构改进"]
压力测试
监控告警"] C --> C1["错误类型
组件阶段
堆栈分析"] D --> D1["代码优化
配置调整
架构改进"]
9.2 一句话总结
预防:合理配置 + 代码优化 + 监控告警
定位:错误类型 + 组件阶段 + 堆栈分析
解决:增加内存是下策,优化代码是上策
9.3 记住这个口诀
Spark OOM 三板斧:增分区、加内存、治倾斜
Flink OOM 三板斧:限状态、用RocksDB、调Checkpoint
通用 OOM 三板斧:看日志、dump堆、找泄漏
十、结语
恭喜你完成了「从入门到放弃」大数据系列的学习!
graph LR
A[你] --> B[从入门]
B --> C[到放弃?]
C --> D[不!到精通!]
style D fill:#4ecdc4
这个系列我们覆盖了:
| 模块 | 文章 |
|---|---|
| 数仓与 OLAP | Hive 建模、ClickHouse、Doris/StarRocks、OLAP 选型 |
| 调度器 | Airflow、DolphinScheduler、调度器选型 |
| 架构与湖仓 | Lambda/Kappa、Hudi、Iceberg、湖仓选型 |
| 避坑指南 | OOM 全解析 |
希望这些内容能帮助你在大数据领域少踩坑、快成长!