搜 索

Spark从入门到放弃②之SparkSQL深度实战

  • 22阅读
  • 2023年03月04日
  • 0评论
首页 / AI/大数据 / 正文

前言:SQL,永远的神

在大数据的世界里,有一个永恒的真理:

会写SQL的人,永远不会失业。

为什么?因为无论技术怎么变,老板们永远只会说一句话:"给我拉一下昨天的GMV数据。"

他不会问你用的是RDD还是DataFrame,他只关心数据对不对、快不快。

Spark SQL就是为了让你用写SQL的方式处理大数据。它让那些被RDD折磨得死去活来的程序员们,终于可以像个正常人一样写代码了。

本篇是Spark系列的第二篇,我们将深入Spark SQL的世界,从DataFrame到Catalyst优化器,从窗口函数到性能调优,一网打尽。

友情提示:本文代码以Scala为主,Python选手请自行脑补语法转换。


一、DataFrame vs Dataset vs RDD

1.1 三剑客对比

graph TB subgraph 演进历史["📜 Spark数据抽象演进"] RDD["RDD
Spark 1.0
2014年"] --> DF["DataFrame
Spark 1.3
2015年"] DF --> DS["Dataset
Spark 1.6
2016年"] end subgraph 特点对比["💡 特点对比"] RDD2["RDD
✅ 类型安全
✅ 函数式编程
❌ 无优化器
❌ 性能较差"] DF2["DataFrame
✅ Catalyst优化
✅ 性能好
❌ 无类型安全
⚠️ 运行时报错"] DS2["Dataset
✅ 类型安全
✅ Catalyst优化
✅ 编译时检查
⚠️ 仅Scala/Java"] end style RDD fill:#95a5a6 style DF fill:#3498db style DS fill:#2ecc71

1.2 详细对比表

特性RDDDataFrameDataset
数据表示分布式对象集合分布式Row集合分布式类型化对象集合
类型安全✅ 编译时❌ 运行时✅ 编译时
优化器❌ 无✅ Catalyst✅ Catalyst
序列化Java序列化Tungsten二进制Tungsten二进制
API函数式DSL + SQL函数式 + DSL + SQL
适用语言全部全部Scala/Java
性能⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

1.3 代码对比

// 需求:统计每个城市的用户数量

// ========== RDD方式 ==========
val rdd = sc.textFile("users.csv")
val result = rdd
  .map(line => line.split(","))
  .map(arr => (arr(2), 1))  // (city, 1)
  .reduceByKey(_ + _)
  .collect()
// 问题:没有优化,全靠程序员自己优化

// ========== DataFrame方式 ==========
val df = spark.read
  .option("header", "true")
  .csv("users.csv")

val result = df
  .groupBy("city")
  .count()
  .show()
// 优点:Catalyst自动优化,代码简洁

// ========== Dataset方式 ==========
case class User(id: Long, name: String, city: String, age: Int)

val ds = spark.read
  .option("header", "true")
  .csv("users.csv")
  .as[User]  // 类型化

val result = ds
  .groupByKey(_.city)
  .count()
  .show()
// 优点:类型安全,编译时检查

1.4 该用哪个?

flowchart TB Start[选择数据抽象] --> Q1{需要类型安全?} Q1 --> |"是"|Q2{用什么语言?} Q1 --> |"否"|DF[DataFrame
推荐大多数场景] Q2 --> |"Scala/Java"|DS[Dataset
类型安全+优化] Q2 --> |"Python/R"|DF Q3{需要底层控制?} Q3 --> |"是"|RDD[RDD
复杂转换/自定义分区] Q3 --> |"否"|DF Start --> Q3 style DF fill:#4ecdc4 style DS fill:#4ecdc4 style RDD fill:#ff6b6b

结论

  • 90%的场景用DataFrame就够了
  • 需要类型安全用Dataset(Scala/Java)
  • 需要底层控制或复杂转换用RDD

二、DataFrame操作大全

2.1 创建DataFrame

// 1. 从文件读取
val jsonDF = spark.read.json("path/to/file.json")
val csvDF = spark.read.option("header", "true").csv("path/to/file.csv")
val parquetDF = spark.read.parquet("path/to/file.parquet")
val orcDF = spark.read.orc("path/to/file.orc")

// 2. 从Hive表读取
val hiveDF = spark.sql("SELECT * FROM my_database.my_table")
// 或者
val hiveDF2 = spark.table("my_database.my_table")

// 3. 从JDBC读取
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/mydb")
  .option("dbtable", "users")
  .option("user", "root")
  .option("password", "123456")
  .load()

// 4. 从RDD创建
case class Person(name: String, age: Int)
val rdd = sc.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()

// 5. 手动创建
import spark.implicits._
val df = Seq(
  ("Alice", 25, "Beijing"),
  ("Bob", 30, "Shanghai"),
  ("Charlie", 35, "Guangzhou")
).toDF("name", "age", "city")

2.2 常用操作

// 创建示例数据
val df = spark.createDataFrame(Seq(
  (1, "Alice", 25, "Beijing", 8000.0),
  (2, "Bob", 30, "Shanghai", 12000.0),
  (3, "Charlie", 35, "Beijing", 15000.0),
  (4, "David", 28, "Guangzhou", 9500.0),
  (5, "Eve", 32, "Shanghai", 11000.0)
)).toDF("id", "name", "age", "city", "salary")

// ========== 查看数据 ==========
df.show()           // 显示前20行
df.show(5, false)   // 显示5行,不截断
df.printSchema()    // 打印Schema
df.columns          // 获取列名数组
df.dtypes           // 获取列名和类型
df.describe().show() // 统计信息

// ========== 选择列 ==========
df.select("name", "age").show()
df.select($"name", $"age" + 1).show()
df.select(col("name"), col("age")).show()
df.selectExpr("name", "age + 1 as age_plus_one").show()

// ========== 过滤 ==========
df.filter($"age" > 30).show()
df.filter("age > 30").show()
df.where($"city" === "Beijing").show()
df.filter($"age" > 25 && $"city" === "Shanghai").show()

// ========== 添加/修改列 ==========
df.withColumn("salary_after_tax", $"salary" * 0.8).show()
df.withColumn("age", $"age" + 1).show()  // 修改现有列
df.withColumnRenamed("name", "user_name").show()

// ========== 删除列 ==========
df.drop("salary").show()
df.drop("salary", "city").show()

// ========== 去重 ==========
df.distinct().show()
df.dropDuplicates("city").show()  // 按city去重

// ========== 排序 ==========
df.orderBy($"age".desc).show()
df.orderBy($"city", $"age".desc).show()
df.sort($"salary".desc).show()

// ========== 限制行数 ==========
df.limit(3).show()

// ========== 聚合 ==========
df.groupBy("city").count().show()
df.groupBy("city").agg(
  count("*").as("cnt"),
  avg("salary").as("avg_salary"),
  max("age").as("max_age"),
  min("age").as("min_age")
).show()

// ========== 连接 ==========
val df2 = Seq(
  ("Beijing", "North"),
  ("Shanghai", "East"),
  ("Guangzhou", "South")
).toDF("city", "region")

df.join(df2, Seq("city"), "inner").show()
df.join(df2, Seq("city"), "left").show()
df.join(df2, df("city") === df2("city"), "inner").show()

2.3 操作流程图

graph LR subgraph read["📥 读取数据"] R1[JSON] R2[CSV] R3[Parquet] R4[Hive] R5[JDBC] end subgraph transform["🔄 转换操作"] T1[select] T2[filter/where] T3[groupBy] T4[join] T5[orderBy] T6[withColumn] end subgraph action["⚡ 行动操作"] A1[show] A2[collect] A3[count] A4[write] end R1 --> T1 T6 --> T1 T5 --> A1

三、Spark SQL语法进阶

3.1 使用SQL查询

// 创建临时视图
df.createOrReplaceTempView("users")

// 创建全局临时视图(跨Session可用)
df.createOrReplaceGlobalTempView("global_users")

// 执行SQL
spark.sql("""
  SELECT 
    city,
    COUNT(*) as user_count,
    AVG(salary) as avg_salary,
    MAX(salary) as max_salary
  FROM users
  WHERE age > 25
  GROUP BY city
  HAVING COUNT(*) > 1
  ORDER BY avg_salary DESC
""").show()

// 访问全局视图
spark.sql("SELECT * FROM global_temp.global_users").show()

3.2 窗口函数(重点!)

窗口函数是SQL的精华,面试必考!

import org.apache.spark.sql.expressions.Window

// 创建示例数据
val salesDF = Seq(
  ("2024-01-01", "Beijing", 1000),
  ("2024-01-01", "Shanghai", 1500),
  ("2024-01-02", "Beijing", 1200),
  ("2024-01-02", "Shanghai", 1800),
  ("2024-01-03", "Beijing", 900),
  ("2024-01-03", "Shanghai", 2000),
  ("2024-01-04", "Beijing", 1100),
  ("2024-01-04", "Shanghai", 1700)
).toDF("date", "city", "sales")

// ========== ROW_NUMBER:排名(不重复) ==========
val windowSpec = Window.partitionBy("city").orderBy($"sales".desc)
salesDF.withColumn("rank", row_number().over(windowSpec)).show()
// 每个城市按销售额排名:1, 2, 3, 4...

// ========== RANK:排名(重复跳号) ==========
salesDF.withColumn("rank", rank().over(windowSpec)).show()
// 相同值同排名,下一个跳号:1, 2, 2, 4...

// ========== DENSE_RANK:排名(重复不跳号) ==========
salesDF.withColumn("rank", dense_rank().over(windowSpec)).show()
// 相同值同排名,下一个不跳号:1, 2, 2, 3...

// ========== LAG/LEAD:前后行 ==========
val timeWindow = Window.partitionBy("city").orderBy("date")
salesDF.withColumn("prev_sales", lag("sales", 1).over(timeWindow))
       .withColumn("next_sales", lead("sales", 1).over(timeWindow))
       .show()

// ========== 累计求和 ==========
salesDF.withColumn("cumsum", sum("sales").over(
  Window.partitionBy("city").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
)).show()

// ========== 移动平均 ==========
salesDF.withColumn("moving_avg", avg("sales").over(
  Window.partitionBy("city").orderBy("date").rowsBetween(-2, 0)  // 最近3天
)).show()

3.3 窗口函数SQL写法

-- 创建视图
CREATE OR REPLACE TEMP VIEW sales AS
SELECT * FROM VALUES
  ('2024-01-01', 'Beijing', 1000),
  ('2024-01-01', 'Shanghai', 1500),
  ('2024-01-02', 'Beijing', 1200),
  ('2024-01-02', 'Shanghai', 1800)
AS sales(date, city, amount);

-- ROW_NUMBER
SELECT 
  *,
  ROW_NUMBER() OVER (PARTITION BY city ORDER BY amount DESC) as rn
FROM sales;

-- 取每个城市销售额最高的记录
SELECT * FROM (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY city ORDER BY amount DESC) as rn
  FROM sales
) t WHERE rn = 1;

-- LAG/LEAD
SELECT 
  *,
  LAG(amount, 1) OVER (PARTITION BY city ORDER BY date) as prev_amount,
  LEAD(amount, 1) OVER (PARTITION BY city ORDER BY date) as next_amount,
  amount - LAG(amount, 1) OVER (PARTITION BY city ORDER BY date) as diff
FROM sales;

-- 累计求和
SELECT 
  *,
  SUM(amount) OVER (PARTITION BY city ORDER BY date) as cumsum
FROM sales;

3.4 窗口函数图解

graph TB subgraph 窗口函数分类["🪟 窗口函数分类"] subgraph 排名函数["排名函数"] R1[ROW_NUMBER
1,2,3,4,5] R2[RANK
1,2,2,4,5] R3[DENSE_RANK
1,2,2,3,4] R4[NTILE
分桶] end subgraph 偏移函数["偏移函数"] O1[LAG
前N行] O2[LEAD
后N行] O3[FIRST_VALUE
第一行] O4[LAST_VALUE
最后一行] end subgraph 聚合函数["聚合函数"] A1[SUM] A2[AVG] A3[COUNT] A4[MAX/MIN] end end style R1 fill:#ff6b6b style O1 fill:#4ecdc4 style O2 fill:#4ecdc4

3.5 复杂数据类型

// ========== Array类型 ==========
val arrayDF = Seq(
  (1, "Alice", Array("Java", "Scala", "Python")),
  (2, "Bob", Array("Go", "Rust"))
).toDF("id", "name", "skills")

// 展开数组
arrayDF.select($"name", explode($"skills").as("skill")).show()

// 数组操作
arrayDF.select(
  $"name",
  size($"skills").as("skill_count"),
  array_contains($"skills", "Java").as("knows_java"),
  $"skills"(0).as("first_skill")
).show()

// ========== Map类型 ==========
val mapDF = Seq(
  (1, "Alice", Map("math" -> 90, "english" -> 85)),
  (2, "Bob", Map("math" -> 80, "english" -> 95))
).toDF("id", "name", "scores")

// 展开Map
mapDF.select($"name", explode($"scores")).show()

// Map操作
mapDF.select(
  $"name",
  $"scores"("math").as("math_score"),
  map_keys($"scores").as("subjects")
).show()

// ========== Struct类型 ==========
val structDF = Seq(
  (1, "Alice", ("Beijing", "China")),
  (2, "Bob", ("Shanghai", "China"))
).toDF("id", "name", "address")

// 访问Struct字段
structDF.select($"name", $"address._1".as("city")).show()

四、Catalyst优化器

4.1 Catalyst工作流程

graph LR subgraph Catalyst优化器["⚡ Catalyst优化器流程"] A[SQL/DataFrame
API] --> B[未解析的
逻辑计划] B --> |"分析器Analyzer"|C[解析后的
逻辑计划] C --> |"优化器Optimizer"|D[优化后的
逻辑计划] D --> |"计划器Planner"|E[物理计划] E --> |"代码生成"|F[RDD执行] end style A fill:#3498db style D fill:#2ecc71 style F fill:#e74c3c

4.2 查看执行计划

val df = spark.sql("""
  SELECT city, AVG(salary) as avg_salary
  FROM users
  WHERE age > 25
  GROUP BY city
  HAVING AVG(salary) > 10000
  ORDER BY avg_salary DESC
""")

// 查看逻辑计划
df.explain()

// 查看详细计划(包括物理计划)
df.explain(true)

// 查看各阶段计划
df.explain("simple")   // 物理计划
df.explain("extended") // 逻辑+物理
df.explain("codegen")  // 生成的代码
df.explain("cost")     // 带成本估算
df.explain("formatted") // 格式化输出

4.3 常见优化规则

graph TB subgraph 优化规则["🔧 Catalyst常见优化"] subgraph 谓词下推["谓词下推 Predicate Pushdown"] P1["优化前:先Join再Filter"] P2["优化后:先Filter再Join"] end subgraph 列裁剪["列裁剪 Column Pruning"] C1["优化前:SELECT * 读所有列"] C2["优化后:只读需要的列"] end subgraph 常量折叠["常量折叠 Constant Folding"] F1["优化前:1 + 2 + col"] F2["优化后:3 + col"] end subgraph 分区裁剪["分区裁剪 Partition Pruning"] PP1["优化前:扫描所有分区"] PP2["优化后:只扫描相关分区"] end end style P2 fill:#4ecdc4 style C2 fill:#4ecdc4 style F2 fill:#4ecdc4 style PP2 fill:#4ecdc4

4.4 执行计划示例

== Physical Plan ==
*(2) Sort [avg_salary#123 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg_salary#123 DESC NULLS LAST, 200), true
   +- *(1) HashAggregate(keys=[city#45], functions=[avg(salary#47)])
      +- Exchange hashpartitioning(city#45, 200), true
         +- *(1) HashAggregate(keys=[city#45], functions=[partial_avg(salary#47)])
            +- *(1) Project [city#45, salary#47]
               +- *(1) Filter (isnotnull(age#46) && (age#46 > 25))
                  +- *(1) FileScan parquet [city#45,age#46,salary#47]
                     Batched: true
                     Location: InMemoryFileIndex[hdfs://path/to/users.parquet]
                     PartitionFilters: []
                     PushedFilters: [IsNotNull(age), GreaterThan(age,25)]
                     ReadSchema: struct<city:string,age:int,salary:double>

关键信息解读

  • FileScan:数据源扫描
  • PushedFilters:下推到数据源的过滤条件
  • Exchange:Shuffle操作
  • HashAggregate:聚合操作
  • Sort:排序操作

五、UDF开发

5.1 普通UDF

// 方式1:注册SQL函数
spark.udf.register("to_upper", (s: String) => s.toUpperCase)
spark.sql("SELECT to_upper(name) FROM users").show()

// 方式2:DataFrame API使用
import org.apache.spark.sql.functions.udf

val toUpperUDF = udf((s: String) => s.toUpperCase)
df.select(toUpperUDF($"name")).show()

// 复杂UDF示例:解析JSON字段
import scala.util.parsing.json.JSON

val parseJsonUDF = udf((json: String) => {
  JSON.parseFull(json) match {
    case Some(map: Map[String, Any]) => map.get("name").map(_.toString).getOrElse("")
    case _ => ""
  }
})

// 带多个参数的UDF
val concatWithSep = udf((s1: String, s2: String, sep: String) => s"$s1$sep$s2")
df.select(concatWithSep($"first_name", $"last_name", lit("-"))).show()

5.2 UDAF(用户自定义聚合函数)

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders

// 定义UDAF:计算几何平均数
case class GeoMeanBuffer(product: Double, count: Long)

object GeoMean extends Aggregator[Double, GeoMeanBuffer, Double] {
  // 初始值
  def zero: GeoMeanBuffer = GeoMeanBuffer(1.0, 0L)
  
  // 聚合单个值
  def reduce(buffer: GeoMeanBuffer, value: Double): GeoMeanBuffer = {
    GeoMeanBuffer(buffer.product * value, buffer.count + 1)
  }
  
  // 合并两个buffer
  def merge(b1: GeoMeanBuffer, b2: GeoMeanBuffer): GeoMeanBuffer = {
    GeoMeanBuffer(b1.product * b2.product, b1.count + b2.count)
  }
  
  // 计算最终结果
  def finish(buffer: GeoMeanBuffer): Double = {
    math.pow(buffer.product, 1.0 / buffer.count)
  }
  
  def bufferEncoder: Encoder[GeoMeanBuffer] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// 注册使用
val geoMeanUDAF = udaf(GeoMean)
df.select(geoMeanUDAF($"salary")).show()

// 或注册为SQL函数
spark.udf.register("geo_mean", udaf(GeoMean))
spark.sql("SELECT city, geo_mean(salary) FROM users GROUP BY city").show()

5.3 UDF性能注意事项

graph TB subgraph UDF性能["⚠️ UDF性能注意事项"] A["UDF是黑盒"] --> B["Catalyst无法优化"] B --> C["无法谓词下推"] B --> D["无法列裁剪"] E["UDF序列化开销"] --> F["每行数据都要序列化/反序列化"] G["建议"] --> H["优先使用内置函数"] G --> I["避免在UDF中创建对象"] G --> J["使用Pandas UDF(Python)"] end style A fill:#ff6b6b style H fill:#4ecdc4

最佳实践

// ❌ 不推荐:使用UDF
val myUpperUDF = udf((s: String) => s.toUpperCase)
df.select(myUpperUDF($"name"))

// ✅ 推荐:使用内置函数
df.select(upper($"name"))

六、Spark SQL性能调优

6.1 AQE自适应查询执行

// 开启AQE(Spark 3.0+默认开启)
spark.conf.set("spark.sql.adaptive.enabled", "true")

// 自动合并小分区
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")

// 自动处理数据倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

// 动态切换Join策略
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
graph TB subgraph AQE功能["🚀 AQE自适应执行"] A[运行时统计信息] --> B[动态合并分区] A --> C[动态切换Join策略] A --> D[动态处理数据倾斜] B --> B1["小分区合并
减少Task数量"] C --> C1["Broadcast Join自动选择"] D --> D1["倾斜分区自动拆分"] end style A fill:#ff6b6b style B1 fill:#4ecdc4 style C1 fill:#4ecdc4 style D1 fill:#4ecdc4

6.2 Join优化

graph TB subgraph Join策略["🔗 Join策略选择"] Q{表大小?} Q --> |"小表 < 10MB"|BHJ[Broadcast Hash Join
广播小表,无Shuffle] Q --> |"中等表"|SHJ[Shuffle Hash Join
Shuffle后Hash Join] Q --> |"大表"|SMJ[Sort Merge Join
Shuffle + 排序合并] BHJ --> Best["性能最好 ⭐⭐⭐⭐⭐"] SHJ --> Medium["性能中等 ⭐⭐⭐"] SMJ --> Large["适合大表 ⭐⭐⭐⭐"] end style BHJ fill:#4ecdc4 style SMJ fill:#ffe66d
// 强制使用Broadcast Join
import org.apache.spark.sql.functions.broadcast

val result = largeDF.join(broadcast(smallDF), Seq("key"))

// 设置Broadcast阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

// 禁用Broadcast Join(调试用)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// 查看Join策略
result.explain()

6.3 分区优化

// 设置Shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")  // 默认200

// 动态分区数(AQE)
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")

// 读取时控制分区
val df = spark.read
  .option("maxPartitionBytes", "128MB")
  .parquet("path/to/data")

// 重分区
df.repartition(100)  // 增加分区(有Shuffle)
df.coalesce(10)      // 减少分区(无Shuffle)
df.repartition($"city")  // 按列分区
df.repartition(100, $"city")  // 指定分区数和列

6.4 缓存优化

// 缓存DataFrame
df.cache()  // 等同于 df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

// 释放缓存
df.unpersist()

// 检查是否已缓存
df.storageLevel

// 缓存表
spark.sql("CACHE TABLE my_table")
spark.sql("UNCACHE TABLE my_table")

// 延迟缓存(查询时才缓存)
spark.sql("CACHE LAZY TABLE my_table")
graph TB subgraph 缓存策略["💾 缓存级别选择"] A{数据能放内存?} A --> |"是"|B{需要快速访问?} A --> |"否"|C[MEMORY_AND_DISK] B --> |"是"|D[MEMORY_ONLY] B --> |"要省内存"|E[MEMORY_ONLY_SER] F{需要容错?} F --> |"是"|G[MEMORY_AND_DISK_2] style D fill:#4ecdc4 style C fill:#ffe66d end

6.5 常用调优参数

参数默认值说明
spark.sql.shuffle.partitions200Shuffle分区数
spark.sql.autoBroadcastJoinThreshold10MBBroadcast阈值
spark.sql.adaptive.enabledtrue启用AQE
spark.sql.adaptive.coalescePartitions.enabledtrue自动合并分区
spark.sql.adaptive.skewJoin.enabledtrue自动处理倾斜
spark.sql.files.maxPartitionBytes128MB单分区最大字节
spark.sql.inMemoryColumnarStorage.compressedtrue缓存压缩

七、Hive集成

7.1 配置Hive支持

// 创建支持Hive的SparkSession
val spark = SparkSession.builder()
  .appName("Spark Hive")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://hive-metastore:9083")
  .enableHiveSupport()
  .getOrCreate()

// 查看数据库
spark.sql("SHOW DATABASES").show()

// 使用数据库
spark.sql("USE my_database")

// 查看表
spark.sql("SHOW TABLES").show()

7.2 读写Hive表

// 读取Hive表
val df = spark.table("my_database.my_table")
val df2 = spark.sql("SELECT * FROM my_database.my_table WHERE dt = '2024-01-01'")

// 写入Hive表(覆盖)
df.write
  .mode("overwrite")
  .saveAsTable("my_database.result_table")

// 写入分区表
df.write
  .mode("overwrite")
  .partitionBy("dt")
  .saveAsTable("my_database.partitioned_table")

// 插入数据
df.write
  .mode("append")
  .insertInto("my_database.existing_table")

// 动态分区写入
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

df.write
  .mode("overwrite")
  .format("hive")
  .partitionBy("year", "month")
  .saveAsTable("my_database.dynamic_partition_table")

7.3 创建Hive表

// 通过SQL创建
spark.sql("""
  CREATE TABLE IF NOT EXISTS my_database.users (
    id BIGINT,
    name STRING,
    age INT
  )
  PARTITIONED BY (dt STRING)
  STORED AS PARQUET
  TBLPROPERTIES ('parquet.compression'='SNAPPY')
""")

// 通过DataFrame创建
df.write
  .format("hive")
  .option("fileFormat", "parquet")
  .option("compression", "snappy")
  .partitionBy("dt")
  .saveAsTable("my_database.new_table")

八、实战案例

8.1 电商用户分析

// 需求:分析用户购买行为,计算RFM指标

val orders = spark.table("dwd.dwd_order_info")

// 计算RFM
val rfm = orders
  .filter($"order_status" === "completed")
  .groupBy("user_id")
  .agg(
    // R: 最近一次购买距今天数
    datediff(current_date(), max("order_time")).as("recency"),
    // F: 购买频次
    count("order_id").as("frequency"),
    // M: 购买金额
    sum("order_amount").as("monetary")
  )

// RFM评分
val rfmScored = rfm
  .withColumn("r_score", 
    when($"recency" <= 7, 5)
    .when($"recency" <= 30, 4)
    .when($"recency" <= 90, 3)
    .when($"recency" <= 180, 2)
    .otherwise(1))
  .withColumn("f_score",
    when($"frequency" >= 10, 5)
    .when($"frequency" >= 5, 4)
    .when($"frequency" >= 3, 3)
    .when($"frequency" >= 2, 2)
    .otherwise(1))
  .withColumn("m_score",
    when($"monetary" >= 10000, 5)
    .when($"monetary" >= 5000, 4)
    .when($"monetary" >= 1000, 3)
    .when($"monetary" >= 500, 2)
    .otherwise(1))
  .withColumn("rfm_score", $"r_score" + $"f_score" + $"m_score")

// 用户分层
val userSegment = rfmScored
  .withColumn("segment",
    when($"rfm_score" >= 12, "高价值用户")
    .when($"rfm_score" >= 9, "中价值用户")
    .when($"rfm_score" >= 6, "低价值用户")
    .otherwise("流失用户"))

userSegment.groupBy("segment").count().show()

8.2 实时Top N

// 需求:每个类目下销售额Top 10商品

val sales = spark.table("dwd.dwd_order_detail")

val topN = sales
  .groupBy("category", "product_id", "product_name")
  .agg(sum("amount").as("total_sales"))
  .withColumn("rank", 
    row_number().over(
      Window.partitionBy("category").orderBy($"total_sales".desc)
    ))
  .filter($"rank" <= 10)
  .orderBy("category", "rank")

topN.show(100)

九、踩坑与最佳实践

9.1 常见踩坑

graph TB subgraph 常见坑["😱 Spark SQL常见坑"] P1["数据类型不匹配"] --> S1["显式转换:cast"] P2["NULL值处理"] --> S2["使用coalesce/nvl"] P3["小文件问题"] --> S3["合并分区写入"] P4["OOM"] --> S4["增加内存/优化SQL"] P5["数据倾斜"] --> S5["加盐/Broadcast"] end style P1 fill:#ff6b6b style P2 fill:#ff6b6b style P3 fill:#ff6b6b

9.2 最佳实践

// 1. 避免SELECT *
// ❌ 不推荐
spark.sql("SELECT * FROM large_table")
// ✅ 推荐
spark.sql("SELECT col1, col2 FROM large_table")

// 2. 谓词前置
// ❌ 不推荐
df.join(df2, "key").filter($"col" > 100)
// ✅ 推荐
df.filter($"col" > 100).join(df2, "key")

// 3. 避免使用UDF
// ❌ 不推荐
val upperUDF = udf((s: String) => s.toUpperCase)
// ✅ 推荐
df.select(upper($"name"))

// 4. 合理使用缓存
// ❌ 不推荐:缓存只用一次的数据
df.cache()
df.count()
// ✅ 推荐:缓存重复使用的数据
val cachedDF = df.cache()
cachedDF.groupBy("a").count().show()
cachedDF.groupBy("b").count().show()

// 5. 写入时合并小文件
df.coalesce(10).write.parquet("output")

// 6. 分区表查询带分区条件
// ❌ 不推荐
spark.sql("SELECT * FROM partitioned_table")
// ✅ 推荐
spark.sql("SELECT * FROM partitioned_table WHERE dt = '2024-01-01'")

十、写在最后

Spark SQL是Spark中最常用的模块,没有之一。

学好Spark SQL,你就能应对90%的数据处理需求。剩下的10%...可能需要你继续掉头发。

几点忠告

  1. 优先用内置函数:性能好,Catalyst能优化
  2. 学会看执行计划:explain是你最好的朋友
  3. 善用AQE:Spark 3.0的神器,能自动解决很多问题
  4. 窗口函数必须会:面试必考,实际工作也常用
  5. 缓存要谨慎:不是什么都需要缓存

最后送大家一句话:

SQL写得好,下班回家早。

本文作者:一个SQL Boy

最惨经历:写了个笛卡尔积,差点把集群跑崩

下一篇:Spark内核原理与调优


附录:面试高频题

  1. DataFrame和RDD的区别?

    DataFrame有Schema,经过Catalyst优化,性能更好;RDD是底层抽象,灵活但性能差。
  2. Catalyst优化器做了哪些优化?

    谓词下推、列裁剪、常量折叠、分区裁剪、Join重排序等。
  3. Spark SQL有哪些Join策略?

    Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join。小表用Broadcast,大表用Sort Merge。
  4. 窗口函数row_number、rank、dense_rank的区别?

    row_number:1,2,3,4(无重复);rank:1,2,2,4(重复跳号);dense_rank:1,2,2,3(重复不跳号)
  5. 如何优化Spark SQL性能?

    开启AQE、使用Broadcast Join、合理设置分区数、避免UDF、使用缓存、避免数据倾斜。
  6. 什么是AQE?

    自适应查询执行,运行时根据统计信息动态调整执行计划,包括合并分区、切换Join策略、处理数据倾斜。
评论区
暂无评论
avatar