搜 索

Spark从入门到放弃④之Structured Streaming

  • 12阅读
  • 2023年03月25日
  • 0评论
首页 / AI/大数据 / 正文

前言:从"等数据"到"追数据"

在批处理的世界里,我们是这样工作的:

凌晨2点:跑批任务
早上8点:老板看报表
老板问:为什么数据是昨天的?
你答:因为是T+1...

在实时处理的世界里:

用户下单的那一刻:数据已经在报表上了
老板问:这个数据是实时的吗?
你答:秒级延迟
老板:很好,那你能不能做到毫秒级?
你:...(头发-1)

Structured Streaming是Spark推出的流处理解决方案,它让你用写批处理的方式写流处理

本篇是Spark系列的第四篇,我们将深入Structured Streaming的世界,从入门到实战,从Kafka到实时数仓。


一、流处理基础概念

1.1 批处理 vs 流处理

graph TB subgraph 批处理["📦 批处理"] B1[数据积累] --> B2[定时处理] B2 --> B3[输出结果] B4["特点:高吞吐、高延迟
延迟:分钟~小时"] end subgraph 流处理["🌊 流处理"] S1[数据到达] --> S2[立即处理] S2 --> S3[输出结果] S4["特点:低延迟、持续处理
延迟:毫秒~秒"] end style B4 fill:#ff6b6b style S4 fill:#4ecdc4

1.2 Spark Streaming vs Structured Streaming

graph TB subgraph SparkStreaming["Spark Streaming(旧)"] SS1[DStream API] SS2[微批处理] SS3["基于RDD
API复杂"] SS4["仅At-Least-Once"] end subgraph StructuredStreaming["Structured Streaming(新)"] ST1[DataFrame/Dataset API] ST2[微批处理 + 连续处理] ST3["与批处理API统一
简单易用"] ST4["支持Exactly-Once"] end SparkStreaming --> |"演进"|StructuredStreaming style SparkStreaming fill:#ff6b6b style StructuredStreaming fill:#4ecdc4

为什么选择Structured Streaming?

  • API与Spark SQL统一,学习成本低
  • 支持Exactly-Once语义
  • 支持Event Time处理
  • 与Spark生态无缝集成

1.3 核心概念:无界表

Structured Streaming的核心思想:把流数据当作一张不断增长的表

graph TB subgraph 无界表概念["📊 无界表(Unbounded Table)"] subgraph 时刻1["时刻T1"] T1R1[Row 1] T1R2[Row 2] T1R3[Row 3] end subgraph 时刻2["时刻T2"] T2R1[Row 1] T2R2[Row 2] T2R3[Row 3] T2R4[Row 4 新增] T2R5[Row 5 新增] end subgraph 时刻3["时刻T3"] T3R1[Row 1] T3R2[Row 2] T3R3[Row 3] T3R4[Row 4] T3R5[Row 5] T3R6[Row 6 新增] T3R7[Row 7 新增] end 时刻1 --> 时刻2 --> 时刻3 end style T2R4 fill:#4ecdc4 style T2R5 fill:#4ecdc4 style T3R6 fill:#4ecdc4 style T3R7 fill:#4ecdc4

二、快速入门

2.1 第一个Streaming程序

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("StructuredStreamingDemo")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// 从Socket读取数据(测试用)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 处理:WordCount
val wordCounts = lines
  .as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// 输出到控制台
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

运行测试

# 终端1:启动nc服务
nc -lk 9999

# 终端2:运行Spark程序
spark-submit --class StreamingDemo app.jar

# 终端1:输入测试数据
hello world
hello spark
spark streaming

2.2 程序结构

graph LR subgraph 流处理程序结构["🔧 Structured Streaming程序结构"] A[输入源
readStream] --> B[转换操作
select/filter/groupBy] B --> C[输出
writeStream] C --> D[启动
start] D --> E[等待终止
awaitTermination] end style A fill:#4ecdc4 style B fill:#ffe66d style C fill:#ff6b6b
// 通用模板
val inputDF = spark.readStream
  .format("数据源")    // kafka, socket, file, rate
  .option(...)
  .load()

val resultDF = inputDF
  .select(...)
  .filter(...)
  .groupBy(...)
  .agg(...)

val query = resultDF.writeStream
  .outputMode("模式")   // append, complete, update
  .format("输出目标")   // console, kafka, parquet, memory
  .option(...)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

三、输入源详解

3.1 支持的输入源

graph TB subgraph 输入源["📥 Structured Streaming输入源"] subgraph 生产环境["生产环境"] K[Kafka] F[File Source
文件目录监控] end subgraph 测试环境["测试环境"] S[Socket] R[Rate
生成测试数据] end end style K fill:#4ecdc4 style F fill:#4ecdc4 style S fill:#ffe66d style R fill:#ffe66d

3.2 Kafka Source(重点)

// 读取Kafka
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .option("subscribe", "topic1,topic2")  // 订阅多个topic
  // 或使用正则
  // .option("subscribePattern", "topic.*")
  .option("startingOffsets", "latest")  // earliest, latest, 或具体offset
  .option("maxOffsetsPerTrigger", "100000")  // 每批次最大消息数
  .option("failOnDataLoss", "false")  // 数据丢失是否失败
  .load()

// Kafka返回的Schema
// key: binary
// value: binary
// topic: string
// partition: int
// offset: long
// timestamp: timestamp
// timestampType: int

// 解析value(假设是JSON)
val parsedDF = kafkaDF
  .selectExpr("CAST(value AS STRING) as json_str")
  .select(from_json($"json_str", schema).as("data"))
  .select("data.*")

3.3 File Source

// 监控目录,读取新增文件
val fileDF = spark.readStream
  .format("parquet")  // 或 json, csv, orc
  .schema(mySchema)   // 文件源必须指定Schema
  .option("maxFilesPerTrigger", "100")  // 每批次最大文件数
  .option("latestFirst", "false")  // 是否先处理最新文件
  .load("/data/input/")

// CSV文件
val csvDF = spark.readStream
  .format("csv")
  .schema(mySchema)
  .option("header", "true")
  .option("maxFilesPerTrigger", "10")
  .load("/data/csv/")

3.4 Rate Source(测试用)

// 生成测试数据
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1000")  // 每秒生成行数
  .option("numPartitions", "10")    // 分区数
  .load()

// Schema:
// timestamp: timestamp
// value: long (递增)

// 示例:模拟订单数据
val ordersDF = rateDF
  .withColumn("order_id", $"value")
  .withColumn("user_id", ($"value" % 1000).cast("long"))
  .withColumn("amount", (rand() * 1000).cast("decimal(10,2)"))
  .withColumn("order_time", $"timestamp")

四、输出模式与输出目标

4.1 三种输出模式

graph TB subgraph 输出模式["📤 输出模式(Output Mode)"] subgraph Append["Append模式"] A1["只输出新增的行"] A2["适用于:不涉及聚合的查询"] A3["或带Watermark的聚合"] end subgraph Complete["Complete模式"] C1["输出全部结果"] C2["适用于:聚合查询"] C3["每次输出完整的聚合结果"] end subgraph Update["Update模式"] U1["只输出更新的行"] U2["适用于:聚合查询"] U3["只输出有变化的聚合结果"] end end style Append fill:#4ecdc4 style Complete fill:#ffe66d style Update fill:#ff6b6b

使用场景对比

输出模式适用场景聚合支持输出数据量
Append简单转换、带Watermark的聚合有限最少
Complete聚合统计完全最多
Update聚合统计、需要增量更新完全中等

4.2 输出目标

// 1. Console(调试用)
query.writeStream
  .format("console")
  .option("truncate", "false")
  .option("numRows", "100")
  .outputMode("complete")
  .start()

// 2. Kafka
query.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/checkpoint/kafka")
  .outputMode("append")
  .start()

// 3. 文件(Parquet/JSON/CSV)
query.writeStream
  .format("parquet")
  .option("path", "/data/output/")
  .option("checkpointLocation", "/checkpoint/parquet")
  .partitionBy("dt", "hour")
  .outputMode("append")
  .start()

// 4. Memory(调试用)
query.writeStream
  .format("memory")
  .queryName("my_table")
  .outputMode("complete")
  .start()

// 然后可以用SQL查询
spark.sql("SELECT * FROM my_table").show()

// 5. ForeachBatch(自定义输出)
query.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // 可以做任何操作
    batchDF.write.mode("append").saveAsTable("my_hive_table")
    batchDF.write.format("jdbc").save()
  }
  .start()

4.3 Kafka输出详解

// 写入Kafka需要key和value列
val outputDF = resultDF
  .selectExpr(
    "CAST(user_id AS STRING) as key",
    "to_json(struct(*)) as value"  // 整行转JSON
  )

outputDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/checkpoint/output")
  .outputMode("append")
  .start()

五、触发器(Trigger)

5.1 触发器类型

graph TB subgraph 触发器类型["⏰ 触发器类型"] subgraph Default["默认(微批处理)"] D1["尽可能快地处理"] D2["前一批完成立即开始下一批"] end subgraph ProcessingTime["固定间隔"] P1["每隔指定时间触发一次"] P2["Trigger.ProcessingTime"] end subgraph Once["一次性"] O1["只处理一批就停止"] O2["Trigger.Once"] end subgraph Continuous["连续处理(实验性)"] C1["毫秒级延迟"] C2["Trigger.Continuous"] end end style ProcessingTime fill:#4ecdc4 style Continuous fill:#ff6b6b

5.2 触发器使用

import org.apache.spark.sql.streaming.Trigger

// 默认:尽可能快
query.writeStream
  .trigger(Trigger.ProcessingTime(0))  // 等同于不设置
  .start()

// 固定间隔:每10秒一批
query.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// 一次性:只跑一批(用于追数据)
query.writeStream
  .trigger(Trigger.Once())
  .start()

// 连续处理(实验性,毫秒级延迟)
query.writeStream
  .trigger(Trigger.Continuous("1 second"))  // checkpoint间隔
  .start()

// AvailableNow:处理所有可用数据后停止(Spark 3.3+)
query.writeStream
  .trigger(Trigger.AvailableNow())
  .start()

六、窗口操作

6.1 窗口类型

graph TB subgraph 窗口类型["🪟 窗口类型"] subgraph Tumbling["滚动窗口 Tumbling"] T1["窗口不重叠"] T2["每条数据属于一个窗口"] TG["| 0-10 | 10-20 | 20-30 |"] end subgraph Sliding["滑动窗口 Sliding"] S1["窗口可重叠"] S2["每条数据可能属于多个窗口"] SG["| 0-10 |
| 5-15 |
| 10-20 |"] end subgraph Session["会话窗口 Session"] SS1["基于活动间隙"] SS2["动态窗口大小"] end end style Tumbling fill:#4ecdc4 style Sliding fill:#ffe66d style Session fill:#ff6b6b

6.2 窗口操作代码

import org.apache.spark.sql.functions._

// 假设数据有event_time字段
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", schema).as("data"))
  .select(
    $"data.user_id",
    $"data.event_type",
    $"data.event_time".cast("timestamp")
  )

// ========== 滚动窗口 ==========
val tumblingWindow = events
  .groupBy(
    window($"event_time", "10 minutes"),  // 10分钟滚动窗口
    $"event_type"
  )
  .count()

// ========== 滑动窗口 ==========
val slidingWindow = events
  .groupBy(
    window($"event_time", "10 minutes", "5 minutes"),  // 窗口10分钟,滑动5分钟
    $"event_type"
  )
  .count()

// 窗口结果示例:
// +--------------------+----------+-----+
// |window              |event_type|count|
// +--------------------+----------+-----+
// |{2024-01-01 10:00:00, 2024-01-01 10:10:00}|click|1234|
// |{2024-01-01 10:05:00, 2024-01-01 10:15:00}|click|1567|
// +--------------------+----------+-----+

// 展开窗口字段
tumblingWindow
  .select(
    $"window.start".as("window_start"),
    $"window.end".as("window_end"),
    $"event_type",
    $"count"
  )

6.3 窗口图解

gantt title 滚动窗口 vs 滑动窗口 dateFormat HH:mm axisFormat %H:%M section 滚动窗口 窗口1 (00:00-00:10) :a1, 00:00, 10m 窗口2 (00:10-00:20) :a2, 00:10, 10m 窗口3 (00:20-00:30) :a3, 00:20, 10m section 滑动窗口 窗口1 (00:00-00:10) :b1, 00:00, 10m 窗口2 (00:05-00:15) :b2, 00:05, 10m 窗口3 (00:10-00:20) :b3, 00:10, 10m 窗口4 (00:15-00:25) :b4, 00:15, 10m

七、Watermark与延迟数据

7.1 什么是Watermark?

graph TB subgraph Watermark概念["🌊 Watermark(水位线)"] A["问题:数据可能迟到"] A --> B["Event Time: 10:00"] B --> C["Processing Time: 10:05"] C --> D["迟到了5分钟!"] E["解决:设置Watermark"] E --> F["允许最多迟到10分钟"] F --> G["10分钟后关闭窗口"] G --> H["之后到达的数据丢弃"] end style A fill:#ff6b6b style E fill:#4ecdc4

7.2 Watermark使用

// 设置Watermark
val withWatermark = events
  .withWatermark("event_time", "10 minutes")  // 允许迟到10分钟
  .groupBy(
    window($"event_time", "5 minutes"),
    $"event_type"
  )
  .count()

// Watermark工作原理:
// 1. 追踪看到的最大Event Time: max_event_time
// 2. Watermark = max_event_time - 10 minutes
// 3. 当Watermark超过窗口结束时间,窗口关闭
// 4. 关闭后到达的数据被丢弃

7.3 Watermark图解

sequenceDiagram participant Data as 数据 participant Spark as Spark participant Window as 窗口状态 Note over Spark: Watermark延迟 = 10分钟 Data->>Spark: Event Time = 10:05 Spark->>Spark: max_event_time = 10:05 Spark->>Spark: watermark = 09:55 Spark->>Window: 更新窗口[10:00-10:05] Data->>Spark: Event Time = 10:15 Spark->>Spark: max_event_time = 10:15 Spark->>Spark: watermark = 10:05 Spark->>Window: 窗口[10:00-10:05]可以关闭 Data->>Spark: Event Time = 10:03 (迟到) Spark->>Spark: 10:03 < watermark(10:05) Spark->>Data: 丢弃!窗口已关闭

7.4 Watermark与输出模式

输出模式Watermark行为
Append窗口关闭后才输出(需要Watermark)
Update每批次输出更新,窗口关闭后停止更新
Complete每批次输出全部结果
// Append模式 + Watermark(推荐生产使用)
val result = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(window($"event_time", "5 minutes"))
  .count()

result.writeStream
  .outputMode("append")  // 窗口关闭后输出
  .format("parquet")
  .start()

八、状态管理

8.1 有状态操作

graph TB subgraph 有状态操作["🗃️ 有状态操作"] A[groupBy + 聚合] B[窗口操作] C[去重 dropDuplicates] D[流流Join] E[flatMapGroupsWithState] F[mapGroupsWithState] end subgraph 状态存储["状态存储"] G["内存 + Checkpoint"] H["RocksDB(大状态)"] end A & B & C & D & E & F --> 状态存储 style E fill:#ff6b6b style F fill:#ff6b6b

8.2 自定义状态处理

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// 定义状态和输出类型
case class UserState(lastActivity: Long, totalClicks: Long)
case class UserOutput(userId: String, totalClicks: Long, isTimeout: Boolean)

// 自定义状态更新函数
def updateUserState(
    userId: String,
    events: Iterator[Event],
    state: GroupState[UserState]
): Iterator[UserOutput] = {
  
  // 检查是否超时
  if (state.hasTimedOut) {
    val oldState = state.get
    state.remove()
    return Iterator(UserOutput(userId, oldState.totalClicks, isTimeout = true))
  }
  
  // 获取或初始化状态
  val currentState = state.getOption.getOrElse(UserState(0L, 0L))
  
  // 处理新事件
  var newClicks = currentState.totalClicks
  var lastActivity = currentState.lastActivity
  
  events.foreach { event =>
    newClicks += 1
    lastActivity = math.max(lastActivity, event.timestamp)
  }
  
  // 更新状态
  val newState = UserState(lastActivity, newClicks)
  state.update(newState)
  
  // 设置超时(30分钟不活跃)
  state.setTimeoutDuration("30 minutes")
  
  // 输出
  Iterator(UserOutput(userId, newClicks, isTimeout = false))
}

// 使用
val result = events
  .groupByKey(_.userId)
  .flatMapGroupsWithState(
    OutputMode.Append(),
    GroupStateTimeout.ProcessingTimeTimeout
  )(updateUserState)

8.3 状态清理与Checkpoint

// Checkpoint配置
query.writeStream
  .option("checkpointLocation", "/checkpoint/my_app")
  .start()

// 状态存储配置(大状态使用RocksDB)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// 状态清理(通过Watermark自动清理)
val result = events
  .withWatermark("event_time", "1 hour")  // 1小时后清理状态
  .groupBy(window($"event_time", "10 minutes"), $"user_id")
  .count()

九、Kafka深度集成

9.1 完整的Kafka读写示例

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("KafkaIntegration")
  .getOrCreate()

// 定义输入Schema
val inputSchema = StructType(Seq(
  StructField("user_id", StringType),
  StructField("event_type", StringType),
  StructField("event_time", TimestampType),
  StructField("properties", MapType(StringType, StringType))
))

// 读取Kafka
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
  .option("subscribe", "user_events")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "100000")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", 
    """org.apache.kafka.common.security.plain.PlainLoginModule required 
       username="user" password="password";""")
  .load()

// 解析JSON
val eventsDF = kafkaDF
  .selectExpr("CAST(value AS STRING) as json", "timestamp as kafka_time")
  .select(
    from_json($"json", inputSchema).as("data"),
    $"kafka_time"
  )
  .select("data.*", "kafka_time")

// 处理逻辑
val resultDF = eventsDF
  .withWatermark("event_time", "5 minutes")
  .groupBy(
    window($"event_time", "1 minute"),
    $"event_type"
  )
  .agg(
    count("*").as("event_count"),
    countDistinct("user_id").as("unique_users")
  )
  .select(
    $"window.start".as("window_start"),
    $"window.end".as("window_end"),
    $"event_type",
    $"event_count",
    $"unique_users"
  )

// 写入Kafka
val outputDF = resultDF.select(
  $"event_type".as("key"),
  to_json(struct($"*")).as("value")
)

val query = outputDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092")
  .option("topic", "event_stats")
  .option("checkpointLocation", "/checkpoint/event_stats")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

query.awaitTermination()

9.2 Kafka配置参数

参数说明建议值
kafka.bootstrap.serversKafka地址生产环境多节点
subscribe订阅的topic-
startingOffsets起始位置earliest/latest
maxOffsetsPerTrigger每批次最大消息数根据处理能力设置
failOnDataLoss数据丢失是否失败生产环境true
kafka.group.id消费者组设置便于监控

9.3 Exactly-Once语义

graph TB subgraph ExactlyOnce["✅ Exactly-Once语义保证"] A[幂等写入] --> B["Kafka事务
或目标支持幂等"] C[Checkpoint] --> D["记录偏移量
失败可恢复"] E[原子提交] --> F["偏移量和数据
一起提交"] end style A fill:#4ecdc4 style C fill:#4ecdc4 style E fill:#4ecdc4
// 启用Kafka事务(Exactly-Once)
query.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("topic", "output")
  // 启用幂等性
  .option("kafka.enable.idempotence", "true")
  // 启用事务
  .option("kafka.transactional.id", "my-transactional-id")
  .option("checkpointLocation", "/checkpoint/output")
  .start()

十、实战案例:实时数仓

10.1 架构设计

graph TB subgraph 实时数仓架构["🏠 实时数仓架构"] subgraph 数据源["数据源"] App[App埋点] Server[服务端日志] DB[(数据库CDC)] end subgraph 消息队列["消息队列"] Kafka[Kafka] end subgraph 实时处理["实时处理"] SS[Structured Streaming] end subgraph 存储层["存储层"] ODS[ODS
Kafka] DWD[DWD
Kafka] DWS[DWS
Doris/ClickHouse] ADS[ADS
Redis/MySQL] end subgraph 应用层["应用层"] Dashboard[实时大屏] Alert[实时告警] BI[BI报表] end 数据源 --> Kafka Kafka --> SS SS --> ODS --> DWD --> DWS --> ADS ADS --> 应用层 end style SS fill:#ff6b6b style DWS fill:#4ecdc4

10.2 实时指标计算

// 需求:实时计算每分钟的PV、UV、订单数、GMV

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "user_events,orders")
  .option("startingOffsets", "latest")
  .load()

// 解析事件
val parsedEvents = events
  .selectExpr("CAST(value AS STRING) as json", "topic")
  .select(
    $"topic",
    from_json($"json", eventSchema).as("data")
  )
  .select($"topic", $"data.*")
  .withColumn("event_time", $"timestamp".cast("timestamp"))

// PV/UV统计
val pvUv = parsedEvents
  .filter($"topic" === "user_events")
  .withWatermark("event_time", "5 minutes")
  .groupBy(window($"event_time", "1 minute"))
  .agg(
    count("*").as("pv"),
    countDistinct("user_id").as("uv")
  )

// 订单统计
val orderStats = parsedEvents
  .filter($"topic" === "orders")
  .filter($"event_type" === "order_created")
  .withWatermark("event_time", "5 minutes")
  .groupBy(window($"event_time", "1 minute"))
  .agg(
    count("*").as("order_count"),
    sum("amount").as("gmv")
  )

// 合并指标
val metrics = pvUv.join(orderStats, Seq("window"), "outer")
  .select(
    $"window.start".as("stat_time"),
    coalesce($"pv", lit(0)).as("pv"),
    coalesce($"uv", lit(0)).as("uv"),
    coalesce($"order_count", lit(0)).as("order_count"),
    coalesce($"gmv", lit(0)).as("gmv")
  )

// 写入Doris
metrics.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("doris")
      .option("doris.table.identifier", "realtime.metrics_1min")
      .option("doris.fenodes", "doris:8030")
      .option("user", "root")
      .option("password", "")
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "/checkpoint/metrics")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

10.3 实时告警

// 需求:订单金额异常告警

val orders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "orders")
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", orderSchema).as("data"))
  .select("data.*")

// 计算滑动窗口平均值
val avgAmount = orders
  .withWatermark("order_time", "5 minutes")
  .groupBy(window($"order_time", "10 minutes", "1 minute"))
  .agg(
    avg("amount").as("avg_amount"),
    stddev("amount").as("stddev_amount")
  )

// 检测异常(金额超过3倍标准差)
val anomalies = orders
  .withWatermark("order_time", "5 minutes")
  .join(avgAmount, 
    orders("order_time") >= avgAmount("window.start") &&
    orders("order_time") < avgAmount("window.end"))
  .filter($"amount" > $"avg_amount" + $"stddev_amount" * 3)
  .select(
    $"order_id",
    $"user_id",
    $"amount",
    $"avg_amount",
    $"order_time"
  )

// 发送告警
anomalies.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.collect().foreach { row =>
      sendAlert(
        s"订单异常告警: 订单${row.getAs[String]("order_id")}, " +
        s"金额${row.getAs[Double]("amount")}, " +
        s"平均值${row.getAs[Double]("avg_amount")}"
      )
    }
  }
  .start()

十一、生产环境最佳实践

11.1 Checkpoint配置

// 必须配置Checkpoint(生产环境)
query.writeStream
  .option("checkpointLocation", "hdfs:///checkpoint/my_app")
  .start()

// Checkpoint包含:
// 1. 偏移量(Kafka offset)
// 2. 状态数据
// 3. 提交日志

// 注意事项:
// 1. Checkpoint目录不要随便删除
// 2. 代码变更可能导致Checkpoint不兼容
// 3. 使用HDFS或S3等可靠存储

11.2 资源配置

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.streaming.kafka.maxRatePerPartition=10000 \
  --conf spark.sql.shuffle.partitions=100 \
  --conf spark.sql.streaming.stateStore.rocksdb.enabled=true \
  --class com.example.StreamingApp \
  app.jar

11.3 监控指标

graph TB subgraph 监控指标["📊 关键监控指标"] A[处理延迟
inputRowsPerSecond vs processedRowsPerSecond] B[批处理时间
batchDuration] C[状态大小
numRowsTotal] D[Kafka Lag
消费延迟] E[Checkpoint时间
checkpointDuration] end style A fill:#ff6b6b style D fill:#ff6b6b
// 获取流查询状态
val query = df.writeStream.start()

// 查看进度
query.lastProgress
query.recentProgress

// 关键指标
// inputRowsPerSecond: 输入速率
// processedRowsPerSecond: 处理速率
// batchDuration: 批处理时间
// numInputRows: 输入行数

// 设置监控回调
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println(s"Query started: ${event.id}")
  }
  
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    println(s"Batch ${progress.batchId}: " +
      s"input=${progress.numInputRows}, " +
      s"rate=${progress.processedRowsPerSecond}")
  }
  
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println(s"Query terminated: ${event.id}")
  }
})

11.4 常见问题处理

flowchart TB subgraph 问题处理["🔧 常见问题处理"] P1[处理速度跟不上] --> S1["1. 增加Executor
2. 优化代码
3. 增加分区"] P2[状态太大OOM] --> S2["1. 使用RocksDB
2. 设置合理Watermark
3. 减少状态保留时间"] P3[Kafka Lag增长] --> S3["1. 增加maxOffsetsPerTrigger
2. 增加并行度
3. 优化处理逻辑"] P4[Checkpoint失败] --> S4["1. 增加超时时间
2. 检查存储性能
3. 减少状态大小"] end style S1 fill:#4ecdc4 style S2 fill:#4ecdc4 style S3 fill:#4ecdc4 style S4 fill:#4ecdc4

十二、写在最后

Structured Streaming让流处理变得简单,但生产环境的流处理从来都不简单。

记住几个原则:

  1. 先批后流:先用批处理验证逻辑,再转成流处理
  2. Checkpoint必须有:没有Checkpoint就是裸奔
  3. 监控要完善:实时系统出问题是秒级的事
  4. Watermark要合理:太短丢数据,太长状态爆
  5. 降级方案要有:流处理挂了怎么办?

最后送大家一句话:

批处理是历史,流处理是未来,但能跑的才是王道。

本文作者:一个与Kafka Lag搏斗的程序员

最惨经历:Watermark设错了,状态涨到100GB把集群打挂

下一篇:Spark部署与运维


附录:面试高频题

  1. Structured Streaming和Spark Streaming的区别?

    Structured Streaming基于DataFrame/Dataset API,支持Exactly-Once,支持Event Time;Spark Streaming基于DStream/RDD,仅At-Least-Once。
  2. 什么是Watermark?有什么作用?

    Watermark是处理延迟数据的机制,定义了等待迟到数据的时间阈值,超过阈值的数据会被丢弃,同时触发窗口关闭和状态清理。
  3. 三种输出模式的区别?

    Append:只输出新增行;Complete:输出全部结果;Update:只输出更新的行。聚合查询通常用Complete或Update。
  4. 如何保证Exactly-Once?

    通过Checkpoint记录偏移量,幂等写入或事务写入,原子提交。
  5. 状态太大怎么办?

    使用RocksDB状态后端,设置合理的Watermark自动清理状态,或使用flatMapGroupsWithState手动管理状态。
  6. 流处理和批处理如何统一?

    Structured Streaming的API与Spark SQL统一,可以用相同的代码处理批数据和流数据,实现流批一体。
评论区
暂无评论
avatar