搜 索

Spark从入门到放弃③之内核原理与调优

  • 13阅读
  • 2023年03月18日
  • 0评论
首页 / AI/大数据 / 正文

前言:从"能跑"到"跑得好"

写Spark代码分三个境界:

第一境界:代码能跑起来(入门)
第二境界:代码跑得又快又稳(进阶)
第三境界:代码跑崩了知道怎么修(高手)

大多数人停留在第一境界,能跑就行,跑不动就加内存。

但当你的任务从处理1GB变成处理1TB,从跑1分钟变成跑1天,你就会发现:不懂原理的Spark,就像不懂发动机的老司机,迟早要在高速上抛锚。

本篇是Spark系列的第三篇,我们将深入Spark内核,从Job到Task,从Shuffle到内存管理,彻底搞懂Spark是怎么跑的,以及怎么让它跑得更快。


一、Spark作业执行全流程

1.1 从代码到执行

graph TB subgraph 用户代码["👨‍💻 用户代码"] A[Spark Application] end subgraph Driver端["🚗 Driver端"] B[SparkContext] --> C[DAGScheduler] C --> D[TaskScheduler] end subgraph 集群["🖥️ 集群"] D --> E1[Executor 1] D --> E2[Executor 2] D --> E3[Executor N] E1 --> T1[Task] E1 --> T2[Task] E2 --> T3[Task] E2 --> T4[Task] E3 --> T5[Task] end A --> B style B fill:#e74c3c,color:#fff style C fill:#3498db,color:#fff style D fill:#2ecc71,color:#fff

1.2 Job、Stage、Task的关系

graph TB subgraph Application["📱 Application"] subgraph Job1["Job 1(由Action触发)"] subgraph Stage1["Stage 1(ShuffleMapStage)"] T1[Task 1.1] T2[Task 1.2] T3[Task 1.3] end subgraph Stage2["Stage 2(ResultStage)"] T4[Task 2.1] T5[Task 2.2] end Stage1 --> |"Shuffle"|Stage2 end subgraph Job2["Job 2"] subgraph Stage3["Stage 3"] T6[Task 3.1] T7[Task 3.2] end end end style Job1 fill:#ff6b6b style Job2 fill:#4ecdc4 style Stage1 fill:#ffe66d style Stage2 fill:#ffe66d

关系总结

  • 一个Application包含多个Job
  • 一个Job由一个Action操作触发
  • 一个Job包含多个Stage
  • Stage之间以Shuffle为边界划分
  • 一个Stage包含多个Task
  • Task数量 = 该Stage最后一个RDD的分区数

1.3 Stage划分原理

Stage划分的关键:宽依赖(Shuffle)

graph LR subgraph 窄依赖["窄依赖(同一Stage)"] A1[Partition] --> B1[Partition] A2[Partition] --> B2[Partition] A3[Partition] --> B3[Partition] end subgraph 宽依赖["宽依赖(划分Stage)"] C1[Partition] --> D1[Partition] C1[Partition] --> D2[Partition] C2[Partition] --> D1 C2[Partition] --> D2 C3[Partition] --> D1 C3[Partition] --> D2 end style 窄依赖 fill:#4ecdc4 style 宽依赖 fill:#ff6b6b

窄依赖:一个父分区对应一个子分区(map、filter、union)
宽依赖:一个父分区对应多个子分区(groupByKey、reduceByKey、join)

// 示例:这段代码会产生几个Stage?
val result = sc.textFile("data.txt")      // Stage 1 开始
  .flatMap(_.split(" "))                   // 窄依赖
  .map(word => (word, 1))                  // 窄依赖
  .reduceByKey(_ + _)                      // 宽依赖!Stage 1 结束,Stage 2 开始
  .filter(_._2 > 10)                       // 窄依赖
  .collect()                               // Action,触发Job

// 答案:2个Stage
// Stage 1: textFile -> flatMap -> map
// Stage 2: reduceByKey -> filter -> collect

1.4 DAGScheduler工作流程

sequenceDiagram participant User as 用户代码 participant SC as SparkContext participant DAG as DAGScheduler participant Task as TaskScheduler participant Exec as Executor User->>SC: rdd.collect() (Action) SC->>DAG: 提交Job DAG->>DAG: 1. 构建DAG DAG->>DAG: 2. 划分Stage(从后往前,遇Shuffle切分) DAG->>DAG: 3. 生成TaskSet loop 每个Stage DAG->>Task: 提交TaskSet Task->>Exec: 调度Task到Executor Exec-->>Task: Task执行完成 Task-->>DAG: Stage完成 end DAG-->>SC: Job完成 SC-->>User: 返回结果

二、Shuffle深度剖析

2.1 什么是Shuffle?

Shuffle是分布式计算的性能杀手,它涉及:

  • 数据序列化
  • 磁盘IO
  • 网络传输
  • 数据反序列化
graph TB subgraph Shuffle过程["🔀 Shuffle过程"] subgraph MapSide["Map端(Shuffle Write)"] M1[Mapper 1] --> |"分区、排序、写磁盘"|F1[Shuffle文件] M2[Mapper 2] --> F2[Shuffle文件] M3[Mapper 3] --> F3[Shuffle文件] end subgraph Network["网络传输"] F1 --> |"拉取"|R1[Reducer 1] F1 --> |"拉取"|R2[Reducer 2] F2 --> |"拉取"|R1 F2 --> |"拉取"|R2 F3 --> |"拉取"|R1 F3 --> |"拉取"|R2 end subgraph ReduceSide["Reduce端(Shuffle Read)"] R1 --> |"合并、聚合"|Result1[结果1] R2 --> |"合并、聚合"|Result2[结果2] end end style MapSide fill:#ff6b6b style Network fill:#ffe66d style ReduceSide fill:#4ecdc4

2.2 Sort-Based Shuffle

Spark 2.0后默认使用Sort-Based Shuffle。

graph TB subgraph SortShuffle["Sort-Based Shuffle流程"] subgraph MapTask["Map Task"] A[数据记录] --> B[写入内存缓冲区
PartitionedAppendOnlyMap] B --> C{缓冲区满?} C --> |"是"|D[排序并溢写到磁盘] C --> |"否"|B D --> E[继续写入] E --> B end subgraph 最终合并["Task结束"] F[合并所有溢写文件] F --> G[生成一个数据文件] F --> H[生成一个索引文件] end D --> F subgraph 输出["输出文件"] G --> G1["shuffle_0_0_0.data
(所有分区数据)"] H --> H1["shuffle_0_0_0.index
(各分区偏移量)"] end end style B fill:#ff6b6b style D fill:#ffe66d style G fill:#4ecdc4

2.3 Shuffle Write详解

sequenceDiagram participant Task as Map Task participant Buffer as 内存缓冲区 participant Disk as 磁盘 participant Index as 索引文件 loop 处理每条记录 Task->>Buffer: 写入(partitionId, key, value) alt 缓冲区满 Buffer->>Buffer: 按(partitionId, key)排序 Buffer->>Disk: 溢写到临时文件 end end Note over Task: Task结束 Task->>Disk: 合并所有溢写文件 Task->>Index: 写入各分区偏移量 Note over Disk,Index: 最终每个Map Task产生
1个数据文件 + 1个索引文件

2.4 Shuffle Read详解

graph TB subgraph ShuffleRead["Shuffle Read流程"] A[Reduce Task启动] --> B[向Driver获取Map输出位置] B --> C[并行拉取各Map的数据] C --> D{数据量大?} D --> |"是"|E[边拉取边溢写] D --> |"否"|F[全部放内存] E --> G[外部排序合并] F --> H[内存中聚合] G --> I[输出结果] H --> I end style C fill:#ff6b6b style G fill:#ffe66d

2.5 Shuffle调优参数

参数默认值说明
spark.shuffle.file.buffer32kShuffle写缓冲区
spark.reducer.maxSizeInFlight48mReduce端拉取缓冲区
spark.shuffle.io.maxRetries3拉取失败重试次数
spark.shuffle.io.retryWait5s重试等待时间
spark.shuffle.sort.bypassMergeThreshold200绕过排序的分区阈值
spark.shuffle.compresstrue是否压缩Shuffle数据
spark.shuffle.spill.compresstrue是否压缩溢写数据
// 优化配置示例
spark.conf.set("spark.shuffle.file.buffer", "64k")
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
spark.conf.set("spark.shuffle.io.maxRetries", "6")
spark.conf.set("spark.shuffle.compress", "true")

三、内存管理机制

3.1 Executor内存布局

graph TB subgraph Executor内存["🧠 Executor内存布局(统一内存管理)"] Total["总内存
spark.executor.memory"] Total --> Reserved["预留内存
300MB固定"] Total --> Usable["可用内存
(总内存 - 300MB)"] Usable --> Unified["统一内存
spark.memory.fraction=0.6"] Usable --> User["用户内存
剩余40%"] Unified --> Execution["执行内存
Shuffle/Join/Sort/Aggregation"] Unified --> Storage["存储内存
Cache/Broadcast"] Execution <--> |"动态占用"|Storage end style Total fill:#ff6b6b style Unified fill:#4ecdc4 style Execution fill:#ffe66d style Storage fill:#3498db

3.2 内存计算示例

假设:spark.executor.memory = 10GB

预留内存 = 300MB(固定)
可用内存 = 10GB - 300MB ≈ 9.7GB

统一内存 = 9.7GB × 0.6 ≈ 5.8GB
├── 执行内存(初始)= 5.8GB × 0.5 ≈ 2.9GB
└── 存储内存(初始)= 5.8GB × 0.5 ≈ 2.9GB

用户内存 = 9.7GB × 0.4 ≈ 3.9GB
└── 用户代码中创建的对象

3.3 统一内存管理

graph LR subgraph 动态占用["🔄 执行/存储内存动态占用"] subgraph 场景1["场景1:存储内存不足"] E1[执行内存
空闲2GB] --> S1[存储内存
占用执行内存] end subgraph 场景2["场景2:执行内存不足"] S2[存储内存
被强制释放] --> E2[执行内存
抢占存储内存] end end style E1 fill:#ffe66d style S1 fill:#3498db style S2 fill:#3498db style E2 fill:#ffe66d

关键规则

  • 执行内存和存储内存可以互相借用
  • 存储内存被执行内存借用后,可以被强制释放
  • 执行内存被存储内存借用后,不能被强制释放(等任务完成)

3.4 堆外内存

graph TB subgraph 内存类型["📦 内存类型"] subgraph 堆内内存["堆内内存(On-Heap)"] OH1["受JVM GC管理"] OH2["默认使用"] OH3["有GC开销"] end subgraph 堆外内存["堆外内存(Off-Heap)"] OF1["不受JVM GC管理"] OF2["需要手动开启"] OF3["减少GC,但需要手动管理"] end end style 堆内内存 fill:#4ecdc4 style 堆外内存 fill:#ff6b6b
// 开启堆外内存
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

3.5 OOM问题排查

flowchart TB Start[OOM发生] --> Q1{哪里OOM?} Q1 --> |"Driver"|D1[Driver内存不足] Q1 --> |"Executor"|E1[Executor内存不足] D1 --> D2["可能原因:
1. collect大量数据
2. 广播大变量
3. Driver端聚合"] D2 --> D3["解决:
1. 增加driver-memory
2. 避免collect
3. 使用take/sample"] E1 --> E2{OOM类型?} E2 --> |"堆内存"|H1["可能原因:
1. 数据量太大
2. 数据倾斜
3. Cache过多"] E2 --> |"堆外内存"|O1["可能原因:
1. Shuffle数据量大
2. 堆外内存设置小"] H1 --> H2["解决:
1. 增加executor-memory
2. 增加分区数
3. 处理数据倾斜"] O1 --> O2["解决:
1. 增加堆外内存
2. 减少并发度"] style Start fill:#ff6b6b style D3 fill:#4ecdc4 style H2 fill:#4ecdc4 style O2 fill:#4ecdc4

四、数据倾斜深度解决

4.1 什么是数据倾斜?

graph TB subgraph 正常分布["✅ 正常数据分布"] N1[Task 1
100万条
⏱️ 10s] N2[Task 2
98万条
⏱️ 10s] N3[Task 3
102万条
⏱️ 10s] N4[Task 4
100万条
⏱️ 10s] N1 --> NR[总耗时:10s] N2 --> NR N3 --> NR N4 --> NR end subgraph 数据倾斜["❌ 数据倾斜"] S1[Task 1
10万条
⏱️ 1s] S2[Task 2
5万条
⏱️ 0.5s] S3[Task 3
380万条
⏱️ 60s 😱] S4[Task 4
5万条
⏱️ 0.5s] S1 --> SR[总耗时:60s ❌] S2 --> SR S3 --> SR S4 --> SR end style S3 fill:#ff0000,color:#fff style SR fill:#ff6b6b

4.2 如何发现数据倾斜?

graph TB subgraph 诊断方法["🔍 数据倾斜诊断"] A[Spark UI] --> B[查看Stage详情] B --> C[比较Task执行时间] C --> D{时间差异大?} D --> |"是"|E[数据倾斜!] D --> |"否"|F[正常] G[代码分析] --> H["查看key分布
df.groupBy('key').count()"] H --> I{某key数量异常大?} I --> |"是"|E I --> |"否"|F end style E fill:#ff6b6b style F fill:#4ecdc4

4.3 八大解决方案

方案一:过滤异常Key

// 场景:大量NULL或异常值
// 解决:直接过滤

val df = spark.read.parquet("data")
  .filter($"key".isNotNull)
  .filter($"key" =!= "")
  .filter($"key" =!= "NULL")

方案二:提高并行度

// 场景:数据量大但分区少
// 解决:增加分区数

// 方式1:设置全局参数
spark.conf.set("spark.sql.shuffle.partitions", "1000")

// 方式2:repartition
df.repartition(1000)

// 方式3:重新分区到特定列
df.repartition(1000, $"key")

方案三:双重聚合(加盐)

// 场景:聚合操作数据倾斜
// 解决:两阶段聚合

// 原始倾斜代码
val result = df.groupBy("city").agg(sum("amount"))

// 优化后
import scala.util.Random

val salted = df.withColumn("salted_city", 
  concat($"city", lit("_"), lit(Random.nextInt(100))))

val partialAgg = salted
  .groupBy("salted_city")
  .agg(sum("amount").as("partial_sum"))

val finalResult = partialAgg
  .withColumn("city", split($"salted_city", "_")(0))
  .groupBy("city")
  .agg(sum("partial_sum").as("total_amount"))
graph TB subgraph 双重聚合["🧂 双重聚合(加盐)"] subgraph 原始["原始数据"] O1["Beijing: 1000万条"] O2["Shanghai: 100万条"] end subgraph 加盐后["加盐后"] S1["Beijing_0: 100万条"] S2["Beijing_1: 100万条"] S3["Beijing_..."] S4["Beijing_99: 100万条"] S5["Shanghai_0~99: 各1万条"] end subgraph 一次聚合["第一次聚合"] A1["Beijing_0: sum"] A2["Beijing_1: sum"] A3["..."] A4["Beijing_99: sum"] end subgraph 二次聚合["第二次聚合"] F1["Beijing: total_sum"] end 原始 --> 加盐后 --> 一次聚合 --> 二次聚合 end style O1 fill:#ff6b6b style F1 fill:#4ecdc4

方案四:Broadcast Join

// 场景:大表Join小表,小表倾斜
// 解决:广播小表,避免Shuffle

import org.apache.spark.sql.functions.broadcast

// 小表(< 10MB)
val smallDF = spark.table("dim_city")
// 大表
val largeDF = spark.table("fact_orders")

// 广播小表
val result = largeDF.join(broadcast(smallDF), Seq("city_id"))

// 调整广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

方案五:倾斜Key单独处理

// 场景:只有少数Key倾斜
// 解决:分离处理

// 找出倾斜的Key
val skewedKeys = df.groupBy("key")
  .count()
  .filter($"count" > 1000000)
  .select("key")
  .collect()
  .map(_.getString(0))
  .toSet

val skewedKeysBC = spark.sparkContext.broadcast(skewedKeys)

// 分离数据
val normalData = df.filter(!$"key".isin(skewedKeys.toSeq: _*))
val skewedData = df.filter($"key".isin(skewedKeys.toSeq: _*))

// 正常数据正常处理
val normalResult = normalData.groupBy("key").agg(sum("value"))

// 倾斜数据特殊处理(如加盐、或提高并行度)
val skewedResult = skewedData
  .repartition(1000)
  .groupBy("key")
  .agg(sum("value"))

// 合并结果
val finalResult = normalResult.union(skewedResult)

方案六:使用Map端聚合

// 场景:reduceByKey类操作
// 解决:使用reduceByKey而非groupByKey

// ❌ 不推荐:groupByKey(无Map端聚合)
rdd.groupByKey().mapValues(_.sum)

// ✅ 推荐:reduceByKey(有Map端聚合)
rdd.reduceByKey(_ + _)

// ✅ 推荐:aggregateByKey
rdd.aggregateByKey(0)(
  (acc, value) => acc + value,  // 分区内聚合
  (acc1, acc2) => acc1 + acc2   // 分区间聚合
)

方案七:调整Join策略

// Skew Join(Spark 3.0+ AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

方案八:自定义Partitioner

// 场景:需要精确控制数据分布
// 解决:自定义分区器

class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    if (k == "热点Key") {
      // 热点Key随机分配到多个分区
      Random.nextInt(numParts / 10)
    } else {
      // 其他Key正常Hash
      Math.abs(k.hashCode % numParts)
    }
  }
}

rdd.partitionBy(new CustomPartitioner(1000))

4.4 方案选择指南

flowchart TB Start[数据倾斜] --> Q1{倾斜类型?} Q1 --> |"聚合倾斜"|A1{Key数量多?} Q1 --> |"Join倾斜"|B1{小表 < 1GB?} A1 --> |"是"|A2[双重聚合/加盐] A1 --> |"否
少数Key倾斜"|A3[单独处理倾斜Key] B1 --> |"是"|B2[Broadcast Join] B1 --> |"否"|B3{Spark 3.0+?} B3 --> |"是"|B4[开启AQE Skew Join] B3 --> |"否"|B5[倾斜Key单独处理+加盐] style A2 fill:#4ecdc4 style B2 fill:#4ecdc4 style B4 fill:#4ecdc4

五、Spark UI深度解读

5.1 UI页面结构

graph TB subgraph SparkUI["🖥️ Spark UI页面"] A[Jobs] --> A1[所有Job列表
执行状态、时间] B[Stages] --> B1[所有Stage列表
Task分布、Shuffle] C[Storage] --> C1[缓存的RDD/DataFrame
内存使用] D[Environment] --> D1[配置参数
运行环境] E[Executors] --> E1[Executor状态
内存、GC、Task] F[SQL] --> F1[SQL执行计划
详细指标] end style A fill:#ff6b6b style B fill:#ff6b6b style E fill:#4ecdc4 style F fill:#4ecdc4

5.2 关键指标解读

Jobs页面

Job 0 (成功)
├── 描述: count at MyApp.scala:25
├── 提交时间: 2024-01-15 10:30:00
├── 持续时间: 2.5 min
├── Stages: 3/3 (成功)
└── Tasks: 1000/1000 (成功)

Stages页面

Stage 2 (成功)
├── 描述: collect at MyApp.scala:30
├── 持续时间: 1.5 min
├── Input: 10 GB / 1000 records
├── Output: 100 MB / 100 records
├── Shuffle Read: 5 GB
├── Shuffle Write: 2 GB
└── Tasks: 
    ├── 成功: 200
    ├── 失败: 0
    ├── 总计: 200
    └── 时间分布:
        ├── 最小: 2s
        ├── 中位数: 30s
        ├── 最大: 5min  ← 可能数据倾斜!
        └── 75%分位: 45s

5.3 问题诊断流程

flowchart TB subgraph 诊断流程["🔍 性能问题诊断"] A[任务慢] --> B[打开Spark UI] B --> C[查看Jobs页面] C --> D{哪个Job慢?} D --> E[进入Job详情] E --> F{哪个Stage慢?} F --> G[进入Stage详情] G --> H[查看Task时间分布] H --> I{Task时间差异大?} I --> |"是"|J[数据倾斜
查看Shuffle Read] I --> |"否"|K{Task都很慢?} K --> |"是"|L[资源不足/代码问题] K --> |"否"|M[正常] J --> N[解决数据倾斜] L --> O[增加资源/优化代码] end style J fill:#ff6b6b style N fill:#4ecdc4 style O fill:#4ecdc4

5.4 SQL页面解读

Query 0
├── 描述: SELECT city, sum(amount) FROM orders GROUP BY city
├── 提交时间: 2024-01-15 10:30:00
├── 持续时间: 30s
├── 扫描行数: 1,000,000,000
├── 输出行数: 100
└── 执行计划:
    *(2) HashAggregate(keys=[city], functions=[sum(amount)])
    +- Exchange hashpartitioning(city, 200)
       +- *(1) HashAggregate(keys=[city], functions=[partial_sum(amount)])
          +- *(1) FileScan parquet [city, amount]
             PartitionFilters: []
             PushedFilters: []
             ReadSchema: struct<city:string,amount:decimal(10,2)>

关键点

  • Exchange表示Shuffle
  • HashAggregate分两阶段(partial_sum是Map端聚合)
  • PushedFilters显示下推的过滤条件

六、实战调优案例

6.1 案例一:百亿数据Join优化

问题:两个大表Join,跑了6小时还没完成

// 原始代码
val orders = spark.table("orders")      // 100亿条
val users = spark.table("users")        // 10亿条
val result = orders.join(users, Seq("user_id"))

分析

  1. 两张都是大表,无法Broadcast
  2. Shuffle数据量巨大
  3. 存在数据倾斜(大V用户订单多)

优化方案

// 1. 开启AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

// 2. 增加Shuffle分区
spark.conf.set("spark.sql.shuffle.partitions", "2000")

// 3. 分离倾斜Key(大V用户)
val topUsers = orders.groupBy("user_id")
  .count()
  .filter($"count" > 1000000)
  .select("user_id")

val topUsersBroadcast = broadcast(
  users.join(topUsers, Seq("user_id"))
)

// 普通用户正常Join
val normalOrders = orders.join(
  topUsers.select($"user_id", lit(1).as("is_top")),
  Seq("user_id"),
  "left_anti"  // 排除大V
)
val normalResult = normalOrders.join(users, Seq("user_id"))

// 大V用户Broadcast Join
val topOrders = orders.join(
  topUsers.select($"user_id", lit(1).as("is_top")),
  Seq("user_id"),
  "left_semi"  // 只保留大V
)
val topResult = topOrders.join(topUsersBroadcast, Seq("user_id"))

// 合并结果
val finalResult = normalResult.union(topResult)

效果:从6小时优化到30分钟

6.2 案例二:OOM问题解决

问题:Executor频繁OOM

java.lang.OutOfMemoryError: Java heap space

分析步骤

// 1. 查看数据量
df.count()  // 10亿条
df.rdd.partitions.size  // 只有100个分区!

// 2. 计算每个分区数据量
10亿 / 100 = 1000万条/分区
// 假设每条1KB,每个分区约10GB!

优化方案

// 1. 增加分区数
df.repartition(2000)  // 增加到2000个分区

// 2. 增加Executor内存
--executor-memory 8g

// 3. 增加Executor数量
--num-executors 100

// 4. 减少单个Executor的核数(减少并发)
--executor-cores 2

// 5. 使用更高效的序列化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

6.3 案例三:小文件问题

问题:读取10万个小文件,任务启动就花了30分钟

分析

  • 每个小文件一个Task
  • Task调度开销远大于实际处理时间

优化方案

// 1. 合并小文件读取
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")

// 2. 预处理:合并小文件
val df = spark.read.parquet("small_files/*")
df.coalesce(100).write.parquet("merged_files")

// 3. 使用CombineFilesInputFormat(底层优化)
spark.conf.set("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", "134217728")

七、调优参数速查表

7.1 资源配置

参数默认值说明建议
spark.executor.memory1gExecutor堆内存4-8g
spark.executor.cores1Executor核数2-4
spark.executor.instances2Executor数量按需
spark.driver.memory1gDriver内存2-4g
spark.driver.cores1Driver核数1-2

7.2 Shuffle配置

参数默认值说明建议
spark.sql.shuffle.partitions200SQL Shuffle分区数据量/128MB
spark.default.parallelism-RDD默认并行度executor数×cores×2~3
spark.shuffle.file.buffer32kShuffle写缓冲64k
spark.reducer.maxSizeInFlight48mShuffle读缓冲96m

7.3 内存配置

参数默认值说明建议
spark.memory.fraction0.6统一内存占比0.6-0.8
spark.memory.storageFraction0.5存储内存初始占比0.5
spark.memory.offHeap.enabledfalse堆外内存大数据量开启
spark.memory.offHeap.size0堆外内存大小按需

7.4 SQL配置

参数默认值说明建议
spark.sql.adaptive.enabledtrue自适应执行true
spark.sql.autoBroadcastJoinThreshold10MB广播阈值100MB
spark.sql.adaptive.skewJoin.enabledtrue倾斜Join处理true

八、写在最后

调优是一门艺术,没有银弹,只有不断尝试。

记住几个原则:

  1. 先跑通,再优化:别一上来就调参数
  2. 数据说话:用Spark UI分析,不要猜
  3. 从大到小:先解决最大的瓶颈
  4. 记录变化:每次调优记录参数和效果

最后送大家一句话:

调优的最高境界是不用调优——设计好的数据架构和代码,比任何参数都重要。

本文作者:一个与OOM搏斗多年的老兵

最惨经历:调了一天参数,最后发现是代码死循环


附录:面试高频题

  1. Spark的Stage是怎么划分的?

    从后往前回溯,遇到宽依赖(Shuffle)就切分Stage。窄依赖在同一Stage内。
  2. Spark的内存管理是怎样的?

    统一内存管理:执行内存和存储内存共享,可互相借用。执行内存可强制释放存储内存。
  3. 什么是数据倾斜?怎么解决?

    某个Key数据量远大于其他Key,导致Task执行时间不均衡。解决:加盐、Broadcast Join、AQE等。
  4. Shuffle的过程是怎样的?

    Map端:分区→排序→溢写→合并;Reduce端:拉取→合并→聚合。
  5. 如何定位Spark性能问题?

    通过Spark UI查看Job/Stage/Task执行情况,关注时间分布、Shuffle数据量、GC时间等指标。
  6. AQE有什么作用?

    运行时动态优化:合并小分区、切换Join策略、处理数据倾斜。
评论区
暂无评论
avatar