前言:从"等数据"到"追数据"
在批处理的世界里,我们是这样工作的:
凌晨2点:跑批任务
早上8点:老板看报表
老板问:为什么数据是昨天的?
你答:因为是T+1...在实时处理的世界里:
用户下单的那一刻:数据已经在报表上了
老板问:这个数据是实时的吗?
你答:秒级延迟
老板:很好,那你能不能做到毫秒级?
你:...(头发-1)Structured Streaming是Spark推出的流处理解决方案,它让你用写批处理的方式写流处理。
本篇是Spark系列的第四篇,我们将深入Structured Streaming的世界,从入门到实战,从Kafka到实时数仓。
一、流处理基础概念
1.1 批处理 vs 流处理
graph TB
subgraph 批处理["📦 批处理"]
B1[数据积累] --> B2[定时处理]
B2 --> B3[输出结果]
B4["特点:高吞吐、高延迟
延迟:分钟~小时"] end subgraph 流处理["🌊 流处理"] S1[数据到达] --> S2[立即处理] S2 --> S3[输出结果] S4["特点:低延迟、持续处理
延迟:毫秒~秒"] end style B4 fill:#ff6b6b style S4 fill:#4ecdc4
延迟:分钟~小时"] end subgraph 流处理["🌊 流处理"] S1[数据到达] --> S2[立即处理] S2 --> S3[输出结果] S4["特点:低延迟、持续处理
延迟:毫秒~秒"] end style B4 fill:#ff6b6b style S4 fill:#4ecdc4
1.2 Spark Streaming vs Structured Streaming
graph TB
subgraph SparkStreaming["Spark Streaming(旧)"]
SS1[DStream API]
SS2[微批处理]
SS3["基于RDD
API复杂"] SS4["仅At-Least-Once"] end subgraph StructuredStreaming["Structured Streaming(新)"] ST1[DataFrame/Dataset API] ST2[微批处理 + 连续处理] ST3["与批处理API统一
简单易用"] ST4["支持Exactly-Once"] end SparkStreaming --> |"演进"|StructuredStreaming style SparkStreaming fill:#ff6b6b style StructuredStreaming fill:#4ecdc4
API复杂"] SS4["仅At-Least-Once"] end subgraph StructuredStreaming["Structured Streaming(新)"] ST1[DataFrame/Dataset API] ST2[微批处理 + 连续处理] ST3["与批处理API统一
简单易用"] ST4["支持Exactly-Once"] end SparkStreaming --> |"演进"|StructuredStreaming style SparkStreaming fill:#ff6b6b style StructuredStreaming fill:#4ecdc4
为什么选择Structured Streaming?
- API与Spark SQL统一,学习成本低
- 支持Exactly-Once语义
- 支持Event Time处理
- 与Spark生态无缝集成
1.3 核心概念:无界表
Structured Streaming的核心思想:把流数据当作一张不断增长的表
graph TB
subgraph 无界表概念["📊 无界表(Unbounded Table)"]
subgraph 时刻1["时刻T1"]
T1R1[Row 1]
T1R2[Row 2]
T1R3[Row 3]
end
subgraph 时刻2["时刻T2"]
T2R1[Row 1]
T2R2[Row 2]
T2R3[Row 3]
T2R4[Row 4 新增]
T2R5[Row 5 新增]
end
subgraph 时刻3["时刻T3"]
T3R1[Row 1]
T3R2[Row 2]
T3R3[Row 3]
T3R4[Row 4]
T3R5[Row 5]
T3R6[Row 6 新增]
T3R7[Row 7 新增]
end
时刻1 --> 时刻2 --> 时刻3
end
style T2R4 fill:#4ecdc4
style T2R5 fill:#4ecdc4
style T3R6 fill:#4ecdc4
style T3R7 fill:#4ecdc4
二、快速入门
2.1 第一个Streaming程序
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("StructuredStreamingDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 从Socket读取数据(测试用)
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 处理:WordCount
val wordCounts = lines
.as[String]
.flatMap(_.split(" "))
.groupBy("value")
.count()
// 输出到控制台
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()运行测试:
# 终端1:启动nc服务
nc -lk 9999
# 终端2:运行Spark程序
spark-submit --class StreamingDemo app.jar
# 终端1:输入测试数据
hello world
hello spark
spark streaming2.2 程序结构
graph LR
subgraph 流处理程序结构["🔧 Structured Streaming程序结构"]
A[输入源
readStream] --> B[转换操作
select/filter/groupBy] B --> C[输出
writeStream] C --> D[启动
start] D --> E[等待终止
awaitTermination] end style A fill:#4ecdc4 style B fill:#ffe66d style C fill:#ff6b6b
readStream] --> B[转换操作
select/filter/groupBy] B --> C[输出
writeStream] C --> D[启动
start] D --> E[等待终止
awaitTermination] end style A fill:#4ecdc4 style B fill:#ffe66d style C fill:#ff6b6b
// 通用模板
val inputDF = spark.readStream
.format("数据源") // kafka, socket, file, rate
.option(...)
.load()
val resultDF = inputDF
.select(...)
.filter(...)
.groupBy(...)
.agg(...)
val query = resultDF.writeStream
.outputMode("模式") // append, complete, update
.format("输出目标") // console, kafka, parquet, memory
.option(...)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
query.awaitTermination()三、输入源详解
3.1 支持的输入源
graph TB
subgraph 输入源["📥 Structured Streaming输入源"]
subgraph 生产环境["生产环境"]
K[Kafka]
F[File Source
文件目录监控] end subgraph 测试环境["测试环境"] S[Socket] R[Rate
生成测试数据] end end style K fill:#4ecdc4 style F fill:#4ecdc4 style S fill:#ffe66d style R fill:#ffe66d
文件目录监控] end subgraph 测试环境["测试环境"] S[Socket] R[Rate
生成测试数据] end end style K fill:#4ecdc4 style F fill:#4ecdc4 style S fill:#ffe66d style R fill:#ffe66d
3.2 Kafka Source(重点)
// 读取Kafka
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092")
.option("subscribe", "topic1,topic2") // 订阅多个topic
// 或使用正则
// .option("subscribePattern", "topic.*")
.option("startingOffsets", "latest") // earliest, latest, 或具体offset
.option("maxOffsetsPerTrigger", "100000") // 每批次最大消息数
.option("failOnDataLoss", "false") // 数据丢失是否失败
.load()
// Kafka返回的Schema
// key: binary
// value: binary
// topic: string
// partition: int
// offset: long
// timestamp: timestamp
// timestampType: int
// 解析value(假设是JSON)
val parsedDF = kafkaDF
.selectExpr("CAST(value AS STRING) as json_str")
.select(from_json($"json_str", schema).as("data"))
.select("data.*")3.3 File Source
// 监控目录,读取新增文件
val fileDF = spark.readStream
.format("parquet") // 或 json, csv, orc
.schema(mySchema) // 文件源必须指定Schema
.option("maxFilesPerTrigger", "100") // 每批次最大文件数
.option("latestFirst", "false") // 是否先处理最新文件
.load("/data/input/")
// CSV文件
val csvDF = spark.readStream
.format("csv")
.schema(mySchema)
.option("header", "true")
.option("maxFilesPerTrigger", "10")
.load("/data/csv/")3.4 Rate Source(测试用)
// 生成测试数据
val rateDF = spark.readStream
.format("rate")
.option("rowsPerSecond", "1000") // 每秒生成行数
.option("numPartitions", "10") // 分区数
.load()
// Schema:
// timestamp: timestamp
// value: long (递增)
// 示例:模拟订单数据
val ordersDF = rateDF
.withColumn("order_id", $"value")
.withColumn("user_id", ($"value" % 1000).cast("long"))
.withColumn("amount", (rand() * 1000).cast("decimal(10,2)"))
.withColumn("order_time", $"timestamp")四、输出模式与输出目标
4.1 三种输出模式
graph TB
subgraph 输出模式["📤 输出模式(Output Mode)"]
subgraph Append["Append模式"]
A1["只输出新增的行"]
A2["适用于:不涉及聚合的查询"]
A3["或带Watermark的聚合"]
end
subgraph Complete["Complete模式"]
C1["输出全部结果"]
C2["适用于:聚合查询"]
C3["每次输出完整的聚合结果"]
end
subgraph Update["Update模式"]
U1["只输出更新的行"]
U2["适用于:聚合查询"]
U3["只输出有变化的聚合结果"]
end
end
style Append fill:#4ecdc4
style Complete fill:#ffe66d
style Update fill:#ff6b6b
使用场景对比:
| 输出模式 | 适用场景 | 聚合支持 | 输出数据量 |
|---|---|---|---|
| Append | 简单转换、带Watermark的聚合 | 有限 | 最少 |
| Complete | 聚合统计 | 完全 | 最多 |
| Update | 聚合统计、需要增量更新 | 完全 | 中等 |
4.2 输出目标
// 1. Console(调试用)
query.writeStream
.format("console")
.option("truncate", "false")
.option("numRows", "100")
.outputMode("complete")
.start()
// 2. Kafka
query.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("topic", "output_topic")
.option("checkpointLocation", "/checkpoint/kafka")
.outputMode("append")
.start()
// 3. 文件(Parquet/JSON/CSV)
query.writeStream
.format("parquet")
.option("path", "/data/output/")
.option("checkpointLocation", "/checkpoint/parquet")
.partitionBy("dt", "hour")
.outputMode("append")
.start()
// 4. Memory(调试用)
query.writeStream
.format("memory")
.queryName("my_table")
.outputMode("complete")
.start()
// 然后可以用SQL查询
spark.sql("SELECT * FROM my_table").show()
// 5. ForeachBatch(自定义输出)
query.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// 可以做任何操作
batchDF.write.mode("append").saveAsTable("my_hive_table")
batchDF.write.format("jdbc").save()
}
.start()4.3 Kafka输出详解
// 写入Kafka需要key和value列
val outputDF = resultDF
.selectExpr(
"CAST(user_id AS STRING) as key",
"to_json(struct(*)) as value" // 整行转JSON
)
outputDF.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("topic", "output_topic")
.option("checkpointLocation", "/checkpoint/output")
.outputMode("append")
.start()五、触发器(Trigger)
5.1 触发器类型
graph TB
subgraph 触发器类型["⏰ 触发器类型"]
subgraph Default["默认(微批处理)"]
D1["尽可能快地处理"]
D2["前一批完成立即开始下一批"]
end
subgraph ProcessingTime["固定间隔"]
P1["每隔指定时间触发一次"]
P2["Trigger.ProcessingTime"]
end
subgraph Once["一次性"]
O1["只处理一批就停止"]
O2["Trigger.Once"]
end
subgraph Continuous["连续处理(实验性)"]
C1["毫秒级延迟"]
C2["Trigger.Continuous"]
end
end
style ProcessingTime fill:#4ecdc4
style Continuous fill:#ff6b6b
5.2 触发器使用
import org.apache.spark.sql.streaming.Trigger
// 默认:尽可能快
query.writeStream
.trigger(Trigger.ProcessingTime(0)) // 等同于不设置
.start()
// 固定间隔:每10秒一批
query.writeStream
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
// 一次性:只跑一批(用于追数据)
query.writeStream
.trigger(Trigger.Once())
.start()
// 连续处理(实验性,毫秒级延迟)
query.writeStream
.trigger(Trigger.Continuous("1 second")) // checkpoint间隔
.start()
// AvailableNow:处理所有可用数据后停止(Spark 3.3+)
query.writeStream
.trigger(Trigger.AvailableNow())
.start()六、窗口操作
6.1 窗口类型
graph TB
subgraph 窗口类型["🪟 窗口类型"]
subgraph Tumbling["滚动窗口 Tumbling"]
T1["窗口不重叠"]
T2["每条数据属于一个窗口"]
TG["| 0-10 | 10-20 | 20-30 |"]
end
subgraph Sliding["滑动窗口 Sliding"]
S1["窗口可重叠"]
S2["每条数据可能属于多个窗口"]
SG["| 0-10 |
| 5-15 |
| 10-20 |"] end subgraph Session["会话窗口 Session"] SS1["基于活动间隙"] SS2["动态窗口大小"] end end style Tumbling fill:#4ecdc4 style Sliding fill:#ffe66d style Session fill:#ff6b6b
| 5-15 |
| 10-20 |"] end subgraph Session["会话窗口 Session"] SS1["基于活动间隙"] SS2["动态窗口大小"] end end style Tumbling fill:#4ecdc4 style Sliding fill:#ffe66d style Session fill:#ff6b6b
6.2 窗口操作代码
import org.apache.spark.sql.functions._
// 假设数据有event_time字段
val events = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "events")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json($"json", schema).as("data"))
.select(
$"data.user_id",
$"data.event_type",
$"data.event_time".cast("timestamp")
)
// ========== 滚动窗口 ==========
val tumblingWindow = events
.groupBy(
window($"event_time", "10 minutes"), // 10分钟滚动窗口
$"event_type"
)
.count()
// ========== 滑动窗口 ==========
val slidingWindow = events
.groupBy(
window($"event_time", "10 minutes", "5 minutes"), // 窗口10分钟,滑动5分钟
$"event_type"
)
.count()
// 窗口结果示例:
// +--------------------+----------+-----+
// |window |event_type|count|
// +--------------------+----------+-----+
// |{2024-01-01 10:00:00, 2024-01-01 10:10:00}|click|1234|
// |{2024-01-01 10:05:00, 2024-01-01 10:15:00}|click|1567|
// +--------------------+----------+-----+
// 展开窗口字段
tumblingWindow
.select(
$"window.start".as("window_start"),
$"window.end".as("window_end"),
$"event_type",
$"count"
)6.3 窗口图解
gantt
title 滚动窗口 vs 滑动窗口
dateFormat HH:mm
axisFormat %H:%M
section 滚动窗口
窗口1 (00:00-00:10) :a1, 00:00, 10m
窗口2 (00:10-00:20) :a2, 00:10, 10m
窗口3 (00:20-00:30) :a3, 00:20, 10m
section 滑动窗口
窗口1 (00:00-00:10) :b1, 00:00, 10m
窗口2 (00:05-00:15) :b2, 00:05, 10m
窗口3 (00:10-00:20) :b3, 00:10, 10m
窗口4 (00:15-00:25) :b4, 00:15, 10m
七、Watermark与延迟数据
7.1 什么是Watermark?
graph TB
subgraph Watermark概念["🌊 Watermark(水位线)"]
A["问题:数据可能迟到"]
A --> B["Event Time: 10:00"]
B --> C["Processing Time: 10:05"]
C --> D["迟到了5分钟!"]
E["解决:设置Watermark"]
E --> F["允许最多迟到10分钟"]
F --> G["10分钟后关闭窗口"]
G --> H["之后到达的数据丢弃"]
end
style A fill:#ff6b6b
style E fill:#4ecdc4
7.2 Watermark使用
// 设置Watermark
val withWatermark = events
.withWatermark("event_time", "10 minutes") // 允许迟到10分钟
.groupBy(
window($"event_time", "5 minutes"),
$"event_type"
)
.count()
// Watermark工作原理:
// 1. 追踪看到的最大Event Time: max_event_time
// 2. Watermark = max_event_time - 10 minutes
// 3. 当Watermark超过窗口结束时间,窗口关闭
// 4. 关闭后到达的数据被丢弃7.3 Watermark图解
sequenceDiagram
participant Data as 数据
participant Spark as Spark
participant Window as 窗口状态
Note over Spark: Watermark延迟 = 10分钟
Data->>Spark: Event Time = 10:05
Spark->>Spark: max_event_time = 10:05
Spark->>Spark: watermark = 09:55
Spark->>Window: 更新窗口[10:00-10:05]
Data->>Spark: Event Time = 10:15
Spark->>Spark: max_event_time = 10:15
Spark->>Spark: watermark = 10:05
Spark->>Window: 窗口[10:00-10:05]可以关闭
Data->>Spark: Event Time = 10:03 (迟到)
Spark->>Spark: 10:03 < watermark(10:05)
Spark->>Data: 丢弃!窗口已关闭
7.4 Watermark与输出模式
| 输出模式 | Watermark行为 |
|---|---|
| Append | 窗口关闭后才输出(需要Watermark) |
| Update | 每批次输出更新,窗口关闭后停止更新 |
| Complete | 每批次输出全部结果 |
// Append模式 + Watermark(推荐生产使用)
val result = events
.withWatermark("event_time", "10 minutes")
.groupBy(window($"event_time", "5 minutes"))
.count()
result.writeStream
.outputMode("append") // 窗口关闭后输出
.format("parquet")
.start()八、状态管理
8.1 有状态操作
graph TB
subgraph 有状态操作["🗃️ 有状态操作"]
A[groupBy + 聚合]
B[窗口操作]
C[去重 dropDuplicates]
D[流流Join]
E[flatMapGroupsWithState]
F[mapGroupsWithState]
end
subgraph 状态存储["状态存储"]
G["内存 + Checkpoint"]
H["RocksDB(大状态)"]
end
A & B & C & D & E & F --> 状态存储
style E fill:#ff6b6b
style F fill:#ff6b6b
8.2 自定义状态处理
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
// 定义状态和输出类型
case class UserState(lastActivity: Long, totalClicks: Long)
case class UserOutput(userId: String, totalClicks: Long, isTimeout: Boolean)
// 自定义状态更新函数
def updateUserState(
userId: String,
events: Iterator[Event],
state: GroupState[UserState]
): Iterator[UserOutput] = {
// 检查是否超时
if (state.hasTimedOut) {
val oldState = state.get
state.remove()
return Iterator(UserOutput(userId, oldState.totalClicks, isTimeout = true))
}
// 获取或初始化状态
val currentState = state.getOption.getOrElse(UserState(0L, 0L))
// 处理新事件
var newClicks = currentState.totalClicks
var lastActivity = currentState.lastActivity
events.foreach { event =>
newClicks += 1
lastActivity = math.max(lastActivity, event.timestamp)
}
// 更新状态
val newState = UserState(lastActivity, newClicks)
state.update(newState)
// 设置超时(30分钟不活跃)
state.setTimeoutDuration("30 minutes")
// 输出
Iterator(UserOutput(userId, newClicks, isTimeout = false))
}
// 使用
val result = events
.groupByKey(_.userId)
.flatMapGroupsWithState(
OutputMode.Append(),
GroupStateTimeout.ProcessingTimeTimeout
)(updateUserState)8.3 状态清理与Checkpoint
// Checkpoint配置
query.writeStream
.option("checkpointLocation", "/checkpoint/my_app")
.start()
// 状态存储配置(大状态使用RocksDB)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
// 状态清理(通过Watermark自动清理)
val result = events
.withWatermark("event_time", "1 hour") // 1小时后清理状态
.groupBy(window($"event_time", "10 minutes"), $"user_id")
.count()九、Kafka深度集成
9.1 完整的Kafka读写示例
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("KafkaIntegration")
.getOrCreate()
// 定义输入Schema
val inputSchema = StructType(Seq(
StructField("user_id", StringType),
StructField("event_type", StringType),
StructField("event_time", TimestampType),
StructField("properties", MapType(StringType, StringType))
))
// 读取Kafka
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
.option("subscribe", "user_events")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", "100000")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"""org.apache.kafka.common.security.plain.PlainLoginModule required
username="user" password="password";""")
.load()
// 解析JSON
val eventsDF = kafkaDF
.selectExpr("CAST(value AS STRING) as json", "timestamp as kafka_time")
.select(
from_json($"json", inputSchema).as("data"),
$"kafka_time"
)
.select("data.*", "kafka_time")
// 处理逻辑
val resultDF = eventsDF
.withWatermark("event_time", "5 minutes")
.groupBy(
window($"event_time", "1 minute"),
$"event_type"
)
.agg(
count("*").as("event_count"),
countDistinct("user_id").as("unique_users")
)
.select(
$"window.start".as("window_start"),
$"window.end".as("window_end"),
$"event_type",
$"event_count",
$"unique_users"
)
// 写入Kafka
val outputDF = resultDF.select(
$"event_type".as("key"),
to_json(struct($"*")).as("value")
)
val query = outputDF.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka1:9092")
.option("topic", "event_stats")
.option("checkpointLocation", "/checkpoint/event_stats")
.outputMode("update")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
query.awaitTermination()9.2 Kafka配置参数
| 参数 | 说明 | 建议值 |
|---|---|---|
| kafka.bootstrap.servers | Kafka地址 | 生产环境多节点 |
| subscribe | 订阅的topic | - |
| startingOffsets | 起始位置 | earliest/latest |
| maxOffsetsPerTrigger | 每批次最大消息数 | 根据处理能力设置 |
| failOnDataLoss | 数据丢失是否失败 | 生产环境true |
| kafka.group.id | 消费者组 | 设置便于监控 |
9.3 Exactly-Once语义
graph TB
subgraph ExactlyOnce["✅ Exactly-Once语义保证"]
A[幂等写入] --> B["Kafka事务
或目标支持幂等"] C[Checkpoint] --> D["记录偏移量
失败可恢复"] E[原子提交] --> F["偏移量和数据
一起提交"] end style A fill:#4ecdc4 style C fill:#4ecdc4 style E fill:#4ecdc4
或目标支持幂等"] C[Checkpoint] --> D["记录偏移量
失败可恢复"] E[原子提交] --> F["偏移量和数据
一起提交"] end style A fill:#4ecdc4 style C fill:#4ecdc4 style E fill:#4ecdc4
// 启用Kafka事务(Exactly-Once)
query.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("topic", "output")
// 启用幂等性
.option("kafka.enable.idempotence", "true")
// 启用事务
.option("kafka.transactional.id", "my-transactional-id")
.option("checkpointLocation", "/checkpoint/output")
.start()十、实战案例:实时数仓
10.1 架构设计
graph TB
subgraph 实时数仓架构["🏠 实时数仓架构"]
subgraph 数据源["数据源"]
App[App埋点]
Server[服务端日志]
DB[(数据库CDC)]
end
subgraph 消息队列["消息队列"]
Kafka[Kafka]
end
subgraph 实时处理["实时处理"]
SS[Structured Streaming]
end
subgraph 存储层["存储层"]
ODS[ODS
Kafka] DWD[DWD
Kafka] DWS[DWS
Doris/ClickHouse] ADS[ADS
Redis/MySQL] end subgraph 应用层["应用层"] Dashboard[实时大屏] Alert[实时告警] BI[BI报表] end 数据源 --> Kafka Kafka --> SS SS --> ODS --> DWD --> DWS --> ADS ADS --> 应用层 end style SS fill:#ff6b6b style DWS fill:#4ecdc4
Kafka] DWD[DWD
Kafka] DWS[DWS
Doris/ClickHouse] ADS[ADS
Redis/MySQL] end subgraph 应用层["应用层"] Dashboard[实时大屏] Alert[实时告警] BI[BI报表] end 数据源 --> Kafka Kafka --> SS SS --> ODS --> DWD --> DWS --> ADS ADS --> 应用层 end style SS fill:#ff6b6b style DWS fill:#4ecdc4
10.2 实时指标计算
// 需求:实时计算每分钟的PV、UV、订单数、GMV
val events = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "user_events,orders")
.option("startingOffsets", "latest")
.load()
// 解析事件
val parsedEvents = events
.selectExpr("CAST(value AS STRING) as json", "topic")
.select(
$"topic",
from_json($"json", eventSchema).as("data")
)
.select($"topic", $"data.*")
.withColumn("event_time", $"timestamp".cast("timestamp"))
// PV/UV统计
val pvUv = parsedEvents
.filter($"topic" === "user_events")
.withWatermark("event_time", "5 minutes")
.groupBy(window($"event_time", "1 minute"))
.agg(
count("*").as("pv"),
countDistinct("user_id").as("uv")
)
// 订单统计
val orderStats = parsedEvents
.filter($"topic" === "orders")
.filter($"event_type" === "order_created")
.withWatermark("event_time", "5 minutes")
.groupBy(window($"event_time", "1 minute"))
.agg(
count("*").as("order_count"),
sum("amount").as("gmv")
)
// 合并指标
val metrics = pvUv.join(orderStats, Seq("window"), "outer")
.select(
$"window.start".as("stat_time"),
coalesce($"pv", lit(0)).as("pv"),
coalesce($"uv", lit(0)).as("uv"),
coalesce($"order_count", lit(0)).as("order_count"),
coalesce($"gmv", lit(0)).as("gmv")
)
// 写入Doris
metrics.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("doris")
.option("doris.table.identifier", "realtime.metrics_1min")
.option("doris.fenodes", "doris:8030")
.option("user", "root")
.option("password", "")
.mode("append")
.save()
}
.option("checkpointLocation", "/checkpoint/metrics")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()10.3 实时告警
// 需求:订单金额异常告警
val orders = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "orders")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json($"json", orderSchema).as("data"))
.select("data.*")
// 计算滑动窗口平均值
val avgAmount = orders
.withWatermark("order_time", "5 minutes")
.groupBy(window($"order_time", "10 minutes", "1 minute"))
.agg(
avg("amount").as("avg_amount"),
stddev("amount").as("stddev_amount")
)
// 检测异常(金额超过3倍标准差)
val anomalies = orders
.withWatermark("order_time", "5 minutes")
.join(avgAmount,
orders("order_time") >= avgAmount("window.start") &&
orders("order_time") < avgAmount("window.end"))
.filter($"amount" > $"avg_amount" + $"stddev_amount" * 3)
.select(
$"order_id",
$"user_id",
$"amount",
$"avg_amount",
$"order_time"
)
// 发送告警
anomalies.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.collect().foreach { row =>
sendAlert(
s"订单异常告警: 订单${row.getAs[String]("order_id")}, " +
s"金额${row.getAs[Double]("amount")}, " +
s"平均值${row.getAs[Double]("avg_amount")}"
)
}
}
.start()十一、生产环境最佳实践
11.1 Checkpoint配置
// 必须配置Checkpoint(生产环境)
query.writeStream
.option("checkpointLocation", "hdfs:///checkpoint/my_app")
.start()
// Checkpoint包含:
// 1. 偏移量(Kafka offset)
// 2. 状态数据
// 3. 提交日志
// 注意事项:
// 1. Checkpoint目录不要随便删除
// 2. 代码变更可能导致Checkpoint不兼容
// 3. 使用HDFS或S3等可靠存储11.2 资源配置
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.kafka.maxRatePerPartition=10000 \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.sql.streaming.stateStore.rocksdb.enabled=true \
--class com.example.StreamingApp \
app.jar11.3 监控指标
graph TB
subgraph 监控指标["📊 关键监控指标"]
A[处理延迟
inputRowsPerSecond vs processedRowsPerSecond] B[批处理时间
batchDuration] C[状态大小
numRowsTotal] D[Kafka Lag
消费延迟] E[Checkpoint时间
checkpointDuration] end style A fill:#ff6b6b style D fill:#ff6b6b
inputRowsPerSecond vs processedRowsPerSecond] B[批处理时间
batchDuration] C[状态大小
numRowsTotal] D[Kafka Lag
消费延迟] E[Checkpoint时间
checkpointDuration] end style A fill:#ff6b6b style D fill:#ff6b6b
// 获取流查询状态
val query = df.writeStream.start()
// 查看进度
query.lastProgress
query.recentProgress
// 关键指标
// inputRowsPerSecond: 输入速率
// processedRowsPerSecond: 处理速率
// batchDuration: 批处理时间
// numInputRows: 输入行数
// 设置监控回调
spark.streams.addListener(new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
println(s"Query started: ${event.id}")
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
val progress = event.progress
println(s"Batch ${progress.batchId}: " +
s"input=${progress.numInputRows}, " +
s"rate=${progress.processedRowsPerSecond}")
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
println(s"Query terminated: ${event.id}")
}
})11.4 常见问题处理
flowchart TB
subgraph 问题处理["🔧 常见问题处理"]
P1[处理速度跟不上] --> S1["1. 增加Executor
2. 优化代码
3. 增加分区"] P2[状态太大OOM] --> S2["1. 使用RocksDB
2. 设置合理Watermark
3. 减少状态保留时间"] P3[Kafka Lag增长] --> S3["1. 增加maxOffsetsPerTrigger
2. 增加并行度
3. 优化处理逻辑"] P4[Checkpoint失败] --> S4["1. 增加超时时间
2. 检查存储性能
3. 减少状态大小"] end style S1 fill:#4ecdc4 style S2 fill:#4ecdc4 style S3 fill:#4ecdc4 style S4 fill:#4ecdc4
2. 优化代码
3. 增加分区"] P2[状态太大OOM] --> S2["1. 使用RocksDB
2. 设置合理Watermark
3. 减少状态保留时间"] P3[Kafka Lag增长] --> S3["1. 增加maxOffsetsPerTrigger
2. 增加并行度
3. 优化处理逻辑"] P4[Checkpoint失败] --> S4["1. 增加超时时间
2. 检查存储性能
3. 减少状态大小"] end style S1 fill:#4ecdc4 style S2 fill:#4ecdc4 style S3 fill:#4ecdc4 style S4 fill:#4ecdc4
十二、写在最后
Structured Streaming让流处理变得简单,但生产环境的流处理从来都不简单。
记住几个原则:
- 先批后流:先用批处理验证逻辑,再转成流处理
- Checkpoint必须有:没有Checkpoint就是裸奔
- 监控要完善:实时系统出问题是秒级的事
- Watermark要合理:太短丢数据,太长状态爆
- 降级方案要有:流处理挂了怎么办?
最后送大家一句话:
批处理是历史,流处理是未来,但能跑的才是王道。
本文作者:一个与Kafka Lag搏斗的程序员
最惨经历:Watermark设错了,状态涨到100GB把集群打挂
下一篇:Spark部署与运维
附录:面试高频题
Structured Streaming和Spark Streaming的区别?
Structured Streaming基于DataFrame/Dataset API,支持Exactly-Once,支持Event Time;Spark Streaming基于DStream/RDD,仅At-Least-Once。
什么是Watermark?有什么作用?
Watermark是处理延迟数据的机制,定义了等待迟到数据的时间阈值,超过阈值的数据会被丢弃,同时触发窗口关闭和状态清理。
三种输出模式的区别?
Append:只输出新增行;Complete:输出全部结果;Update:只输出更新的行。聚合查询通常用Complete或Update。
如何保证Exactly-Once?
通过Checkpoint记录偏移量,幂等写入或事务写入,原子提交。
状态太大怎么办?
使用RocksDB状态后端,设置合理的Watermark自动清理状态,或使用flatMapGroupsWithState手动管理状态。
流处理和批处理如何统一?
Structured Streaming的API与Spark SQL统一,可以用相同的代码处理批数据和流数据,实现流批一体。