前言:SQL,永远的神
在大数据的世界里,有一个永恒的真理:
会写SQL的人,永远不会失业。
为什么?因为无论技术怎么变,老板们永远只会说一句话:"给我拉一下昨天的GMV数据。"
他不会问你用的是RDD还是DataFrame,他只关心数据对不对、快不快。
Spark SQL就是为了让你用写SQL的方式处理大数据。它让那些被RDD折磨得死去活来的程序员们,终于可以像个正常人一样写代码了。
本篇是Spark系列的第二篇,我们将深入Spark SQL的世界,从DataFrame到Catalyst优化器,从窗口函数到性能调优,一网打尽。
友情提示:本文代码以Scala为主,Python选手请自行脑补语法转换。
一、DataFrame vs Dataset vs RDD
1.1 三剑客对比
graph TB
subgraph 演进历史["📜 Spark数据抽象演进"]
RDD["RDD
Spark 1.0
2014年"] --> DF["DataFrame
Spark 1.3
2015年"] DF --> DS["Dataset
Spark 1.6
2016年"] end subgraph 特点对比["💡 特点对比"] RDD2["RDD
✅ 类型安全
✅ 函数式编程
❌ 无优化器
❌ 性能较差"] DF2["DataFrame
✅ Catalyst优化
✅ 性能好
❌ 无类型安全
⚠️ 运行时报错"] DS2["Dataset
✅ 类型安全
✅ Catalyst优化
✅ 编译时检查
⚠️ 仅Scala/Java"] end style RDD fill:#95a5a6 style DF fill:#3498db style DS fill:#2ecc71
Spark 1.0
2014年"] --> DF["DataFrame
Spark 1.3
2015年"] DF --> DS["Dataset
Spark 1.6
2016年"] end subgraph 特点对比["💡 特点对比"] RDD2["RDD
✅ 类型安全
✅ 函数式编程
❌ 无优化器
❌ 性能较差"] DF2["DataFrame
✅ Catalyst优化
✅ 性能好
❌ 无类型安全
⚠️ 运行时报错"] DS2["Dataset
✅ 类型安全
✅ Catalyst优化
✅ 编译时检查
⚠️ 仅Scala/Java"] end style RDD fill:#95a5a6 style DF fill:#3498db style DS fill:#2ecc71
1.2 详细对比表
| 特性 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 数据表示 | 分布式对象集合 | 分布式Row集合 | 分布式类型化对象集合 |
| 类型安全 | ✅ 编译时 | ❌ 运行时 | ✅ 编译时 |
| 优化器 | ❌ 无 | ✅ Catalyst | ✅ Catalyst |
| 序列化 | Java序列化 | Tungsten二进制 | Tungsten二进制 |
| API | 函数式 | DSL + SQL | 函数式 + DSL + SQL |
| 适用语言 | 全部 | 全部 | Scala/Java |
| 性能 | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
1.3 代码对比
// 需求:统计每个城市的用户数量
// ========== RDD方式 ==========
val rdd = sc.textFile("users.csv")
val result = rdd
.map(line => line.split(","))
.map(arr => (arr(2), 1)) // (city, 1)
.reduceByKey(_ + _)
.collect()
// 问题:没有优化,全靠程序员自己优化
// ========== DataFrame方式 ==========
val df = spark.read
.option("header", "true")
.csv("users.csv")
val result = df
.groupBy("city")
.count()
.show()
// 优点:Catalyst自动优化,代码简洁
// ========== Dataset方式 ==========
case class User(id: Long, name: String, city: String, age: Int)
val ds = spark.read
.option("header", "true")
.csv("users.csv")
.as[User] // 类型化
val result = ds
.groupByKey(_.city)
.count()
.show()
// 优点:类型安全,编译时检查1.4 该用哪个?
flowchart TB
Start[选择数据抽象] --> Q1{需要类型安全?}
Q1 --> |"是"|Q2{用什么语言?}
Q1 --> |"否"|DF[DataFrame
推荐大多数场景] Q2 --> |"Scala/Java"|DS[Dataset
类型安全+优化] Q2 --> |"Python/R"|DF Q3{需要底层控制?} Q3 --> |"是"|RDD[RDD
复杂转换/自定义分区] Q3 --> |"否"|DF Start --> Q3 style DF fill:#4ecdc4 style DS fill:#4ecdc4 style RDD fill:#ff6b6b
推荐大多数场景] Q2 --> |"Scala/Java"|DS[Dataset
类型安全+优化] Q2 --> |"Python/R"|DF Q3{需要底层控制?} Q3 --> |"是"|RDD[RDD
复杂转换/自定义分区] Q3 --> |"否"|DF Start --> Q3 style DF fill:#4ecdc4 style DS fill:#4ecdc4 style RDD fill:#ff6b6b
结论:
- 90%的场景用DataFrame就够了
- 需要类型安全用Dataset(Scala/Java)
- 需要底层控制或复杂转换用RDD
二、DataFrame操作大全
2.1 创建DataFrame
// 1. 从文件读取
val jsonDF = spark.read.json("path/to/file.json")
val csvDF = spark.read.option("header", "true").csv("path/to/file.csv")
val parquetDF = spark.read.parquet("path/to/file.parquet")
val orcDF = spark.read.orc("path/to/file.orc")
// 2. 从Hive表读取
val hiveDF = spark.sql("SELECT * FROM my_database.my_table")
// 或者
val hiveDF2 = spark.table("my_database.my_table")
// 3. 从JDBC读取
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydb")
.option("dbtable", "users")
.option("user", "root")
.option("password", "123456")
.load()
// 4. 从RDD创建
case class Person(name: String, age: Int)
val rdd = sc.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
// 5. 手动创建
import spark.implicits._
val df = Seq(
("Alice", 25, "Beijing"),
("Bob", 30, "Shanghai"),
("Charlie", 35, "Guangzhou")
).toDF("name", "age", "city")2.2 常用操作
// 创建示例数据
val df = spark.createDataFrame(Seq(
(1, "Alice", 25, "Beijing", 8000.0),
(2, "Bob", 30, "Shanghai", 12000.0),
(3, "Charlie", 35, "Beijing", 15000.0),
(4, "David", 28, "Guangzhou", 9500.0),
(5, "Eve", 32, "Shanghai", 11000.0)
)).toDF("id", "name", "age", "city", "salary")
// ========== 查看数据 ==========
df.show() // 显示前20行
df.show(5, false) // 显示5行,不截断
df.printSchema() // 打印Schema
df.columns // 获取列名数组
df.dtypes // 获取列名和类型
df.describe().show() // 统计信息
// ========== 选择列 ==========
df.select("name", "age").show()
df.select($"name", $"age" + 1).show()
df.select(col("name"), col("age")).show()
df.selectExpr("name", "age + 1 as age_plus_one").show()
// ========== 过滤 ==========
df.filter($"age" > 30).show()
df.filter("age > 30").show()
df.where($"city" === "Beijing").show()
df.filter($"age" > 25 && $"city" === "Shanghai").show()
// ========== 添加/修改列 ==========
df.withColumn("salary_after_tax", $"salary" * 0.8).show()
df.withColumn("age", $"age" + 1).show() // 修改现有列
df.withColumnRenamed("name", "user_name").show()
// ========== 删除列 ==========
df.drop("salary").show()
df.drop("salary", "city").show()
// ========== 去重 ==========
df.distinct().show()
df.dropDuplicates("city").show() // 按city去重
// ========== 排序 ==========
df.orderBy($"age".desc).show()
df.orderBy($"city", $"age".desc).show()
df.sort($"salary".desc).show()
// ========== 限制行数 ==========
df.limit(3).show()
// ========== 聚合 ==========
df.groupBy("city").count().show()
df.groupBy("city").agg(
count("*").as("cnt"),
avg("salary").as("avg_salary"),
max("age").as("max_age"),
min("age").as("min_age")
).show()
// ========== 连接 ==========
val df2 = Seq(
("Beijing", "North"),
("Shanghai", "East"),
("Guangzhou", "South")
).toDF("city", "region")
df.join(df2, Seq("city"), "inner").show()
df.join(df2, Seq("city"), "left").show()
df.join(df2, df("city") === df2("city"), "inner").show()2.3 操作流程图
graph LR
subgraph read["📥 读取数据"]
R1[JSON]
R2[CSV]
R3[Parquet]
R4[Hive]
R5[JDBC]
end
subgraph transform["🔄 转换操作"]
T1[select]
T2[filter/where]
T3[groupBy]
T4[join]
T5[orderBy]
T6[withColumn]
end
subgraph action["⚡ 行动操作"]
A1[show]
A2[collect]
A3[count]
A4[write]
end
R1 --> T1
T6 --> T1
T5 --> A1
三、Spark SQL语法进阶
3.1 使用SQL查询
// 创建临时视图
df.createOrReplaceTempView("users")
// 创建全局临时视图(跨Session可用)
df.createOrReplaceGlobalTempView("global_users")
// 执行SQL
spark.sql("""
SELECT
city,
COUNT(*) as user_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM users
WHERE age > 25
GROUP BY city
HAVING COUNT(*) > 1
ORDER BY avg_salary DESC
""").show()
// 访问全局视图
spark.sql("SELECT * FROM global_temp.global_users").show()3.2 窗口函数(重点!)
窗口函数是SQL的精华,面试必考!
import org.apache.spark.sql.expressions.Window
// 创建示例数据
val salesDF = Seq(
("2024-01-01", "Beijing", 1000),
("2024-01-01", "Shanghai", 1500),
("2024-01-02", "Beijing", 1200),
("2024-01-02", "Shanghai", 1800),
("2024-01-03", "Beijing", 900),
("2024-01-03", "Shanghai", 2000),
("2024-01-04", "Beijing", 1100),
("2024-01-04", "Shanghai", 1700)
).toDF("date", "city", "sales")
// ========== ROW_NUMBER:排名(不重复) ==========
val windowSpec = Window.partitionBy("city").orderBy($"sales".desc)
salesDF.withColumn("rank", row_number().over(windowSpec)).show()
// 每个城市按销售额排名:1, 2, 3, 4...
// ========== RANK:排名(重复跳号) ==========
salesDF.withColumn("rank", rank().over(windowSpec)).show()
// 相同值同排名,下一个跳号:1, 2, 2, 4...
// ========== DENSE_RANK:排名(重复不跳号) ==========
salesDF.withColumn("rank", dense_rank().over(windowSpec)).show()
// 相同值同排名,下一个不跳号:1, 2, 2, 3...
// ========== LAG/LEAD:前后行 ==========
val timeWindow = Window.partitionBy("city").orderBy("date")
salesDF.withColumn("prev_sales", lag("sales", 1).over(timeWindow))
.withColumn("next_sales", lead("sales", 1).over(timeWindow))
.show()
// ========== 累计求和 ==========
salesDF.withColumn("cumsum", sum("sales").over(
Window.partitionBy("city").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
)).show()
// ========== 移动平均 ==========
salesDF.withColumn("moving_avg", avg("sales").over(
Window.partitionBy("city").orderBy("date").rowsBetween(-2, 0) // 最近3天
)).show()3.3 窗口函数SQL写法
-- 创建视图
CREATE OR REPLACE TEMP VIEW sales AS
SELECT * FROM VALUES
('2024-01-01', 'Beijing', 1000),
('2024-01-01', 'Shanghai', 1500),
('2024-01-02', 'Beijing', 1200),
('2024-01-02', 'Shanghai', 1800)
AS sales(date, city, amount);
-- ROW_NUMBER
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY city ORDER BY amount DESC) as rn
FROM sales;
-- 取每个城市销售额最高的记录
SELECT * FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY city ORDER BY amount DESC) as rn
FROM sales
) t WHERE rn = 1;
-- LAG/LEAD
SELECT
*,
LAG(amount, 1) OVER (PARTITION BY city ORDER BY date) as prev_amount,
LEAD(amount, 1) OVER (PARTITION BY city ORDER BY date) as next_amount,
amount - LAG(amount, 1) OVER (PARTITION BY city ORDER BY date) as diff
FROM sales;
-- 累计求和
SELECT
*,
SUM(amount) OVER (PARTITION BY city ORDER BY date) as cumsum
FROM sales;3.4 窗口函数图解
graph TB
subgraph 窗口函数分类["🪟 窗口函数分类"]
subgraph 排名函数["排名函数"]
R1[ROW_NUMBER
1,2,3,4,5] R2[RANK
1,2,2,4,5] R3[DENSE_RANK
1,2,2,3,4] R4[NTILE
分桶] end subgraph 偏移函数["偏移函数"] O1[LAG
前N行] O2[LEAD
后N行] O3[FIRST_VALUE
第一行] O4[LAST_VALUE
最后一行] end subgraph 聚合函数["聚合函数"] A1[SUM] A2[AVG] A3[COUNT] A4[MAX/MIN] end end style R1 fill:#ff6b6b style O1 fill:#4ecdc4 style O2 fill:#4ecdc4
1,2,3,4,5] R2[RANK
1,2,2,4,5] R3[DENSE_RANK
1,2,2,3,4] R4[NTILE
分桶] end subgraph 偏移函数["偏移函数"] O1[LAG
前N行] O2[LEAD
后N行] O3[FIRST_VALUE
第一行] O4[LAST_VALUE
最后一行] end subgraph 聚合函数["聚合函数"] A1[SUM] A2[AVG] A3[COUNT] A4[MAX/MIN] end end style R1 fill:#ff6b6b style O1 fill:#4ecdc4 style O2 fill:#4ecdc4
3.5 复杂数据类型
// ========== Array类型 ==========
val arrayDF = Seq(
(1, "Alice", Array("Java", "Scala", "Python")),
(2, "Bob", Array("Go", "Rust"))
).toDF("id", "name", "skills")
// 展开数组
arrayDF.select($"name", explode($"skills").as("skill")).show()
// 数组操作
arrayDF.select(
$"name",
size($"skills").as("skill_count"),
array_contains($"skills", "Java").as("knows_java"),
$"skills"(0).as("first_skill")
).show()
// ========== Map类型 ==========
val mapDF = Seq(
(1, "Alice", Map("math" -> 90, "english" -> 85)),
(2, "Bob", Map("math" -> 80, "english" -> 95))
).toDF("id", "name", "scores")
// 展开Map
mapDF.select($"name", explode($"scores")).show()
// Map操作
mapDF.select(
$"name",
$"scores"("math").as("math_score"),
map_keys($"scores").as("subjects")
).show()
// ========== Struct类型 ==========
val structDF = Seq(
(1, "Alice", ("Beijing", "China")),
(2, "Bob", ("Shanghai", "China"))
).toDF("id", "name", "address")
// 访问Struct字段
structDF.select($"name", $"address._1".as("city")).show()四、Catalyst优化器
4.1 Catalyst工作流程
graph LR
subgraph Catalyst优化器["⚡ Catalyst优化器流程"]
A[SQL/DataFrame
API] --> B[未解析的
逻辑计划] B --> |"分析器Analyzer"|C[解析后的
逻辑计划] C --> |"优化器Optimizer"|D[优化后的
逻辑计划] D --> |"计划器Planner"|E[物理计划] E --> |"代码生成"|F[RDD执行] end style A fill:#3498db style D fill:#2ecc71 style F fill:#e74c3c
API] --> B[未解析的
逻辑计划] B --> |"分析器Analyzer"|C[解析后的
逻辑计划] C --> |"优化器Optimizer"|D[优化后的
逻辑计划] D --> |"计划器Planner"|E[物理计划] E --> |"代码生成"|F[RDD执行] end style A fill:#3498db style D fill:#2ecc71 style F fill:#e74c3c
4.2 查看执行计划
val df = spark.sql("""
SELECT city, AVG(salary) as avg_salary
FROM users
WHERE age > 25
GROUP BY city
HAVING AVG(salary) > 10000
ORDER BY avg_salary DESC
""")
// 查看逻辑计划
df.explain()
// 查看详细计划(包括物理计划)
df.explain(true)
// 查看各阶段计划
df.explain("simple") // 物理计划
df.explain("extended") // 逻辑+物理
df.explain("codegen") // 生成的代码
df.explain("cost") // 带成本估算
df.explain("formatted") // 格式化输出4.3 常见优化规则
graph TB
subgraph 优化规则["🔧 Catalyst常见优化"]
subgraph 谓词下推["谓词下推 Predicate Pushdown"]
P1["优化前:先Join再Filter"]
P2["优化后:先Filter再Join"]
end
subgraph 列裁剪["列裁剪 Column Pruning"]
C1["优化前:SELECT * 读所有列"]
C2["优化后:只读需要的列"]
end
subgraph 常量折叠["常量折叠 Constant Folding"]
F1["优化前:1 + 2 + col"]
F2["优化后:3 + col"]
end
subgraph 分区裁剪["分区裁剪 Partition Pruning"]
PP1["优化前:扫描所有分区"]
PP2["优化后:只扫描相关分区"]
end
end
style P2 fill:#4ecdc4
style C2 fill:#4ecdc4
style F2 fill:#4ecdc4
style PP2 fill:#4ecdc4
4.4 执行计划示例
== Physical Plan ==
*(2) Sort [avg_salary#123 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg_salary#123 DESC NULLS LAST, 200), true
+- *(1) HashAggregate(keys=[city#45], functions=[avg(salary#47)])
+- Exchange hashpartitioning(city#45, 200), true
+- *(1) HashAggregate(keys=[city#45], functions=[partial_avg(salary#47)])
+- *(1) Project [city#45, salary#47]
+- *(1) Filter (isnotnull(age#46) && (age#46 > 25))
+- *(1) FileScan parquet [city#45,age#46,salary#47]
Batched: true
Location: InMemoryFileIndex[hdfs://path/to/users.parquet]
PartitionFilters: []
PushedFilters: [IsNotNull(age), GreaterThan(age,25)]
ReadSchema: struct<city:string,age:int,salary:double>关键信息解读:
FileScan:数据源扫描PushedFilters:下推到数据源的过滤条件Exchange:Shuffle操作HashAggregate:聚合操作Sort:排序操作
五、UDF开发
5.1 普通UDF
// 方式1:注册SQL函数
spark.udf.register("to_upper", (s: String) => s.toUpperCase)
spark.sql("SELECT to_upper(name) FROM users").show()
// 方式2:DataFrame API使用
import org.apache.spark.sql.functions.udf
val toUpperUDF = udf((s: String) => s.toUpperCase)
df.select(toUpperUDF($"name")).show()
// 复杂UDF示例:解析JSON字段
import scala.util.parsing.json.JSON
val parseJsonUDF = udf((json: String) => {
JSON.parseFull(json) match {
case Some(map: Map[String, Any]) => map.get("name").map(_.toString).getOrElse("")
case _ => ""
}
})
// 带多个参数的UDF
val concatWithSep = udf((s1: String, s2: String, sep: String) => s"$s1$sep$s2")
df.select(concatWithSep($"first_name", $"last_name", lit("-"))).show()5.2 UDAF(用户自定义聚合函数)
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
// 定义UDAF:计算几何平均数
case class GeoMeanBuffer(product: Double, count: Long)
object GeoMean extends Aggregator[Double, GeoMeanBuffer, Double] {
// 初始值
def zero: GeoMeanBuffer = GeoMeanBuffer(1.0, 0L)
// 聚合单个值
def reduce(buffer: GeoMeanBuffer, value: Double): GeoMeanBuffer = {
GeoMeanBuffer(buffer.product * value, buffer.count + 1)
}
// 合并两个buffer
def merge(b1: GeoMeanBuffer, b2: GeoMeanBuffer): GeoMeanBuffer = {
GeoMeanBuffer(b1.product * b2.product, b1.count + b2.count)
}
// 计算最终结果
def finish(buffer: GeoMeanBuffer): Double = {
math.pow(buffer.product, 1.0 / buffer.count)
}
def bufferEncoder: Encoder[GeoMeanBuffer] = Encoders.product
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
// 注册使用
val geoMeanUDAF = udaf(GeoMean)
df.select(geoMeanUDAF($"salary")).show()
// 或注册为SQL函数
spark.udf.register("geo_mean", udaf(GeoMean))
spark.sql("SELECT city, geo_mean(salary) FROM users GROUP BY city").show()5.3 UDF性能注意事项
graph TB
subgraph UDF性能["⚠️ UDF性能注意事项"]
A["UDF是黑盒"] --> B["Catalyst无法优化"]
B --> C["无法谓词下推"]
B --> D["无法列裁剪"]
E["UDF序列化开销"] --> F["每行数据都要序列化/反序列化"]
G["建议"] --> H["优先使用内置函数"]
G --> I["避免在UDF中创建对象"]
G --> J["使用Pandas UDF(Python)"]
end
style A fill:#ff6b6b
style H fill:#4ecdc4
最佳实践:
// ❌ 不推荐:使用UDF
val myUpperUDF = udf((s: String) => s.toUpperCase)
df.select(myUpperUDF($"name"))
// ✅ 推荐:使用内置函数
df.select(upper($"name"))六、Spark SQL性能调优
6.1 AQE自适应查询执行
// 开启AQE(Spark 3.0+默认开启)
spark.conf.set("spark.sql.adaptive.enabled", "true")
// 自动合并小分区
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")
// 自动处理数据倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
// 动态切换Join策略
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")graph TB
subgraph AQE功能["🚀 AQE自适应执行"]
A[运行时统计信息] --> B[动态合并分区]
A --> C[动态切换Join策略]
A --> D[动态处理数据倾斜]
B --> B1["小分区合并
减少Task数量"] C --> C1["Broadcast Join自动选择"] D --> D1["倾斜分区自动拆分"] end style A fill:#ff6b6b style B1 fill:#4ecdc4 style C1 fill:#4ecdc4 style D1 fill:#4ecdc4
减少Task数量"] C --> C1["Broadcast Join自动选择"] D --> D1["倾斜分区自动拆分"] end style A fill:#ff6b6b style B1 fill:#4ecdc4 style C1 fill:#4ecdc4 style D1 fill:#4ecdc4
6.2 Join优化
graph TB
subgraph Join策略["🔗 Join策略选择"]
Q{表大小?}
Q --> |"小表 < 10MB"|BHJ[Broadcast Hash Join
广播小表,无Shuffle] Q --> |"中等表"|SHJ[Shuffle Hash Join
Shuffle后Hash Join] Q --> |"大表"|SMJ[Sort Merge Join
Shuffle + 排序合并] BHJ --> Best["性能最好 ⭐⭐⭐⭐⭐"] SHJ --> Medium["性能中等 ⭐⭐⭐"] SMJ --> Large["适合大表 ⭐⭐⭐⭐"] end style BHJ fill:#4ecdc4 style SMJ fill:#ffe66d
广播小表,无Shuffle] Q --> |"中等表"|SHJ[Shuffle Hash Join
Shuffle后Hash Join] Q --> |"大表"|SMJ[Sort Merge Join
Shuffle + 排序合并] BHJ --> Best["性能最好 ⭐⭐⭐⭐⭐"] SHJ --> Medium["性能中等 ⭐⭐⭐"] SMJ --> Large["适合大表 ⭐⭐⭐⭐"] end style BHJ fill:#4ecdc4 style SMJ fill:#ffe66d
// 强制使用Broadcast Join
import org.apache.spark.sql.functions.broadcast
val result = largeDF.join(broadcast(smallDF), Seq("key"))
// 设置Broadcast阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
// 禁用Broadcast Join(调试用)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// 查看Join策略
result.explain()6.3 分区优化
// 设置Shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200") // 默认200
// 动态分区数(AQE)
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
// 读取时控制分区
val df = spark.read
.option("maxPartitionBytes", "128MB")
.parquet("path/to/data")
// 重分区
df.repartition(100) // 增加分区(有Shuffle)
df.coalesce(10) // 减少分区(无Shuffle)
df.repartition($"city") // 按列分区
df.repartition(100, $"city") // 指定分区数和列6.4 缓存优化
// 缓存DataFrame
df.cache() // 等同于 df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
// 释放缓存
df.unpersist()
// 检查是否已缓存
df.storageLevel
// 缓存表
spark.sql("CACHE TABLE my_table")
spark.sql("UNCACHE TABLE my_table")
// 延迟缓存(查询时才缓存)
spark.sql("CACHE LAZY TABLE my_table")graph TB
subgraph 缓存策略["💾 缓存级别选择"]
A{数据能放内存?}
A --> |"是"|B{需要快速访问?}
A --> |"否"|C[MEMORY_AND_DISK]
B --> |"是"|D[MEMORY_ONLY]
B --> |"要省内存"|E[MEMORY_ONLY_SER]
F{需要容错?}
F --> |"是"|G[MEMORY_AND_DISK_2]
style D fill:#4ecdc4
style C fill:#ffe66d
end
6.5 常用调优参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| spark.sql.shuffle.partitions | 200 | Shuffle分区数 |
| spark.sql.autoBroadcastJoinThreshold | 10MB | Broadcast阈值 |
| spark.sql.adaptive.enabled | true | 启用AQE |
| spark.sql.adaptive.coalescePartitions.enabled | true | 自动合并分区 |
| spark.sql.adaptive.skewJoin.enabled | true | 自动处理倾斜 |
| spark.sql.files.maxPartitionBytes | 128MB | 单分区最大字节 |
| spark.sql.inMemoryColumnarStorage.compressed | true | 缓存压缩 |
七、Hive集成
7.1 配置Hive支持
// 创建支持Hive的SparkSession
val spark = SparkSession.builder()
.appName("Spark Hive")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://hive-metastore:9083")
.enableHiveSupport()
.getOrCreate()
// 查看数据库
spark.sql("SHOW DATABASES").show()
// 使用数据库
spark.sql("USE my_database")
// 查看表
spark.sql("SHOW TABLES").show()7.2 读写Hive表
// 读取Hive表
val df = spark.table("my_database.my_table")
val df2 = spark.sql("SELECT * FROM my_database.my_table WHERE dt = '2024-01-01'")
// 写入Hive表(覆盖)
df.write
.mode("overwrite")
.saveAsTable("my_database.result_table")
// 写入分区表
df.write
.mode("overwrite")
.partitionBy("dt")
.saveAsTable("my_database.partitioned_table")
// 插入数据
df.write
.mode("append")
.insertInto("my_database.existing_table")
// 动态分区写入
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df.write
.mode("overwrite")
.format("hive")
.partitionBy("year", "month")
.saveAsTable("my_database.dynamic_partition_table")7.3 创建Hive表
// 通过SQL创建
spark.sql("""
CREATE TABLE IF NOT EXISTS my_database.users (
id BIGINT,
name STRING,
age INT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY')
""")
// 通过DataFrame创建
df.write
.format("hive")
.option("fileFormat", "parquet")
.option("compression", "snappy")
.partitionBy("dt")
.saveAsTable("my_database.new_table")八、实战案例
8.1 电商用户分析
// 需求:分析用户购买行为,计算RFM指标
val orders = spark.table("dwd.dwd_order_info")
// 计算RFM
val rfm = orders
.filter($"order_status" === "completed")
.groupBy("user_id")
.agg(
// R: 最近一次购买距今天数
datediff(current_date(), max("order_time")).as("recency"),
// F: 购买频次
count("order_id").as("frequency"),
// M: 购买金额
sum("order_amount").as("monetary")
)
// RFM评分
val rfmScored = rfm
.withColumn("r_score",
when($"recency" <= 7, 5)
.when($"recency" <= 30, 4)
.when($"recency" <= 90, 3)
.when($"recency" <= 180, 2)
.otherwise(1))
.withColumn("f_score",
when($"frequency" >= 10, 5)
.when($"frequency" >= 5, 4)
.when($"frequency" >= 3, 3)
.when($"frequency" >= 2, 2)
.otherwise(1))
.withColumn("m_score",
when($"monetary" >= 10000, 5)
.when($"monetary" >= 5000, 4)
.when($"monetary" >= 1000, 3)
.when($"monetary" >= 500, 2)
.otherwise(1))
.withColumn("rfm_score", $"r_score" + $"f_score" + $"m_score")
// 用户分层
val userSegment = rfmScored
.withColumn("segment",
when($"rfm_score" >= 12, "高价值用户")
.when($"rfm_score" >= 9, "中价值用户")
.when($"rfm_score" >= 6, "低价值用户")
.otherwise("流失用户"))
userSegment.groupBy("segment").count().show()8.2 实时Top N
// 需求:每个类目下销售额Top 10商品
val sales = spark.table("dwd.dwd_order_detail")
val topN = sales
.groupBy("category", "product_id", "product_name")
.agg(sum("amount").as("total_sales"))
.withColumn("rank",
row_number().over(
Window.partitionBy("category").orderBy($"total_sales".desc)
))
.filter($"rank" <= 10)
.orderBy("category", "rank")
topN.show(100)九、踩坑与最佳实践
9.1 常见踩坑
graph TB
subgraph 常见坑["😱 Spark SQL常见坑"]
P1["数据类型不匹配"] --> S1["显式转换:cast"]
P2["NULL值处理"] --> S2["使用coalesce/nvl"]
P3["小文件问题"] --> S3["合并分区写入"]
P4["OOM"] --> S4["增加内存/优化SQL"]
P5["数据倾斜"] --> S5["加盐/Broadcast"]
end
style P1 fill:#ff6b6b
style P2 fill:#ff6b6b
style P3 fill:#ff6b6b
9.2 最佳实践
// 1. 避免SELECT *
// ❌ 不推荐
spark.sql("SELECT * FROM large_table")
// ✅ 推荐
spark.sql("SELECT col1, col2 FROM large_table")
// 2. 谓词前置
// ❌ 不推荐
df.join(df2, "key").filter($"col" > 100)
// ✅ 推荐
df.filter($"col" > 100).join(df2, "key")
// 3. 避免使用UDF
// ❌ 不推荐
val upperUDF = udf((s: String) => s.toUpperCase)
// ✅ 推荐
df.select(upper($"name"))
// 4. 合理使用缓存
// ❌ 不推荐:缓存只用一次的数据
df.cache()
df.count()
// ✅ 推荐:缓存重复使用的数据
val cachedDF = df.cache()
cachedDF.groupBy("a").count().show()
cachedDF.groupBy("b").count().show()
// 5. 写入时合并小文件
df.coalesce(10).write.parquet("output")
// 6. 分区表查询带分区条件
// ❌ 不推荐
spark.sql("SELECT * FROM partitioned_table")
// ✅ 推荐
spark.sql("SELECT * FROM partitioned_table WHERE dt = '2024-01-01'")十、写在最后
Spark SQL是Spark中最常用的模块,没有之一。
学好Spark SQL,你就能应对90%的数据处理需求。剩下的10%...可能需要你继续掉头发。
几点忠告:
- 优先用内置函数:性能好,Catalyst能优化
- 学会看执行计划:explain是你最好的朋友
- 善用AQE:Spark 3.0的神器,能自动解决很多问题
- 窗口函数必须会:面试必考,实际工作也常用
- 缓存要谨慎:不是什么都需要缓存
最后送大家一句话:
SQL写得好,下班回家早。
本文作者:一个SQL Boy
最惨经历:写了个笛卡尔积,差点把集群跑崩
下一篇:Spark内核原理与调优
附录:面试高频题
DataFrame和RDD的区别?
DataFrame有Schema,经过Catalyst优化,性能更好;RDD是底层抽象,灵活但性能差。
Catalyst优化器做了哪些优化?
谓词下推、列裁剪、常量折叠、分区裁剪、Join重排序等。
Spark SQL有哪些Join策略?
Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join。小表用Broadcast,大表用Sort Merge。
窗口函数row_number、rank、dense_rank的区别?
row_number:1,2,3,4(无重复);rank:1,2,2,4(重复跳号);dense_rank:1,2,2,3(重复不跳号)
如何优化Spark SQL性能?
开启AQE、使用Broadcast Join、合理设置分区数、避免UDF、使用缓存、避免数据倾斜。
什么是AQE?
自适应查询执行,运行时根据统计信息动态调整执行计划,包括合并分区、切换Join策略、处理数据倾斜。