一、前言:为什么用 SQL?
1.1 DataStream vs SQL
graph LR
subgraph "DataStream API"
A1["编写 Java/Scala 代码"]
A2["类型安全"]
A3["灵活但复杂"]
A4["需要编译打包"]
end
subgraph "Flink SQL"
B1["编写 SQL"]
B2["声明式"]
B3["简单易学"]
B4["即写即用"]
end
同样功能的代码对比:
// DataStream API - 约 30 行代码
DataStream<Order> orders = env.addSource(kafkaSource);
orders
.assignTimestampsAndWatermarks(...)
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new OrderAggregator(), new OrderWindowFunction())
.addSink(jdbcSink);
-- Flink SQL - 约 10 行
INSERT INTO order_stats
SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' HOUR);
1.2 Flink SQL 能力
graph TB
subgraph "Flink SQL 能力"
A[流处理] --> A1["持续查询"]
A --> A2["窗口聚合"]
A --> A3["Join"]
B[批处理] --> B1["批量查询"]
B --> B2["离线分析"]
C[连接器] --> C1["Kafka/MySQL/ES"]
C --> C2["Hive/Iceberg/Hudi"]
C --> C3["文件系统"]
end
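补充一个示意(假设已定义 orders 表):Flink SQL 流批一体,同一条聚合 SQL 只需切换运行模式即可按流或批执行,批模式要求数据源是有界的。
-- 运行模式切换(示意)
SET 'execution.runtime-mode' = 'batch';   -- 或 'streaming'
SELECT user_id, COUNT(*) AS order_count
FROM orders
GROUP BY user_id;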
二、快速上手
2.1 启动 SQL Client
# 启动 Flink 集群
./bin/start-cluster.sh
# 启动 SQL Client
./bin/sql-client.sh
# 或者显式指定嵌入式模式(embedded 指网关内嵌在客户端进程中,提交作业仍需可用的 Flink 集群)
./bin/sql-client.sh embedded
2.2 第一个查询
-- 创建数据源表
CREATE TABLE datagen_source (
id INT,
name STRING,
ts TIMESTAMP(3),
-- 声明事件时间属性,否则下面的 TUMBLE 窗口无法基于 ts 使用
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- 查询
SELECT * FROM datagen_source LIMIT 10;
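可选设置(示意):在 SQL Client 中把结果展示模式切换为 tableau,终端查看更直观。
SET 'sql-client.execution.result-mode' = 'tableau';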
-- 窗口聚合
SELECT
TUMBLE_START(ts, INTERVAL '10' SECOND) as window_start,
COUNT(*) as cnt
FROM datagen_source
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND);
2.3 Table API 入门
// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 方式1:执行 SQL
tableEnv.executeSql("CREATE TABLE ...");
tableEnv.executeSql("INSERT INTO ... SELECT ...");
// 方式2:Table API
Table orders = tableEnv.from("orders");
Table result = orders
.filter($("amount").isGreater(100))
.groupBy($("user_id"))
.select($("user_id"), $("amount").sum().as("total"));
// Table 转 DataStream
DataStream<Row> stream = tableEnv.toDataStream(result);
// DataStream 转 Table
DataStream<Order> orderStream = ...;
Table orderTable = tableEnv.fromDataStream(orderStream);
三、动态表与连续查询
3.1 动态表概念
graph LR
subgraph "传统数据库"
A1[静态表] --> A2[查询]
A2 --> A3[结果快照]
end
subgraph "Flink SQL"
B1["动态表 持续变化"] --> B2[连续查询]
B2 --> B3["动态结果 持续更新"]
end
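一个最小的连续查询示意(假设已定义 clicks 流表):查询结果不是一次性的快照,而是随新事件到达持续更新的动态表。
SELECT user_id, COUNT(*) AS click_cnt
FROM clicks
GROUP BY user_id;
-- 每到达一条新的点击事件,对应 user_id 那一行的 click_cnt 就会更新一次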
3.2 流转表、表转流
graph LR
STREAM[Stream] -->|"fromDataStream()"| TABLE[Dynamic Table]
TABLE -->|"toDataStream()"| STREAM2[Stream]
TABLE -->|"连续查询"| TABLE2[Result Table]
// 流转表
DataStream<Order> orderStream = ...;
Table orderTable = tableEnv.fromDataStream(orderStream,
Schema.newBuilder()
.column("order_id", DataTypes.BIGINT())
.column("user_id", DataTypes.BIGINT())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("order_time", DataTypes.TIMESTAMP(3))
.watermark("order_time", "order_time - INTERVAL '5' SECOND")
.build()
);
// 表转流(三种模式)
// 1. 只有 INSERT
DataStream<Row> insertStream = tableEnv.toDataStream(resultTable);
// 2. 有 UPDATE/DELETE (Changelog)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(resultTable);
3.3 Changelog 机制
graph TB
subgraph "Append-Only 查询"
A1["SELECT * FROM source"] --> A2["只产生 INSERT"]
end
subgraph "更新查询"
B1["SELECT COUNT(*) FROM source
GROUP BY key"] --> B2["产生 INSERT/UPDATE"] end subgraph "Changelog" C1["+I: INSERT"] C2["-U: UPDATE_BEFORE"] C3["+U: UPDATE_AFTER"] C4["-D: DELETE"] end
GROUP BY key"] --> B2["产生 INSERT/UPDATE"] end subgraph "Changelog" C1["+I: INSERT"] C2["-U: UPDATE_BEFORE"] C3["+U: UPDATE_AFTER"] C4["-D: DELETE"] end
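在 SQL Client 中可以直观地观察 changelog(示意,假设已定义 orders 表):
SET 'sql-client.execution.result-mode' = 'changelog';
SELECT user_id, COUNT(*) AS cnt FROM orders GROUP BY user_id;
-- 输出形如 +I[1, 1]、-U[1, 1]、+U[1, 2],对应上图中的 INSERT / UPDATE_BEFORE / UPDATE_AFTER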
四、Connector 详解
4.1 常用 Connector
graph TB
subgraph "Source & Sink"
A[消息队列] --> A1[Kafka]
A --> A2[Pulsar]
B[数据库] --> B1[MySQL/PostgreSQL]
B --> B2[MongoDB]
C[数据湖] --> C1[Hive]
C --> C2[Iceberg]
C --> C3[Hudi]
D[其他] --> D1[Elasticsearch]
D --> D2[FileSystem]
D --> D3[JDBC]
end
4.2 Kafka Connector
-- Kafka Source
CREATE TABLE kafka_source (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
-- 定义 Watermark
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
-- Kafka 元数据
`partition` INT METADATA FROM 'partition' VIRTUAL,
`offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-consumer',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
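读取示意:元数据列声明后可以像普通字段一样查询,便于排查消息来源。
SELECT order_id, amount, `partition`, `offset`
FROM kafka_source;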
-- Kafka Sink
CREATE TABLE kafka_sink (
user_id BIGINT,
window_start TIMESTAMP(3),
order_count BIGINT,
total_amount DECIMAL(10, 2),
PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'order-stats',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'sink.partitioner' = 'round-robin'
);
-- Upsert Kafka(支持更新)
CREATE TABLE kafka_upsert_sink (
user_id BIGINT,
total_orders BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user-stats',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
4.3 JDBC Connector
-- MySQL Source(jdbc 连接器为批量读取;如需实时捕获变更,请使用 Flink CDC 的 mysql-cdc 连接器)
CREATE TABLE mysql_source (
id BIGINT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'users',
'username' = 'root',
'password' = 'password'
);
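说明:jdbc Source 是一次有界扫描,读取完成后作业即结束。下面是一个简单的过滤查询示意。
SELECT id, name, age
FROM mysql_source
WHERE age >= 18;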
-- MySQL Sink
CREATE TABLE mysql_sink (
user_id BIGINT,
order_count BIGINT,
total_amount DECIMAL(10, 2),
update_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'user_stats',
'username' = 'root',
'password' = 'password',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '1s'
);
4.4 FileSystem Connector
-- 文件 Source
CREATE TABLE file_source (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///data/orders',
'format' = 'parquet'
);
-- 文件 Sink(分区写入)
CREATE TABLE file_sink (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
dt STRING
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///data/output',
'format' = 'parquet',
'sink.partition-commit.trigger' = 'process-time',
'sink.partition-commit.delay' = '1 h',
'sink.partition-commit.policy.kind' = 'success-file'
);
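写入示意:向上面的 file_sink 写入时,分区列 dt 可以直接在 SELECT 中从事件时间派生,Flink 会按 dt 的值动态写入对应分区。
INSERT INTO file_sink
SELECT
order_id,
user_id,
amount,
order_time,
DATE_FORMAT(order_time, 'yyyy-MM-dd') AS dt
FROM file_source;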
五、时间与窗口
5.1 时间属性定义
-- 事件时间(推荐)
CREATE TABLE orders (
order_id BIGINT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
-- 定义事件时间和 Watermark
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
-- 处理时间
CREATE TABLE orders (
order_id BIGINT,
amount DECIMAL(10, 2),
-- 处理时间由系统生成
proc_time AS PROCTIME()
) WITH (...);
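处理时间窗口示意:基于上面声明的 proc_time 做 1 分钟滚动统计,无需 Watermark,但结果与数据到达顺序相关、不可重算。
SELECT
TUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS cnt
FROM orders
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE);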
-- 从 Kafka 时间戳提取
CREATE TABLE kafka_source (
order_id BIGINT,
amount DECIMAL(10, 2),
ts TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
);
5.2 窗口函数
-- ============ 滚动窗口 TUMBLE ============
SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' HOUR);
-- ============ 滑动窗口 HOP ============
SELECT
user_id,
HOP_START(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_start,
HOP_END(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_end,
COUNT(*) as order_count
FROM orders
GROUP BY user_id, HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR);
-- ============ 会话窗口 SESSION ============
SELECT
user_id,
SESSION_START(order_time, INTERVAL '30' MINUTE) as session_start,
SESSION_END(order_time, INTERVAL '30' MINUTE) as session_end,
COUNT(*) as order_count
FROM orders
GROUP BY user_id, SESSION(order_time, INTERVAL '30' MINUTE);
-- ============ 累积窗口 CUMULATE (Flink 1.13+) ============
SELECT
user_id,
window_start,
window_end,
SUM(amount) as cumulative_amount
FROM TABLE(
CUMULATE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR, INTERVAL '1' DAY)
)
GROUP BY user_id, window_start, window_end;
5.3 Window TVF (Table-Valued Function)
-- Flink 1.13+ 推荐使用 Window TVF
-- 滚动窗口
SELECT
user_id,
window_start,
window_end,
COUNT(*) as cnt
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end;
-- 滑动窗口
SELECT
user_id,
window_start,
window_end,
COUNT(*) as cnt
FROM TABLE(
HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end;
-- 窗口 TopN
SELECT *
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_amount DESC) as rn
FROM (
SELECT
user_id,
window_start,
window_end,
SUM(amount) as total_amount
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end
)
)
WHERE rn <= 10;
六、Join 详解
6.1 Join 类型总览
graph TB
subgraph "Flink SQL Join 类型"
A[Regular Join] --> A1["双流 Join,需要大状态"]
B[Interval Join] --> B1["时间范围 Join,状态可控"]
C[Temporal Join] --> C1["时态表 Join,维表关联"]
D[Lookup Join] --> D1["外部表查询,维表查询"]
end
6.2 Regular Join
-- Inner Join(双流 Join,状态会无限增长!)
SELECT
o.order_id,
o.amount,
u.name
FROM orders o
JOIN users u ON o.user_id = u.user_id;
-- Left Join
SELECT
o.order_id,
o.amount,
u.name
FROM orders o
LEFT JOIN users u ON o.user_id = u.user_id;
-- ⚠️ 注意:Regular Join 的状态会无限增长
-- 需要设置状态 TTL
SET 'table.exec.state.ttl' = '24 h';
6.3 Interval Join
-- Interval Join:只关联时间范围内的数据
-- 订单和支付在 15 分钟内匹配
SELECT
o.order_id,
o.amount,
p.pay_time
FROM orders o
JOIN payments p
ON o.order_id = p.order_id
AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE;
graph LR
subgraph "Interval Join 原理"
O["订单: 10:00"] --> JOIN["匹配支付
10:00 ~ 10:15"] P1["支付: 10:05 ✓"] P2["支付: 10:20 ✗"] end
10:00 ~ 10:15"] P1["支付: 10:05 ✓"] P2["支付: 10:20 ✗"] end
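一个常见变体(示意):把 Interval Join 换成 LEFT JOIN,即可找出 15 分钟内未支付的订单,右侧为 NULL 的行会在水位线越过时间上界后输出。
SELECT o.order_id, o.amount
FROM orders o
LEFT JOIN payments p
ON o.order_id = p.order_id
AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE
WHERE p.order_id IS NULL;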
6.4 Temporal Join (时态表 Join)
-- 创建版本表
CREATE TABLE currency_rates (
currency STRING,
rate DECIMAL(10, 4),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (...);
-- 时态表 Join:获取订单时刻的汇率
SELECT
o.order_id,
o.amount,
o.currency,
r.rate,
o.amount * r.rate as amount_usd
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
6.5 Lookup Join
-- 创建维表(MySQL)
CREATE TABLE dim_user (
user_id BIGINT,
name STRING,
age INT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'users',
'username' = 'root',
'password' = 'password',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '1 hour'
);
-- Lookup Join:基于处理时间实时查询维表(要求左表声明处理时间属性,如 proc_time AS PROCTIME())
SELECT
o.order_id,
o.amount,
u.name,
u.age
FROM orders o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;
七、高级特性
7.1 Top-N
-- 实时 Top 10 热销商品
SELECT *
FROM (
SELECT
product_id,
sale_count,
ROW_NUMBER() OVER (ORDER BY sale_count DESC) as rn
FROM product_sales
)
WHERE rn <= 10;
-- 窗口内 Top N
SELECT *
FROM (
SELECT
product_id,
window_start,
window_end,
sale_count,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY sale_count DESC) as rn
FROM (
SELECT
product_id,
window_start,
window_end,
SUM(quantity) as sale_count
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY product_id, window_start, window_end
)
)
WHERE rn <= 10;
7.2 Deduplication (去重)
-- 按 order_id 去重,保留第一条
SELECT *
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_time ASC) as rn
FROM orders
)
WHERE rn = 1;
-- 按 order_id 去重,保留最后一条
SELECT *
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_time DESC) as rn
FROM orders
)
WHERE rn = 1;
7.3 CEP (复杂事件处理)
-- MATCH_RECOGNIZE:检测连续三次登录失败
SELECT *
FROM login_events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY event_time
MEASURES
FIRST(A.event_time) AS start_time,
LAST(A.event_time) AS end_time,
COUNT(A.event_id) AS fail_count
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A{3})  -- 恰好连续 3 次;Flink 不支持以贪婪量词(如 {3,})作为模式结尾
DEFINE
A AS A.status = 'FAIL'
) AS T;
7.4 自定义函数 (UDF)
// Scalar Function(标量函数)
public class UpperCase extends ScalarFunction {
public String eval(String s) {
return s == null ? null : s.toUpperCase();
}
}
// Table Function(表函数)
public class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split(",")) {
collect(Row.of(s));
}
}
}
// Aggregate Function(聚合函数)
// 累加器类型:保存加权和与权重总和
public class WeightedAvgAccum {
public long sum = 0;
public int count = 0;
}
public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccum> {
@Override
public WeightedAvgAccum createAccumulator() {
return new WeightedAvgAccum();
}
public void accumulate(WeightedAvgAccum acc, Long value, Integer weight) {
acc.sum += value * weight;
acc.count += weight;
}
@Override
public Double getValue(WeightedAvgAccum acc) {
return acc.count == 0 ? null : (double) acc.sum / acc.count;
}
}
-- 注册 UDF
CREATE FUNCTION upper_case AS 'com.example.UpperCase';
CREATE FUNCTION split AS 'com.example.SplitFunction';
CREATE FUNCTION weighted_avg AS 'com.example.WeightedAvg';
-- 使用 UDF
SELECT upper_case(name) FROM users;
SELECT user_id, tag
FROM orders, LATERAL TABLE(split(tags)) AS T(tag);
SELECT user_id, weighted_avg(score, weight) as avg_score
FROM scores
GROUP BY user_id;
八、实战案例
8.1 案例:实时 GMV 统计
-- 1. 创建订单源表
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
product_id BIGINT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- 2. 创建结果表
CREATE TABLE gmv_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
total_orders BIGINT,
total_gmv DECIMAL(20, 2),
PRIMARY KEY (window_start, window_end) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/analytics',
'table-name' = 'gmv_stats',
'username' = 'root',
'password' = 'password'
);
-- 3. 实时计算
INSERT INTO gmv_stats
SELECT
window_start,
window_end,
COUNT(*) as total_orders,
SUM(amount) as total_gmv
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
8.2 案例:用户行为分析
-- 漏斗分析:浏览 -> 加购 -> 下单 -> 支付 转化率
WITH funnel AS (
SELECT
user_id,
CASE
WHEN action = 'view' THEN 1
WHEN action = 'cart' THEN 2
WHEN action = 'order' THEN 3
WHEN action = 'pay' THEN 4
END as step,
event_time
FROM user_actions
WHERE DATE_FORMAT(event_time, 'yyyy-MM-dd') = '2024-01-15'
),
user_max_step AS (
SELECT user_id, MAX(step) as max_step
FROM funnel
GROUP BY user_id
)
SELECT
COUNT(CASE WHEN max_step >= 1 THEN 1 END) as view_users,
COUNT(CASE WHEN max_step >= 2 THEN 1 END) as cart_users,
COUNT(CASE WHEN max_step >= 3 THEN 1 END) as order_users,
COUNT(CASE WHEN max_step >= 4 THEN 1 END) as pay_users
FROM user_max_step;
8.3 案例:实时告警
-- 创建告警输出表
CREATE TABLE alerts (
alert_type STRING,
alert_time TIMESTAMP(3),
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'alerts',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 大额订单告警
INSERT INTO alerts
SELECT
'LARGE_ORDER' as alert_type,
order_time as alert_time,
CONCAT('大额订单告警: order_id=', CAST(order_id AS STRING), ', amount=', CAST(amount AS STRING)) as message
FROM orders
WHERE amount > 10000;
-- 异常流量告警:1分钟内订单数超过阈值
INSERT INTO alerts
SELECT
'HIGH_TRAFFIC' as alert_type,
window_end as alert_time,
CONCAT('流量异常: 1分钟订单数=', CAST(order_count AS STRING)) as message
FROM (
SELECT
window_end,
COUNT(*) as order_count
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end
)
WHERE order_count > 10000;
九、性能调优
9.1 配置优化
-- MiniBatch 优化(减少状态访问)
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
-- 状态 TTL
SET 'table.exec.state.ttl' = '24 h';
-- 并行度
SET 'table.exec.resource.default-parallelism' = '4';
-- 两阶段聚合(Local-Global,需配合开启 MiniBatch 才会生效)
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
-- 去重优化
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';
9.2 SQL Hints
-- 指定 Lookup Join 缓存
SELECT /*+ LOOKUP('table'='dim_user', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay') */
o.*, u.name
FROM orders o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;
-- 指定 State TTL
SELECT /*+ STATE_TTL('o' = '1d', 'u' = '12h') */
o.*, u.*
FROM orders o
JOIN users u ON o.user_id = u.user_id;
9.3 常见问题
flowchart TD
A[SQL 性能问题] --> B{问题类型}
B -->|状态过大| C["设置 State TTL,使用 RocksDB"]
B -->|数据倾斜| D["使用两阶段聚合,打散热点 Key"]
B -->|延迟高| E["检查背压,增加并行度"]
B -->|Lookup 慢| F["开启缓存,异步查询"]
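针对数据倾斜,除了 9.1 中的两阶段聚合配置,也可以手工打散热点 Key(示意,假设 page_views 表含 page_id、user_id):先按 (page_id, 分桶) 局部聚合,再按 page_id 汇总。
SELECT page_id, SUM(cnt_part) AS pv
FROM (
SELECT page_id, MOD(user_id, 16) AS bucket, COUNT(*) AS cnt_part
FROM page_views
GROUP BY page_id, MOD(user_id, 16)
) t
GROUP BY page_id;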
十、总结
10.1 核心要点
mindmap
root((Flink SQL))
基础
动态表
连续查询
Changelog
Connector
Kafka
JDBC
FileSystem
时间窗口
TUMBLE
HOP
SESSION
Join
Regular Join
Interval Join
Lookup Join
高级特性
Top-N
去重
CEP
UDF
10.2 一句话总结
Flink SQL = 用 SQL 的方式做流处理
声明式、简单易学、功能强大,是 Flink 开发的首选方式。