
Flink from Getting Started to Giving Up ④: Stream Processing with SQL


1. Introduction: Why SQL?

1.1 DataStream vs SQL

graph LR
    subgraph "DataStream API"
        A1["Write Java/Scala code"]
        A2["Type safe"]
        A3["Flexible but complex"]
        A4["Needs compiling and packaging"]
    end
    subgraph "Flink SQL"
        B1["Write SQL"]
        B2["Declarative"]
        B3["Easy to learn"]
        B4["Write it, run it"]
    end

The same functionality, side by side:

// DataStream API - roughly 30 lines of code
DataStream<Order> orders = env.addSource(kafkaSource);
orders
    .assignTimestampsAndWatermarks(...)
    .keyBy(Order::getUserId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new OrderAggregator(), new OrderWindowFunction())
    .addSink(jdbcSink);

-- Flink SQL - roughly 10 lines
INSERT INTO order_stats
SELECT 
    user_id,
    TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
    COUNT(*) as order_count,
    SUM(amount) as total_amount
FROM orders
GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' HOUR);

1.2 Flink SQL Capabilities

graph TB
    subgraph "Flink SQL capabilities"
        A[Stream processing] --> A1["Continuous queries"]
        A --> A2["Window aggregation"]
        A --> A3["Joins"]
        B[Batch processing] --> B1["Batch queries"]
        B --> B2["Offline analysis"]
        C[Connectors] --> C1["Kafka/MySQL/ES"]
        C --> C2["Hive/Iceberg/Hudi"]
        C --> C3["File systems"]
    end
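One consequence of this unified design: the same query can run as a continuous streaming job or as a bounded batch job. A minimal sketch, assuming it is typed into the SQL Client (the orders table is illustrative, not defined yet at this point):

-- Run as a bounded batch job over finite input
SET 'execution.runtime-mode' = 'batch';
SELECT user_id, COUNT(*) AS order_count FROM orders GROUP BY user_id;

-- Switch back to a continuous streaming query
SET 'execution.runtime-mode' = 'streaming';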

2. Quick Start

2.1 Starting the SQL Client

# Start the Flink cluster
./bin/start-cluster.sh

# Start the SQL Client
./bin/sql-client.sh

# Or start embedded mode explicitly (this is the default; queries still run on the cluster)
./bin/sql-client.sh embedded
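Besides the interactive shell, the SQL Client can also execute script files, which is handy for repeatable jobs. A sketch; the file names are placeholders:

# Run DDL from an initialization script, then execute a SQL file
./bin/sql-client.sh -i init.sql -f job.sql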

2.2 Your First Query

-- Create a source table
CREATE TABLE datagen_source (
    id INT,
    name STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10'
);

-- Query it
SELECT * FROM datagen_source LIMIT 10;

-- Windowed aggregation
SELECT 
    TUMBLE_START(ts, INTERVAL '10' SECOND) as window_start,
    COUNT(*) as cnt
FROM datagen_source
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND);

2.3 Getting Started with the Table API

// Create the environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Option 1: execute SQL
tableEnv.executeSql("CREATE TABLE ...");
tableEnv.executeSql("INSERT INTO ... SELECT ...");

// Option 2: Table API
Table orders = tableEnv.from("orders");
Table result = orders
    .filter($("amount").isGreater(100))
    .groupBy($("user_id"))
    .select($("user_id"), $("amount").sum().as("total"));

// Table to DataStream
DataStream<Row> stream = tableEnv.toDataStream(result);

// DataStream to Table
DataStream<Order> orderStream = ...;
Table orderTable = tableEnv.fromDataStream(orderStream);
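The $("...") column expressions above come from a static import that the snippet omits; the imports needed to compile it are roughly:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// the $() expression DSL lives here
import static org.apache.flink.table.api.Expressions.$;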

3. Dynamic Tables and Continuous Queries

3.1 The Dynamic Table Concept

graph LR
    subgraph "Traditional database"
        A1[Static table] --> A2[Query]
        A2 --> A3[Result snapshot]
    end
    subgraph "Flink SQL"
        B1["Dynamic table<br/>changes continuously"] --> B2[Continuous query]
        B2 --> B3["Dynamic result<br/>updates continuously"]
    end
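A quick way to feel the difference: run a grouped aggregation in the SQL Client against the datagen_source table from section 2.2 and watch existing result rows change in place as new data arrives. A minimal sketch:

-- A continuous query over a dynamic table: each incoming row may update
-- an existing result row instead of appending a new one
SELECT name, COUNT(*) AS cnt
FROM datagen_source
GROUP BY name;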

3.2 Stream to Table, Table to Stream

graph LR
    STREAM[Stream] -->|"fromDataStream()"| TABLE[Dynamic Table]
    TABLE -->|"toDataStream()"| STREAM2[Stream]
    TABLE -->|"Continuous query"| TABLE2[Result Table]

// Stream to Table
DataStream<Order> orderStream = ...;
Table orderTable = tableEnv.fromDataStream(orderStream, 
    Schema.newBuilder()
        .column("order_id", DataTypes.BIGINT())
        .column("user_id", DataTypes.BIGINT())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("order_time", DataTypes.TIMESTAMP(3))
        .watermark("order_time", "order_time - INTERVAL '5' SECOND")
        .build()
);

// Table to stream (two modes)
// 1. INSERT-only results
DataStream<Row> insertStream = tableEnv.toDataStream(resultTable);

// 2. Results with UPDATE/DELETE (changelog)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(resultTable);

3.3 The Changelog Mechanism

graph TB
    subgraph "Append-only query"
        A1["SELECT * FROM source"] --> A2["Produces only INSERT"]
    end
    subgraph "Updating query"
        B1["SELECT COUNT(*) FROM source<br/>GROUP BY key"] --> B2["Produces INSERT/UPDATE"]
    end
    subgraph "Changelog"
        C1["+I: INSERT"]
        C2["-U: UPDATE_BEFORE"]
        C3["+U: UPDATE_AFTER"]
        C4["-D: DELETE"]
    end

4. Connectors in Depth

4.1 Common Connectors

graph TB
    subgraph "Source & Sink"
        A[Message queues] --> A1[Kafka]
        A --> A2[Pulsar]
        B[Databases] --> B1[MySQL/PostgreSQL]
        B --> B2[MongoDB]
        C[Data lakes] --> C1[Hive]
        C --> C2[Iceberg]
        C --> C3[Hudi]
        D[Others] --> D1[Elasticsearch]
        D --> D2[FileSystem]
        D --> D3[JDBC]
    end
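Two built-in connectors are also worth knowing for local debugging (the schemas below are illustrative): print writes every row to the TaskManager's stdout, and blackhole discards everything, which is useful for throughput tests.

CREATE TABLE debug_sink (
    user_id BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'print'
);

CREATE TABLE discard_sink (
    user_id BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'blackhole'
);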

4.2 Kafka Connector

-- Kafka Source
CREATE TABLE kafka_source (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    -- Define the watermark
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
    -- Kafka metadata columns
    `partition` INT METADATA FROM 'partition' VIRTUAL,
    `offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- Kafka Sink
CREATE TABLE kafka_sink (
    user_id BIGINT,
    window_start TIMESTAMP(3),
    order_count BIGINT,
    total_amount DECIMAL(10, 2)
    -- note: the plain 'kafka' connector is append-only and does not accept a PRIMARY KEY;
    -- for keyed upserts use upsert-kafka (below)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.partitioner' = 'round-robin'
);

-- Upsert Kafka (supports updates)
CREATE TABLE kafka_upsert_sink (
    user_id BIGINT,
    total_orders BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

4.3 JDBC Connector

-- MySQL source (plain JDBC scan source; for change data capture use the mysql-cdc connector instead)
CREATE TABLE mysql_source (
    id BIGINT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'users',
    'username' = 'root',
    'password' = 'password'
);

-- MySQL Sink
CREATE TABLE mysql_sink (
    user_id BIGINT,
    order_count BIGINT,
    total_amount DECIMAL(10, 2),
    update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'user_stats',
    'username' = 'root',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
);

4.4 FileSystem Connector

-- File source
CREATE TABLE file_source (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///data/orders',
    'format' = 'parquet'
);

-- File sink (partitioned writes)
CREATE TABLE file_sink (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs:///data/output',
    'format' = 'parquet',
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file'
);

5. Time and Windows

5.1 Defining Time Attributes

-- Event time (recommended)
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    -- Define the event-time attribute and watermark
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- Processing time
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    -- Processing time is generated by the system
    proc_time AS PROCTIME()
) WITH (...);

-- Extract the event time from the Kafka record timestamp
CREATE TABLE kafka_source (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    ts TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    ...
);

5.2 Window Functions

-- ============ Tumbling window: TUMBLE ============
SELECT 
    user_id,
    TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
    COUNT(*) as order_count,
    SUM(amount) as total_amount
FROM orders
GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' HOUR);

-- ============ Sliding window: HOP ============
SELECT 
    user_id,
    HOP_START(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_start,
    HOP_END(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_end,
    COUNT(*) as order_count
FROM orders
GROUP BY user_id, HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR);

-- ============ Session window: SESSION ============
SELECT 
    user_id,
    SESSION_START(order_time, INTERVAL '30' MINUTE) as session_start,
    SESSION_END(order_time, INTERVAL '30' MINUTE) as session_end,
    COUNT(*) as order_count
FROM orders
GROUP BY user_id, SESSION(order_time, INTERVAL '30' MINUTE);

-- ============ Cumulative window: CUMULATE (Flink 1.13+) ============
SELECT 
    user_id,
    window_start,
    window_end,
    SUM(amount) as cumulative_amount
FROM TABLE(
    CUMULATE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR, INTERVAL '1' DAY)
)
GROUP BY user_id, window_start, window_end;

5.3 Window TVF (Table-Valued Function)

-- Window TVFs are the recommended windowing syntax since Flink 1.13
-- Tumbling window
SELECT 
    user_id,
    window_start,
    window_end,
    COUNT(*) as cnt
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end;

-- Sliding window
SELECT 
    user_id,
    window_start,
    window_end,
    COUNT(*) as cnt
FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end;

-- Window Top-N
SELECT *
FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_amount DESC) as rn
    FROM (
        SELECT 
            user_id,
            window_start,
            window_end,
            SUM(amount) as total_amount
        FROM TABLE(
            TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
        )
        GROUP BY user_id, window_start, window_end
    )
)
WHERE rn <= 10;

6. Joins in Depth

6.1 Join Types at a Glance

graph TB
    subgraph "Flink SQL join types"
        A[Regular Join] --> A1["Dual-stream join<br/>needs large state"]
        B[Interval Join] --> B1["Time-range join<br/>bounded state"]
        C[Temporal Join] --> C1["Versioned-table join<br/>dimension enrichment"]
        D[Lookup Join] --> D1["External table lookup<br/>dimension queries"]
    end

6.2 Regular Join

-- Inner join (dual-stream join; state grows without bound!)
SELECT 
    o.order_id,
    o.amount,
    u.name
FROM orders o
JOIN users u ON o.user_id = u.user_id;

-- Left Join
SELECT 
    o.order_id,
    o.amount,
    u.name
FROM orders o
LEFT JOIN users u ON o.user_id = u.user_id;

-- ⚠️ Note: the state of a regular join grows without bound,
-- so configure a state TTL
SET 'table.exec.state.ttl' = '24 h';

6.3 Interval Join

-- Interval join: only joins rows that fall within a time range of each other
-- e.g. match each order with payments arriving within 15 minutes
SELECT 
    o.order_id,
    o.amount,
    p.pay_time
FROM orders o
JOIN payments p 
    ON o.order_id = p.order_id
    AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE;

graph LR
    subgraph "How interval join works"
        O["Order: 10:00"] --> JOIN["Match payments<br/>10:00 ~ 10:15"]
        P1["Payment: 10:05 ✓"]
        P2["Payment: 10:20 ✗"]
    end

6.4 Temporal Join (Versioned Table Join)

-- Create a versioned table
CREATE TABLE currency_rates (
    currency STRING,
    rate DECIMAL(10, 4),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (...);

-- Temporal join: get the exchange rate as of the order time
SELECT 
    o.order_id,
    o.amount,
    o.currency,
    r.rate,
    o.amount * r.rate as amount_usd
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;

6.5 Lookup Join

-- Create a dimension table (MySQL)
CREATE TABLE dim_user (
    user_id BIGINT,
    name STRING,
    age INT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'users',
    'username' = 'root',
    'password' = 'password',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '1 hour'
);

-- Lookup join: query the dimension table per incoming record
SELECT 
    o.order_id,
    o.amount,
    u.name,
    u.age
FROM orders o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id;

7. Advanced Features

7.1 Top-N

-- Real-time top 10 best-selling products
SELECT *
FROM (
    SELECT 
        product_id,
        sale_count,
        ROW_NUMBER() OVER (ORDER BY sale_count DESC) as rn
    FROM product_sales
)
WHERE rn <= 10;

-- Top-N within a window
SELECT *
FROM (
    SELECT 
        product_id,
        window_start,
        window_end,
        sale_count,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY sale_count DESC) as rn
    FROM (
        SELECT 
            product_id,
            window_start,
            window_end,
            SUM(quantity) as sale_count
        FROM TABLE(
            TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
        )
        GROUP BY product_id, window_start, window_end
    )
)
WHERE rn <= 10;

7.2 Deduplication

-- Deduplicate by order_id, keeping the first row
SELECT *
FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_time ASC) as rn
    FROM orders
)
WHERE rn = 1;

-- Deduplicate by order_id, keeping the last row
SELECT *
FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_time DESC) as rn
    FROM orders
)
WHERE rn = 1;

7.3 CEP (Complex Event Processing)

-- MATCH_RECOGNIZE: detect three (or more) consecutive failed logins
SELECT *
FROM login_events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.event_time) AS start_time,
        LAST(A.event_time) AS end_time,
        COUNT(A.event_id) AS fail_count
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A{3,})
    DEFINE
        A AS A.status = 'FAIL'
) AS T;

7.4 User-Defined Functions (UDFs)

// Scalar function
public class UpperCase extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// Table function
public class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(",")) {
            collect(Row.of(s));
        }
    }
}

// Aggregate function
public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccum> {
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }
    
    public void accumulate(WeightedAvgAccum acc, Long value, Integer weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }
    
    @Override
    public Double getValue(WeightedAvgAccum acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}
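The WeightedAvgAccum accumulator class is referenced above but never shown; a minimal sketch consistent with the accumulate/getValue logic might be:

// Accumulator for WeightedAvg: a mutable POJO that Flink can (de)serialize as state
public static class WeightedAvgAccum {
    public long sum = 0;   // running weighted sum
    public int count = 0;  // running sum of weights
}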
-- Register the UDFs
CREATE FUNCTION upper_case AS 'com.example.UpperCase';
CREATE FUNCTION split AS 'com.example.SplitFunction';
CREATE FUNCTION weighted_avg AS 'com.example.WeightedAvg';

-- Use the UDFs
SELECT upper_case(name) FROM users;

SELECT user_id, tag
FROM orders, LATERAL TABLE(split(tags)) AS T(tag);

SELECT user_id, weighted_avg(score, weight) as avg_score
FROM scores
GROUP BY user_id;

8. Hands-On Cases

8.1 Case: Real-Time GMV Statistics

-- 1. Create the order source table
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    product_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- 2. Create the result table
CREATE TABLE gmv_stats (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    total_orders BIGINT,
    total_gmv DECIMAL(20, 2),
    PRIMARY KEY (window_start, window_end) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/analytics',
    'table-name' = 'gmv_stats',
    'username' = 'root',
    'password' = 'password'
);

-- 3. Compute in real time
INSERT INTO gmv_stats
SELECT 
    window_start,
    window_end,
    COUNT(*) as total_orders,
    SUM(amount) as total_gmv
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;

8.2 Case: User Behavior Analysis

-- Funnel analysis: view -> add to cart -> order -> pay conversion
WITH funnel AS (
    SELECT 
        user_id,
        CASE 
            WHEN action = 'view' THEN 1
            WHEN action = 'cart' THEN 2
            WHEN action = 'order' THEN 3
            WHEN action = 'pay' THEN 4
        END as step,
        event_time
    FROM user_actions
    WHERE DATE_FORMAT(event_time, 'yyyy-MM-dd') = '2024-01-15'
),
user_max_step AS (
    SELECT user_id, MAX(step) as max_step
    FROM funnel
    GROUP BY user_id
)
SELECT 
    COUNT(CASE WHEN max_step >= 1 THEN 1 END) as view_users,
    COUNT(CASE WHEN max_step >= 2 THEN 1 END) as cart_users,
    COUNT(CASE WHEN max_step >= 3 THEN 1 END) as order_users,
    COUNT(CASE WHEN max_step >= 4 THEN 1 END) as pay_users
FROM user_max_step;

8.3 Case: Real-Time Alerting

-- Create the alert output table
CREATE TABLE alerts (
    alert_type STRING,
    alert_time TIMESTAMP(3),
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'alerts',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Large-order alert
INSERT INTO alerts
SELECT 
    'LARGE_ORDER' as alert_type,
    order_time as alert_time,
    CONCAT('Large order alert: order_id=', CAST(order_id AS STRING), ', amount=', CAST(amount AS STRING)) as message
FROM orders
WHERE amount > 10000;

-- Traffic anomaly alert: order count within one minute exceeds a threshold
INSERT INTO alerts
SELECT 
    'HIGH_TRAFFIC' as alert_type,
    window_end as alert_time,
    CONCAT('Traffic anomaly: orders in 1 minute=', CAST(order_count AS STRING)) as message
FROM (
    SELECT 
        window_end,
        COUNT(*) as order_count
    FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' MINUTE)
    )
    GROUP BY window_start, window_end
)
WHERE order_count > 10000;

9. Performance Tuning

9.1 Configuration Tuning

-- MiniBatch optimization (reduces state accesses)
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';

-- State TTL
SET 'table.exec.state.ttl' = '24 h';

-- Parallelism
SET 'table.exec.resource.default-parallelism' = '4';

-- Two-phase aggregation
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

-- Split distinct aggregation
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';

9.2 SQL Hints

-- LOOKUP hint: tune lookup join behavior (here, retrying on lookup miss)
SELECT /*+ LOOKUP('table'='dim_user', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay') */
    o.*, u.name
FROM orders o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id;

-- STATE_TTL hint: set a state TTL per join input
SELECT /*+ STATE_TTL('o' = '1d', 'u' = '12h') */
    o.*, u.*
FROM orders o
JOIN users u ON o.user_id = u.user_id;

9.3 Common Issues

flowchart TD
    A[SQL performance problem] --> B{Problem type}
    B -->|State too large| C["Set a state TTL<br/>use RocksDB"]
    B -->|Data skew| D["Two-phase aggregation<br/>spread hot keys"]
    B -->|High latency| E["Check backpressure<br/>increase parallelism"]
    B -->|Slow lookups| F["Enable caching<br/>async lookups"]

10. Summary

10.1 Key Points

mindmap
  root((Flink SQL))
    Basics
      Dynamic tables
      Continuous queries
      Changelog
    Connectors
      Kafka
      JDBC
      FileSystem
    Time & windows
      TUMBLE
      HOP
      SESSION
    Joins
      Regular Join
      Interval Join
      Lookup Join
    Advanced features
      Top-N
      Deduplication
      CEP
      UDF

10.2 In One Sentence

Flink SQL = stream processing, written as SQL

Declarative, easy to learn, and powerful - it is the first choice for most Flink development.

