前言:从"能跑"到"跑得好"
写Spark代码分三个境界:
第一境界:代码能跑起来(入门)
第二境界:代码跑得又快又稳(进阶)
第三境界:代码跑崩了知道怎么修(高手)
大多数人停留在第一境界,能跑就行,跑不动就加内存。
但当你的任务从处理1GB变成处理1TB,从跑1分钟变成跑1天,你就会发现:不懂原理的Spark,就像不懂发动机的老司机,迟早要在高速上抛锚。
本篇是Spark系列的第三篇,我们将深入Spark内核,从Job到Task,从Shuffle到内存管理,彻底搞懂Spark是怎么跑的,以及怎么让它跑得更快。
一、Spark作业执行全流程
1.1 从代码到执行
graph TB
subgraph 用户代码["👨💻 用户代码"]
A[Spark Application]
end
subgraph Driver端["🚗 Driver端"]
B[SparkContext] --> C[DAGScheduler]
C --> D[TaskScheduler]
end
subgraph 集群["🖥️ 集群"]
D --> E1[Executor 1]
D --> E2[Executor 2]
D --> E3[Executor N]
E1 --> T1[Task]
E1 --> T2[Task]
E2 --> T3[Task]
E2 --> T4[Task]
E3 --> T5[Task]
end
A --> B
style B fill:#e74c3c,color:#fff
style C fill:#3498db,color:#fff
style D fill:#2ecc71,color:#fff
1.2 Job、Stage、Task的关系
graph TB
subgraph Application["📱 Application"]
subgraph Job1["Job 1(由Action触发)"]
subgraph Stage1["Stage 1(ShuffleMapStage)"]
T1[Task 1.1]
T2[Task 1.2]
T3[Task 1.3]
end
subgraph Stage2["Stage 2(ResultStage)"]
T4[Task 2.1]
T5[Task 2.2]
end
Stage1 --> |"Shuffle"|Stage2
end
subgraph Job2["Job 2"]
subgraph Stage3["Stage 3"]
T6[Task 3.1]
T7[Task 3.2]
end
end
end
style Job1 fill:#ff6b6b
style Job2 fill:#4ecdc4
style Stage1 fill:#ffe66d
style Stage2 fill:#ffe66d
关系总结:
- 一个Application包含多个Job
- 一个Job由一个Action操作触发
- 一个Job包含多个Stage
- Stage之间以Shuffle为边界划分
- 一个Stage包含多个Task
- Task数量 = 该Stage最后一个RDD的分区数
1.3 Stage划分原理
Stage划分的关键:宽依赖(Shuffle)
graph LR
subgraph 窄依赖["窄依赖(同一Stage)"]
A1[Partition] --> B1[Partition]
A2[Partition] --> B2[Partition]
A3[Partition] --> B3[Partition]
end
subgraph 宽依赖["宽依赖(划分Stage)"]
C1[Partition] --> D1[Partition]
C1[Partition] --> D2[Partition]
C2[Partition] --> D1
C2[Partition] --> D2
C3[Partition] --> D1
C3[Partition] --> D2
end
style 窄依赖 fill:#4ecdc4
style 宽依赖 fill:#ff6b6b
窄依赖:一个父分区对应一个子分区(map、filter、union)
宽依赖:一个父分区对应多个子分区(groupByKey、reduceByKey、join)
// 示例:这段代码会产生几个Stage?
val result = sc.textFile("data.txt") // Stage 1 开始
.flatMap(_.split(" ")) // 窄依赖
.map(word => (word, 1)) // 窄依赖
.reduceByKey(_ + _) // 宽依赖!Stage 1 结束,Stage 2 开始
.filter(_._2 > 10) // 窄依赖
.collect() // Action,触发Job
// 答案:2个Stage
// Stage 1: textFile -> flatMap -> map
// Stage 2: reduceByKey -> filter -> collect1.4 DAGScheduler工作流程
sequenceDiagram
participant User as 用户代码
participant SC as SparkContext
participant DAG as DAGScheduler
participant Task as TaskScheduler
participant Exec as Executor
User->>SC: rdd.collect() (Action)
SC->>DAG: 提交Job
DAG->>DAG: 1. 构建DAG
DAG->>DAG: 2. 划分Stage(从后往前,遇Shuffle切分)
DAG->>DAG: 3. 生成TaskSet
loop 每个Stage
DAG->>Task: 提交TaskSet
Task->>Exec: 调度Task到Executor
Exec-->>Task: Task执行完成
Task-->>DAG: Stage完成
end
DAG-->>SC: Job完成
SC-->>User: 返回结果
二、Shuffle深度剖析
2.1 什么是Shuffle?
Shuffle是分布式计算的性能杀手,它涉及:
- 数据序列化
- 磁盘IO
- 网络传输
- 数据反序列化
graph TB
subgraph Shuffle过程["🔀 Shuffle过程"]
subgraph MapSide["Map端(Shuffle Write)"]
M1[Mapper 1] --> |"分区、排序、写磁盘"|F1[Shuffle文件]
M2[Mapper 2] --> F2[Shuffle文件]
M3[Mapper 3] --> F3[Shuffle文件]
end
subgraph Network["网络传输"]
F1 --> |"拉取"|R1[Reducer 1]
F1 --> |"拉取"|R2[Reducer 2]
F2 --> |"拉取"|R1
F2 --> |"拉取"|R2
F3 --> |"拉取"|R1
F3 --> |"拉取"|R2
end
subgraph ReduceSide["Reduce端(Shuffle Read)"]
R1 --> |"合并、聚合"|Result1[结果1]
R2 --> |"合并、聚合"|Result2[结果2]
end
end
style MapSide fill:#ff6b6b
style Network fill:#ffe66d
style ReduceSide fill:#4ecdc4
2.2 Sort-Based Shuffle
Spark 2.0后默认使用Sort-Based Shuffle。
graph TB
subgraph SortShuffle["Sort-Based Shuffle流程"]
subgraph MapTask["Map Task"]
A[数据记录] --> B[写入内存缓冲区
PartitionedAppendOnlyMap] B --> C{缓冲区满?} C --> |"是"|D[排序并溢写到磁盘] C --> |"否"|B D --> E[继续写入] E --> B end subgraph 最终合并["Task结束"] F[合并所有溢写文件] F --> G[生成一个数据文件] F --> H[生成一个索引文件] end D --> F subgraph 输出["输出文件"] G --> G1["shuffle_0_0_0.data
(所有分区数据)"] H --> H1["shuffle_0_0_0.index
(各分区偏移量)"] end end style B fill:#ff6b6b style D fill:#ffe66d style G fill:#4ecdc4
PartitionedAppendOnlyMap] B --> C{缓冲区满?} C --> |"是"|D[排序并溢写到磁盘] C --> |"否"|B D --> E[继续写入] E --> B end subgraph 最终合并["Task结束"] F[合并所有溢写文件] F --> G[生成一个数据文件] F --> H[生成一个索引文件] end D --> F subgraph 输出["输出文件"] G --> G1["shuffle_0_0_0.data
(所有分区数据)"] H --> H1["shuffle_0_0_0.index
(各分区偏移量)"] end end style B fill:#ff6b6b style D fill:#ffe66d style G fill:#4ecdc4
2.3 Shuffle Write详解
sequenceDiagram
participant Task as Map Task
participant Buffer as 内存缓冲区
participant Disk as 磁盘
participant Index as 索引文件
loop 处理每条记录
Task->>Buffer: 写入(partitionId, key, value)
alt 缓冲区满
Buffer->>Buffer: 按(partitionId, key)排序
Buffer->>Disk: 溢写到临时文件
end
end
Note over Task: Task结束
Task->>Disk: 合并所有溢写文件
Task->>Index: 写入各分区偏移量
Note over Disk,Index: 最终每个Map Task产生
1个数据文件 + 1个索引文件
1个数据文件 + 1个索引文件
2.4 Shuffle Read详解
graph TB
subgraph ShuffleRead["Shuffle Read流程"]
A[Reduce Task启动] --> B[向Driver获取Map输出位置]
B --> C[并行拉取各Map的数据]
C --> D{数据量大?}
D --> |"是"|E[边拉取边溢写]
D --> |"否"|F[全部放内存]
E --> G[外部排序合并]
F --> H[内存中聚合]
G --> I[输出结果]
H --> I
end
style C fill:#ff6b6b
style G fill:#ffe66d
2.5 Shuffle调优参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| spark.shuffle.file.buffer | 32k | Shuffle写缓冲区 |
| spark.reducer.maxSizeInFlight | 48m | Reduce端拉取缓冲区 |
| spark.shuffle.io.maxRetries | 3 | 拉取失败重试次数 |
| spark.shuffle.io.retryWait | 5s | 重试等待时间 |
| spark.shuffle.sort.bypassMergeThreshold | 200 | 绕过排序的分区阈值 |
| spark.shuffle.compress | true | 是否压缩Shuffle数据 |
| spark.shuffle.spill.compress | true | 是否压缩溢写数据 |
// 优化配置示例
spark.conf.set("spark.shuffle.file.buffer", "64k")
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
spark.conf.set("spark.shuffle.io.maxRetries", "6")
spark.conf.set("spark.shuffle.compress", "true")三、内存管理机制
3.1 Executor内存布局
graph TB
subgraph Executor内存["🧠 Executor内存布局(统一内存管理)"]
Total["总内存
spark.executor.memory"] Total --> Reserved["预留内存
300MB固定"] Total --> Usable["可用内存
(总内存 - 300MB)"] Usable --> Unified["统一内存
spark.memory.fraction=0.6"] Usable --> User["用户内存
剩余40%"] Unified --> Execution["执行内存
Shuffle/Join/Sort/Aggregation"] Unified --> Storage["存储内存
Cache/Broadcast"] Execution <--> |"动态占用"|Storage end style Total fill:#ff6b6b style Unified fill:#4ecdc4 style Execution fill:#ffe66d style Storage fill:#3498db
spark.executor.memory"] Total --> Reserved["预留内存
300MB固定"] Total --> Usable["可用内存
(总内存 - 300MB)"] Usable --> Unified["统一内存
spark.memory.fraction=0.6"] Usable --> User["用户内存
剩余40%"] Unified --> Execution["执行内存
Shuffle/Join/Sort/Aggregation"] Unified --> Storage["存储内存
Cache/Broadcast"] Execution <--> |"动态占用"|Storage end style Total fill:#ff6b6b style Unified fill:#4ecdc4 style Execution fill:#ffe66d style Storage fill:#3498db
3.2 内存计算示例
假设:spark.executor.memory = 10GB
预留内存 = 300MB(固定)
可用内存 = 10GB - 300MB ≈ 9.7GB
统一内存 = 9.7GB × 0.6 ≈ 5.8GB
├── 执行内存(初始)= 5.8GB × 0.5 ≈ 2.9GB
└── 存储内存(初始)= 5.8GB × 0.5 ≈ 2.9GB
用户内存 = 9.7GB × 0.4 ≈ 3.9GB
└── 用户代码中创建的对象3.3 统一内存管理
graph LR
subgraph 动态占用["🔄 执行/存储内存动态占用"]
subgraph 场景1["场景1:存储内存不足"]
E1[执行内存
空闲2GB] --> S1[存储内存
占用执行内存] end subgraph 场景2["场景2:执行内存不足"] S2[存储内存
被强制释放] --> E2[执行内存
抢占存储内存] end end style E1 fill:#ffe66d style S1 fill:#3498db style S2 fill:#3498db style E2 fill:#ffe66d
空闲2GB] --> S1[存储内存
占用执行内存] end subgraph 场景2["场景2:执行内存不足"] S2[存储内存
被强制释放] --> E2[执行内存
抢占存储内存] end end style E1 fill:#ffe66d style S1 fill:#3498db style S2 fill:#3498db style E2 fill:#ffe66d
关键规则:
- 执行内存和存储内存可以互相借用
- 存储内存被执行内存借用后,可以被强制释放
- 执行内存被存储内存借用后,不能被强制释放(等任务完成)
3.4 堆外内存
graph TB
subgraph 内存类型["📦 内存类型"]
subgraph 堆内内存["堆内内存(On-Heap)"]
OH1["受JVM GC管理"]
OH2["默认使用"]
OH3["有GC开销"]
end
subgraph 堆外内存["堆外内存(Off-Heap)"]
OF1["不受JVM GC管理"]
OF2["需要手动开启"]
OF3["减少GC,但需要手动管理"]
end
end
style 堆内内存 fill:#4ecdc4
style 堆外内存 fill:#ff6b6b
// 开启堆外内存
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")3.5 OOM问题排查
flowchart TB
Start[OOM发生] --> Q1{哪里OOM?}
Q1 --> |"Driver"|D1[Driver内存不足]
Q1 --> |"Executor"|E1[Executor内存不足]
D1 --> D2["可能原因:
1. collect大量数据
2. 广播大变量
3. Driver端聚合"] D2 --> D3["解决:
1. 增加driver-memory
2. 避免collect
3. 使用take/sample"] E1 --> E2{OOM类型?} E2 --> |"堆内存"|H1["可能原因:
1. 数据量太大
2. 数据倾斜
3. Cache过多"] E2 --> |"堆外内存"|O1["可能原因:
1. Shuffle数据量大
2. 堆外内存设置小"] H1 --> H2["解决:
1. 增加executor-memory
2. 增加分区数
3. 处理数据倾斜"] O1 --> O2["解决:
1. 增加堆外内存
2. 减少并发度"] style Start fill:#ff6b6b style D3 fill:#4ecdc4 style H2 fill:#4ecdc4 style O2 fill:#4ecdc4
1. collect大量数据
2. 广播大变量
3. Driver端聚合"] D2 --> D3["解决:
1. 增加driver-memory
2. 避免collect
3. 使用take/sample"] E1 --> E2{OOM类型?} E2 --> |"堆内存"|H1["可能原因:
1. 数据量太大
2. 数据倾斜
3. Cache过多"] E2 --> |"堆外内存"|O1["可能原因:
1. Shuffle数据量大
2. 堆外内存设置小"] H1 --> H2["解决:
1. 增加executor-memory
2. 增加分区数
3. 处理数据倾斜"] O1 --> O2["解决:
1. 增加堆外内存
2. 减少并发度"] style Start fill:#ff6b6b style D3 fill:#4ecdc4 style H2 fill:#4ecdc4 style O2 fill:#4ecdc4
四、数据倾斜深度解决
4.1 什么是数据倾斜?
graph TB
subgraph 正常分布["✅ 正常数据分布"]
N1[Task 1
100万条
⏱️ 10s] N2[Task 2
98万条
⏱️ 10s] N3[Task 3
102万条
⏱️ 10s] N4[Task 4
100万条
⏱️ 10s] N1 --> NR[总耗时:10s] N2 --> NR N3 --> NR N4 --> NR end subgraph 数据倾斜["❌ 数据倾斜"] S1[Task 1
10万条
⏱️ 1s] S2[Task 2
5万条
⏱️ 0.5s] S3[Task 3
380万条
⏱️ 60s 😱] S4[Task 4
5万条
⏱️ 0.5s] S1 --> SR[总耗时:60s ❌] S2 --> SR S3 --> SR S4 --> SR end style S3 fill:#ff0000,color:#fff style SR fill:#ff6b6b
100万条
⏱️ 10s] N2[Task 2
98万条
⏱️ 10s] N3[Task 3
102万条
⏱️ 10s] N4[Task 4
100万条
⏱️ 10s] N1 --> NR[总耗时:10s] N2 --> NR N3 --> NR N4 --> NR end subgraph 数据倾斜["❌ 数据倾斜"] S1[Task 1
10万条
⏱️ 1s] S2[Task 2
5万条
⏱️ 0.5s] S3[Task 3
380万条
⏱️ 60s 😱] S4[Task 4
5万条
⏱️ 0.5s] S1 --> SR[总耗时:60s ❌] S2 --> SR S3 --> SR S4 --> SR end style S3 fill:#ff0000,color:#fff style SR fill:#ff6b6b
4.2 如何发现数据倾斜?
graph TB
subgraph 诊断方法["🔍 数据倾斜诊断"]
A[Spark UI] --> B[查看Stage详情]
B --> C[比较Task执行时间]
C --> D{时间差异大?}
D --> |"是"|E[数据倾斜!]
D --> |"否"|F[正常]
G[代码分析] --> H["查看key分布
df.groupBy('key').count()"] H --> I{某key数量异常大?} I --> |"是"|E I --> |"否"|F end style E fill:#ff6b6b style F fill:#4ecdc4
df.groupBy('key').count()"] H --> I{某key数量异常大?} I --> |"是"|E I --> |"否"|F end style E fill:#ff6b6b style F fill:#4ecdc4
4.3 八大解决方案
方案一:过滤异常Key
// 场景:大量NULL或异常值
// 解决:直接过滤
val df = spark.read.parquet("data")
.filter($"key".isNotNull)
.filter($"key" =!= "")
.filter($"key" =!= "NULL")方案二:提高并行度
// 场景:数据量大但分区少
// 解决:增加分区数
// 方式1:设置全局参数
spark.conf.set("spark.sql.shuffle.partitions", "1000")
// 方式2:repartition
df.repartition(1000)
// 方式3:重新分区到特定列
df.repartition(1000, $"key")方案三:双重聚合(加盐)
// 场景:聚合操作数据倾斜
// 解决:两阶段聚合
// 原始倾斜代码
val result = df.groupBy("city").agg(sum("amount"))
// 优化后
import scala.util.Random
val salted = df.withColumn("salted_city",
concat($"city", lit("_"), lit(Random.nextInt(100))))
val partialAgg = salted
.groupBy("salted_city")
.agg(sum("amount").as("partial_sum"))
val finalResult = partialAgg
.withColumn("city", split($"salted_city", "_")(0))
.groupBy("city")
.agg(sum("partial_sum").as("total_amount"))graph TB
subgraph 双重聚合["🧂 双重聚合(加盐)"]
subgraph 原始["原始数据"]
O1["Beijing: 1000万条"]
O2["Shanghai: 100万条"]
end
subgraph 加盐后["加盐后"]
S1["Beijing_0: 100万条"]
S2["Beijing_1: 100万条"]
S3["Beijing_..."]
S4["Beijing_99: 100万条"]
S5["Shanghai_0~99: 各1万条"]
end
subgraph 一次聚合["第一次聚合"]
A1["Beijing_0: sum"]
A2["Beijing_1: sum"]
A3["..."]
A4["Beijing_99: sum"]
end
subgraph 二次聚合["第二次聚合"]
F1["Beijing: total_sum"]
end
原始 --> 加盐后 --> 一次聚合 --> 二次聚合
end
style O1 fill:#ff6b6b
style F1 fill:#4ecdc4
方案四:Broadcast Join
// 场景:大表Join小表,小表倾斜
// 解决:广播小表,避免Shuffle
import org.apache.spark.sql.functions.broadcast
// 小表(< 10MB)
val smallDF = spark.table("dim_city")
// 大表
val largeDF = spark.table("fact_orders")
// 广播小表
val result = largeDF.join(broadcast(smallDF), Seq("city_id"))
// 调整广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")方案五:倾斜Key单独处理
// 场景:只有少数Key倾斜
// 解决:分离处理
// 找出倾斜的Key
val skewedKeys = df.groupBy("key")
.count()
.filter($"count" > 1000000)
.select("key")
.collect()
.map(_.getString(0))
.toSet
val skewedKeysBC = spark.sparkContext.broadcast(skewedKeys)
// 分离数据
val normalData = df.filter(!$"key".isin(skewedKeys.toSeq: _*))
val skewedData = df.filter($"key".isin(skewedKeys.toSeq: _*))
// 正常数据正常处理
val normalResult = normalData.groupBy("key").agg(sum("value"))
// 倾斜数据特殊处理(如加盐、或提高并行度)
val skewedResult = skewedData
.repartition(1000)
.groupBy("key")
.agg(sum("value"))
// 合并结果
val finalResult = normalResult.union(skewedResult)方案六:使用Map端聚合
// 场景:reduceByKey类操作
// 解决:使用reduceByKey而非groupByKey
// ❌ 不推荐:groupByKey(无Map端聚合)
rdd.groupByKey().mapValues(_.sum)
// ✅ 推荐:reduceByKey(有Map端聚合)
rdd.reduceByKey(_ + _)
// ✅ 推荐:aggregateByKey
rdd.aggregateByKey(0)(
(acc, value) => acc + value, // 分区内聚合
(acc1, acc2) => acc1 + acc2 // 分区间聚合
)方案七:调整Join策略
// Skew Join(Spark 3.0+ AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")方案八:自定义Partitioner
// 场景:需要精确控制数据分布
// 解决:自定义分区器
class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[String]
if (k == "热点Key") {
// 热点Key随机分配到多个分区
Random.nextInt(numParts / 10)
} else {
// 其他Key正常Hash
Math.abs(k.hashCode % numParts)
}
}
}
rdd.partitionBy(new CustomPartitioner(1000))4.4 方案选择指南
flowchart TB
Start[数据倾斜] --> Q1{倾斜类型?}
Q1 --> |"聚合倾斜"|A1{Key数量多?}
Q1 --> |"Join倾斜"|B1{小表 < 1GB?}
A1 --> |"是"|A2[双重聚合/加盐]
A1 --> |"否
少数Key倾斜"|A3[单独处理倾斜Key] B1 --> |"是"|B2[Broadcast Join] B1 --> |"否"|B3{Spark 3.0+?} B3 --> |"是"|B4[开启AQE Skew Join] B3 --> |"否"|B5[倾斜Key单独处理+加盐] style A2 fill:#4ecdc4 style B2 fill:#4ecdc4 style B4 fill:#4ecdc4
少数Key倾斜"|A3[单独处理倾斜Key] B1 --> |"是"|B2[Broadcast Join] B1 --> |"否"|B3{Spark 3.0+?} B3 --> |"是"|B4[开启AQE Skew Join] B3 --> |"否"|B5[倾斜Key单独处理+加盐] style A2 fill:#4ecdc4 style B2 fill:#4ecdc4 style B4 fill:#4ecdc4
五、Spark UI深度解读
5.1 UI页面结构
graph TB
subgraph SparkUI["🖥️ Spark UI页面"]
A[Jobs] --> A1[所有Job列表
执行状态、时间] B[Stages] --> B1[所有Stage列表
Task分布、Shuffle] C[Storage] --> C1[缓存的RDD/DataFrame
内存使用] D[Environment] --> D1[配置参数
运行环境] E[Executors] --> E1[Executor状态
内存、GC、Task] F[SQL] --> F1[SQL执行计划
详细指标] end style A fill:#ff6b6b style B fill:#ff6b6b style E fill:#4ecdc4 style F fill:#4ecdc4
执行状态、时间] B[Stages] --> B1[所有Stage列表
Task分布、Shuffle] C[Storage] --> C1[缓存的RDD/DataFrame
内存使用] D[Environment] --> D1[配置参数
运行环境] E[Executors] --> E1[Executor状态
内存、GC、Task] F[SQL] --> F1[SQL执行计划
详细指标] end style A fill:#ff6b6b style B fill:#ff6b6b style E fill:#4ecdc4 style F fill:#4ecdc4
5.2 关键指标解读
Jobs页面
Job 0 (成功)
├── 描述: count at MyApp.scala:25
├── 提交时间: 2024-01-15 10:30:00
├── 持续时间: 2.5 min
├── Stages: 3/3 (成功)
└── Tasks: 1000/1000 (成功)Stages页面
Stage 2 (成功)
├── 描述: collect at MyApp.scala:30
├── 持续时间: 1.5 min
├── Input: 10 GB / 1000 records
├── Output: 100 MB / 100 records
├── Shuffle Read: 5 GB
├── Shuffle Write: 2 GB
└── Tasks:
├── 成功: 200
├── 失败: 0
├── 总计: 200
└── 时间分布:
├── 最小: 2s
├── 中位数: 30s
├── 最大: 5min ← 可能数据倾斜!
└── 75%分位: 45s5.3 问题诊断流程
flowchart TB
subgraph 诊断流程["🔍 性能问题诊断"]
A[任务慢] --> B[打开Spark UI]
B --> C[查看Jobs页面]
C --> D{哪个Job慢?}
D --> E[进入Job详情]
E --> F{哪个Stage慢?}
F --> G[进入Stage详情]
G --> H[查看Task时间分布]
H --> I{Task时间差异大?}
I --> |"是"|J[数据倾斜
查看Shuffle Read] I --> |"否"|K{Task都很慢?} K --> |"是"|L[资源不足/代码问题] K --> |"否"|M[正常] J --> N[解决数据倾斜] L --> O[增加资源/优化代码] end style J fill:#ff6b6b style N fill:#4ecdc4 style O fill:#4ecdc4
查看Shuffle Read] I --> |"否"|K{Task都很慢?} K --> |"是"|L[资源不足/代码问题] K --> |"否"|M[正常] J --> N[解决数据倾斜] L --> O[增加资源/优化代码] end style J fill:#ff6b6b style N fill:#4ecdc4 style O fill:#4ecdc4
5.4 SQL页面解读
Query 0
├── 描述: SELECT city, sum(amount) FROM orders GROUP BY city
├── 提交时间: 2024-01-15 10:30:00
├── 持续时间: 30s
├── 扫描行数: 1,000,000,000
├── 输出行数: 100
└── 执行计划:
*(2) HashAggregate(keys=[city], functions=[sum(amount)])
+- Exchange hashpartitioning(city, 200)
+- *(1) HashAggregate(keys=[city], functions=[partial_sum(amount)])
+- *(1) FileScan parquet [city, amount]
PartitionFilters: []
PushedFilters: []
ReadSchema: struct<city:string,amount:decimal(10,2)>关键点:
Exchange表示ShuffleHashAggregate分两阶段(partial_sum是Map端聚合)PushedFilters显示下推的过滤条件
六、实战调优案例
6.1 案例一:百亿数据Join优化
问题:两个大表Join,跑了6小时还没完成
// 原始代码
val orders = spark.table("orders") // 100亿条
val users = spark.table("users") // 10亿条
val result = orders.join(users, Seq("user_id"))分析:
- 两张都是大表,无法Broadcast
- Shuffle数据量巨大
- 存在数据倾斜(大V用户订单多)
优化方案:
// 1. 开启AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// 2. 增加Shuffle分区
spark.conf.set("spark.sql.shuffle.partitions", "2000")
// 3. 分离倾斜Key(大V用户)
val topUsers = orders.groupBy("user_id")
.count()
.filter($"count" > 1000000)
.select("user_id")
val topUsersBroadcast = broadcast(
users.join(topUsers, Seq("user_id"))
)
// 普通用户正常Join
val normalOrders = orders.join(
topUsers.select($"user_id", lit(1).as("is_top")),
Seq("user_id"),
"left_anti" // 排除大V
)
val normalResult = normalOrders.join(users, Seq("user_id"))
// 大V用户Broadcast Join
val topOrders = orders.join(
topUsers.select($"user_id", lit(1).as("is_top")),
Seq("user_id"),
"left_semi" // 只保留大V
)
val topResult = topOrders.join(topUsersBroadcast, Seq("user_id"))
// 合并结果
val finalResult = normalResult.union(topResult)效果:从6小时优化到30分钟
6.2 案例二:OOM问题解决
问题:Executor频繁OOM
java.lang.OutOfMemoryError: Java heap space分析步骤:
// 1. 查看数据量
df.count() // 10亿条
df.rdd.partitions.size // 只有100个分区!
// 2. 计算每个分区数据量
10亿 / 100 = 1000万条/分区
// 假设每条1KB,每个分区约10GB!优化方案:
// 1. 增加分区数
df.repartition(2000) // 增加到2000个分区
// 2. 增加Executor内存
--executor-memory 8g
// 3. 增加Executor数量
--num-executors 100
// 4. 减少单个Executor的核数(减少并发)
--executor-cores 2
// 5. 使用更高效的序列化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")6.3 案例三:小文件问题
问题:读取10万个小文件,任务启动就花了30分钟
分析:
- 每个小文件一个Task
- Task调度开销远大于实际处理时间
优化方案:
// 1. 合并小文件读取
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")
// 2. 预处理:合并小文件
val df = spark.read.parquet("small_files/*")
df.coalesce(100).write.parquet("merged_files")
// 3. 使用CombineFilesInputFormat(底层优化)
spark.conf.set("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", "134217728")七、调优参数速查表
7.1 资源配置
| 参数 | 默认值 | 说明 | 建议 |
|---|---|---|---|
| spark.executor.memory | 1g | Executor堆内存 | 4-8g |
| spark.executor.cores | 1 | Executor核数 | 2-4 |
| spark.executor.instances | 2 | Executor数量 | 按需 |
| spark.driver.memory | 1g | Driver内存 | 2-4g |
| spark.driver.cores | 1 | Driver核数 | 1-2 |
7.2 Shuffle配置
| 参数 | 默认值 | 说明 | 建议 |
|---|---|---|---|
| spark.sql.shuffle.partitions | 200 | SQL Shuffle分区 | 数据量/128MB |
| spark.default.parallelism | - | RDD默认并行度 | executor数×cores×2~3 |
| spark.shuffle.file.buffer | 32k | Shuffle写缓冲 | 64k |
| spark.reducer.maxSizeInFlight | 48m | Shuffle读缓冲 | 96m |
7.3 内存配置
| 参数 | 默认值 | 说明 | 建议 |
|---|---|---|---|
| spark.memory.fraction | 0.6 | 统一内存占比 | 0.6-0.8 |
| spark.memory.storageFraction | 0.5 | 存储内存初始占比 | 0.5 |
| spark.memory.offHeap.enabled | false | 堆外内存 | 大数据量开启 |
| spark.memory.offHeap.size | 0 | 堆外内存大小 | 按需 |
7.4 SQL配置
| 参数 | 默认值 | 说明 | 建议 |
|---|---|---|---|
| spark.sql.adaptive.enabled | true | 自适应执行 | true |
| spark.sql.autoBroadcastJoinThreshold | 10MB | 广播阈值 | 100MB |
| spark.sql.adaptive.skewJoin.enabled | true | 倾斜Join处理 | true |
八、写在最后
调优是一门艺术,没有银弹,只有不断尝试。
记住几个原则:
- 先跑通,再优化:别一上来就调参数
- 数据说话:用Spark UI分析,不要猜
- 从大到小:先解决最大的瓶颈
- 记录变化:每次调优记录参数和效果
最后送大家一句话:
调优的最高境界是不用调优——设计好的数据架构和代码,比任何参数都重要。
本文作者:一个与OOM搏斗多年的老兵
最惨经历:调了一天参数,最后发现是代码死循环
附录:面试高频题
Spark的Stage是怎么划分的?
从后往前回溯,遇到宽依赖(Shuffle)就切分Stage。窄依赖在同一Stage内。
Spark的内存管理是怎样的?
统一内存管理:执行内存和存储内存共享,可互相借用。执行内存可强制释放存储内存。
什么是数据倾斜?怎么解决?
某个Key数据量远大于其他Key,导致Task执行时间不均衡。解决:加盐、Broadcast Join、AQE等。
Shuffle的过程是怎样的?
Map端:分区→排序→溢写→合并;Reduce端:拉取→合并→聚合。
如何定位Spark性能问题?
通过Spark UI查看Job/Stage/Task执行情况,关注时间分布、Shuffle数据量、GC时间等指标。
AQE有什么作用?
运行时动态优化:合并小分区、切换Join策略、处理数据倾斜。