搜 索

Spark从入门到放弃

  • 220阅读
  • 2022年06月27日
  • 0评论
首页 / AI/大数据 / 正文

写在前面

自从上海疫情被封在家2个月之后,缺吃少用,天天发牢骚骂娘,我焦虑的头发又少了不少。不过因祸得福,时间是比以前多了不少,这不又研究起了心心念念的Spark。

Spark?不就是火花嘛,当初我可是要一周从入门到精通的。

一个月后的今天,我坐在电脑前,看着满屏的OutOfMemoryErrorTask not serializable,默默地打开了Word,开始写这篇文章。

友情提示:本文适合以下人群阅读:

  • 对Spark充满好奇的萌新
  • 被Spark折磨得死去活来的中年人
  • 想找点乐子的围观群众

第一章:Spark是什么鬼?

1.1 官方定义(假装专业)

Apache Spark是一个快速通用可扩展的大数据处理引擎。

翻译成人话就是:一个能让你在多台机器上同时跑代码的框架,而且它号称比MapReduce快100倍。

至于为什么快100倍...因为它把数据放内存里了啊!

这就好比你问我为什么开法拉利比骑自行车快,答案是:废话,一个烧油一个烧腿。

1.2 Spark生态系统

先来看看Spark的全家桶:

graph TB subgraph "Spark 全家桶" A[Spark Core
核心引擎] --> B[Spark SQL
结构化数据处理] A --> C[Spark Streaming
实时流处理] A --> D[MLlib
机器学习库] A --> E[GraphX
图计算] end subgraph "数据源" F[HDFS] --> A G[Hive] --> A H[Kafka] --> A I[MySQL] --> A J[各种乱七八糟的] --> A end subgraph "集群管理" A --> K[Standalone] A --> L[YARN] A --> M[Mesos] A --> N[Kubernetes] end style A fill:#e74c3c,stroke:#c0392b,color:#fff style B fill:#3498db,stroke:#2980b9,color:#fff style C fill:#2ecc71,stroke:#27ae60,color:#fff style D fill:#9b59b6,stroke:#8e44ad,color:#fff style E fill:#f39c12,stroke:#d68910,color:#fff

看到没?这就是为什么学Spark会掉头发的原因——它的生态系统比我的发际线还要广阔。


第二章:核心概念(掉发重灾区)

2.1 RDD:弹性分布式数据集

RDD(Resilient Distributed Dataset)是Spark的核心抽象,翻译过来叫"弹性分布式数据集"。

弹性:数据丢了能恢复(理论上)
分布式:数据分散在多台机器上
数据集:就是一堆数据

简单来说,RDD就是一个被切成很多块、散落在集群各个节点上的数据集合。

graph LR subgraph "你的数据" A[一个巨大的文件
比如10TB的日志] end subgraph "Spark眼中的数据" B[Partition 1
节点A] C[Partition 2
节点B] D[Partition 3
节点C] E[Partition 4
节点D] F[Partition N
节点...] end A -->|"分区"| B A -->|"分区"| C A -->|"分区"| D A -->|"分区"| E A -->|"分区"| F style A fill:#e74c3c,stroke:#c0392b,color:#fff style B fill:#3498db,stroke:#2980b9,color:#fff style C fill:#3498db,stroke:#2980b9,color:#fff style D fill:#3498db,stroke:#2980b9,color:#fff style E fill:#3498db,stroke:#2980b9,color:#fff style F fill:#3498db,stroke:#2980b9,color:#fff

2.2 RDD的两种操作

RDD有两种操作,这是面试必考题,请务必背下来:

操作类型英文名特点举例
转换操作Transformation懒加载,不会立即执行map, filter, flatMap, groupBy
行动操作Action触发真正的计算count, collect, save, reduce

重点来了:Transformation是懒的!

什么意思呢?就是你写了一堆mapfilter,Spark根本不执行,它就记个小本本。直到你调用count()collect()这种Action,它才开始干活。

这就像我老婆让我洗碗,我说"好的好的",但直到她站在厨房门口盯着我,我才真正开始洗。

sequenceDiagram participant 你 as 程序员 participant Spark as Spark participant 集群 as 集群 你->>Spark: rdd.map(x => x * 2) Spark-->>你: 好的,记下了📝 Note right of Spark: 并没有执行 你->>Spark: rdd.filter(x => x > 10) Spark-->>你: 好的,记下了📝 Note right of Spark: 还是没执行 你->>Spark: rdd.count() Spark->>集群: 开始干活! 集群-->>Spark: 结果:12345 Spark-->>你: 结果是12345 Note right of Spark: 这才真正执行

2.3 DAG:有向无环图

当你写完一堆Transformation后,Spark会生成一个DAG(Directed Acyclic Graph)。

翻译:一个没有环的有方向的图。

再翻译:Spark会记录你对数据做的所有操作,形成一个执行计划。

graph TD A[读取文件] --> B[map: 解析JSON] B --> C[filter: 过滤空值] C --> D[map: 提取字段] D --> E[groupByKey] E --> F[reduceByKey: 聚合] F --> G[saveAsTextFile] subgraph "Stage 1" A B C D end subgraph "Stage 2" E F G end style A fill:#3498db,stroke:#2980b9,color:#fff style B fill:#3498db,stroke:#2980b9,color:#fff style C fill:#3498db,stroke:#2980b9,color:#fff style D fill:#3498db,stroke:#2980b9,color:#fff style E fill:#e74c3c,stroke:#c0392b,color:#fff style F fill:#e74c3c,stroke:#c0392b,color:#fff style G fill:#e74c3c,stroke:#c0392b,color:#fff

第三章:Spark架构(开始头秃)

3.1 集群架构图

graph TB subgraph "Driver Program" A[SparkContext
大脑,负责调度] end subgraph "Cluster Manager" B[YARN / Mesos / K8s
资源管理] end subgraph "Worker Node 1" C[Executor 1] C --> C1[Task] C --> C2[Task] C --> C3[Cache] end subgraph "Worker Node 2" D[Executor 2] D --> D1[Task] D --> D2[Task] D --> D3[Cache] end subgraph "Worker Node 3" E[Executor 3] E --> E1[Task] E --> E2[Task] E --> E3[Cache] end A <-->|"申请资源"| B B -->|"分配资源"| C B -->|"分配资源"| D B -->|"分配资源"| E A -->|"分发任务"| C A -->|"分发任务"| D A -->|"分发任务"| E style A fill:#e74c3c,stroke:#c0392b,color:#fff style B fill:#9b59b6,stroke:#8e44ad,color:#fff style C fill:#3498db,stroke:#2980b9,color:#fff style D fill:#3498db,stroke:#2980b9,color:#fff style E fill:#3498db,stroke:#2980b9,color:#fff

3.2 名词解释

角色职责类比
Driver运行main函数,创建SparkContext包工头
Cluster Manager管理集群资源人力资源部
Worker Node运行计算任务的节点搬砖工人
ExecutorWorker上的JVM进程工人的手
Task最小的执行单元一块砖

所以整个流程就是:

  1. 包工头(Driver)向人力资源部(Cluster Manager)申请工人
  2. 人力资源部分配工人(Worker)
  3. 包工头把活分给工人的手(Executor)
  4. 手开始搬砖(执行Task)

第四章:代码实战(正式开始放弃)

4.1 WordCount:大数据界的Hello World

每个学大数据的人,都逃不过WordCount。这就像每个学编程的人都要写Hello World一样。

// Scala版本 - 简洁到令人发指
val textFile = spark.read.textFile("hdfs://path/to/file.txt")
val counts = textFile
  .flatMap(line => line.split(" "))
  .groupByKey(identity)
  .count()
counts.show()
// Java版本 - 啰嗦到令人窒息
JavaRDD<String> textFile = sc.textFile("hdfs://path/to/file.txt");
JavaPairRDD<String, Integer> counts = textFile
    .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("hdfs://path/to/output");
# Python版本 - 人生苦短,我用PySpark
text_file = sc.textFile("hdfs://path/to/file.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://path/to/output")

4.2 Spark SQL:终于像个正常人了

如果你觉得RDD太难用,恭喜你,你可以用Spark SQL。

// 创建DataFrame
val df = spark.read.json("path/to/people.json")

// 像写SQL一样操作数据
df.createOrReplaceTempView("people")

val result = spark.sql("""
  SELECT 
    name,
    age,
    CASE 
      WHEN age < 18 THEN '未成年'
      WHEN age < 35 THEN '青年'
      WHEN age < 60 THEN '中年(脱发高危)'
      ELSE '老年'
    END as age_group
  FROM people
  WHERE age IS NOT NULL
  ORDER BY age DESC
""")

result.show()

4.3 DataFrame操作流程

graph LR A[数据源
JSON/CSV/Parquet] -->|"spark.read"| B[DataFrame] B -->|"select/filter/groupBy"| C[转换后的DataFrame] C -->|"write"| D[输出
HDFS/Hive/MySQL] B -->|"createTempView"| E[临时视图] E -->|"spark.sql"| C style A fill:#95a5a6,stroke:#7f8c8d,color:#fff style B fill:#3498db,stroke:#2980b9,color:#fff style C fill:#2ecc71,stroke:#27ae60,color:#fff style D fill:#e74c3c,stroke:#c0392b,color:#fff style E fill:#9b59b6,stroke:#8e44ad,color:#fff

第五章:性能调优(脱发加速器)

5.1 常见的坑

我来总结一下,在使用Spark的过程中,你会遇到的各种令人想砸电脑的问题:

pie title "Spark调优时间分配" "排查OOM" : 35 "解决数据倾斜" : 25 "调参数" : 20 "等待任务跑完" : 15 "实际写代码" : 5

5.2 数据倾斜:万恶之源

数据倾斜是什么?就是某个key的数据特别多,导致某个Task干得累死,其他Task早就下班了。

graph TB subgraph "正常情况" A1[Partition 1
100万条] A2[Partition 2
98万条] A3[Partition 3
102万条] A4[Partition 4
100万条] end subgraph "数据倾斜" B1[Partition 1
10万条] B2[Partition 2
5万条] B3[Partition 3
380万条
😱] B4[Partition 4
5万条] end style B3 fill:#e74c3c,stroke:#c0392b,color:#fff

解决方案

  1. 加盐打散:给倾斜的key加随机前缀
  2. 增加并行度:让数据分得更散
  3. 广播小表:避免shuffle
  4. 换个框架:开玩笑的...除非你真的受不了了

5.3 内存配置

graph TB subgraph "Executor内存分配" A[总内存
spark.executor.memory] A --> B[执行内存
spark.memory.fraction × 60%] A --> C[存储内存
spark.memory.fraction × 40%] A --> D[用户内存
剩余部分] A --> E[预留内存
300MB] B --> B1[Shuffle/Join/Sort/Aggregation] C --> C1[Cache/Broadcast] end style A fill:#e74c3c,stroke:#c0392b,color:#fff style B fill:#3498db,stroke:#2980b9,color:#fff style C fill:#2ecc71,stroke:#27ae60,color:#fff style D fill:#9b59b6,stroke:#8e44ad,color:#fff style E fill:#f39c12,stroke:#d68910,color:#fff

5.4 常用调优参数

参数说明建议值我的血泪经验
spark.executor.memoryExecutor内存4-8G太大会GC,太小会OOM
spark.executor.coresExecutor核数2-4别太贪心
spark.sql.shuffle.partitionsShuffle分区数100-500默认200太小了
spark.default.parallelism默认并行度CPU核数×2~3看数据量
spark.memory.fraction内存占比0.6一般不用改

第六章:踩坑实录(真实经历)

6.1 Task not serializable

这个错误,我愿称之为"Spark第一坑"。

症状

org.apache.spark.SparkException: Task not serializable

原因:你在RDD操作里引用了不可序列化的对象。

解决方案

// ❌ 错误示范
class MyService {
  val db = new DatabaseConnection() // 不可序列化!
  
  def process(rdd: RDD[String]) = {
    rdd.map(x => db.query(x)) // 💥 爆炸
  }
}

// ✅ 正确姿势
class MyService extends Serializable {
  def process(rdd: RDD[String]) = {
    rdd.mapPartitions { partition =>
      val db = new DatabaseConnection() // 每个分区创建一个
      partition.map(x => db.query(x))
    }
  }
}

6.2 OOM三连

graph TD A[开开心心写代码] --> B{运行} B -->|"成功"| C[不可能] B -->|"失败"| D[OutOfMemoryError] D --> E[增加内存] E --> B D --> F[减少数据量] F --> B D --> G[优化代码] G --> B D --> H[放弃] H --> I[去健身房撸铁] style D fill:#e74c3c,stroke:#c0392b,color:#fff style H fill:#9b59b6,stroke:#8e44ad,color:#fff style I fill:#2ecc71,stroke:#27ae60,color:#fff

6.3 小文件问题

症状:任务跑得奇慢无比,但CPU和内存都没打满。

原因:你有10万个小文件,每个只有几KB。

后果:Spark会创建10万个Task,调度开销比实际计算还大。

解决

// 合并小文件
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")

// 或者用coalesce
val df = spark.read.parquet("path/to/small/files")
  .coalesce(100) // 合并成100个分区

第七章:Spark vs 其他框架

7.1 大数据框架对比

graph TB subgraph "批处理" A[MapReduce
元老级,但慢] B[Spark
快,但费内存] C[Flink
真·流批一体] end subgraph "流处理" D[Storm
老牌流处理] E[Spark Streaming
微批处理] F[Flink
真·流处理] G[Kafka Streams
轻量级] end A -.->|"进化"| B B -.->|"进化"| C D -.->|"进化"| F style A fill:#95a5a6,stroke:#7f8c8d,color:#fff style B fill:#e74c3c,stroke:#c0392b,color:#fff style C fill:#2ecc71,stroke:#27ae60,color:#fff style F fill:#2ecc71,stroke:#27ae60,color:#fff

7.2 选型建议

场景推荐框架原因
离线批处理Spark生态成熟,资料多
实时流处理Flink真正的流处理,延迟低
简单ETLSpark SQL写SQL就行
机器学习Spark MLlib和Spark无缝集成
图计算Spark GraphX没什么好选的
想不加班外包出去真心建议

第八章:学习路线图

graph TD A[开始学习Spark] --> B[安装环境] B --> C[写WordCount] C --> D[看官方文档] D --> E[跑Demo] E --> F{能跑通吗?} F -->|"Yes"| G[开始写业务代码] F -->|"No"| H[Google/百度/Stack Overflow] H --> E G --> I{遇到问题了吗?} I -->|"Yes"| J[数据倾斜?] I -->|"No"| K[继续写] K --> I J -->|"Yes"| L[疯狂调优] J -->|"No"| M[OOM?] M -->|"Yes"| N[加内存/优化代码] M -->|"No"| O[其他问题] L --> P{解决了吗?} N --> P O --> P P -->|"Yes"| K P -->|"No"| Q[头发-1] Q --> R{还有头发吗?} R -->|"Yes"| H R -->|"No"| S[恭喜,你已精通Spark] style A fill:#2ecc71,stroke:#27ae60,color:#fff style S fill:#e74c3c,stroke:#c0392b,color:#fff style Q fill:#f39c12,stroke:#d68910,color:#fff

第九章:常用命令速查

9.1 spark-submit参数

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=500 \
  --conf spark.default.parallelism=500 \
  --conf spark.yarn.maxAppAttempts=1 \
  --class com.joey.spark.MyApp \
  my-spark-app.jar \
  arg1 arg2

9.2 常用Spark Shell命令

// 启动spark-shell
spark-shell --master yarn --executor-memory 4g

// 查看SparkContext信息
sc.version
sc.master
sc.defaultParallelism

// 查看当前配置
spark.conf.getAll.foreach(println)

// 设置日志级别
sc.setLogLevel("WARN")

写在最后

如果你能看到这里,说明你还没有放弃,或者你直接拉到底了。

Spark确实是一个强大的分布式计算框架,但它的学习曲线也确实陡峭。我写这篇文章的时候,又掉了几根头发。

几点忠告

  1. 不要急:Spark的概念很多,慢慢来
  2. 多实践:看100遍文档不如自己跑一遍
  3. 学会看日志:Spark的错误信息虽然长,但其实挺有用的
  4. 善用UI:Spark Web UI能帮你定位很多问题
  5. 保持健身:身体是革命的本钱,头发没了身体不能没

最后送大家一句话:

代码虐我千百遍,我待代码如初恋。

参考资料


作者:Joey | 写于某个深夜 | 头发状态:危

评论区
暂无评论
avatar