搜 索

DolphinScheduler从入门到放弃系列:国产调度器的崛起

  • 14阅读
  • 2023年05月27日
  • 0评论
首页 / AI/大数据 / 正文

一、前言:为什么要了解 DolphinScheduler?

graph LR A[选择调度器] --> B{团队背景} B -->|精通Python| C[Airflow] B -->|不想写代码| D[DolphinScheduler] B -->|预算有限| E[开源方案] D --> F["可视化拖拽
低代码体验"] E --> G["Apache顶级项目
完全免费"]

DolphinScheduler 的独特定位:

特点AirflowDolphinScheduler
任务定义Python 代码可视化拖拽 + 代码
上手难度需要 Python 基础小白友好
国内社区一般非常活跃
中文文档有限完善

二、DolphinScheduler 是什么?

2.1 一句话定义

DolphinScheduler = 分布式、去中心化、易扩展的可视化工作流任务调度平台

2.2 发展历程

timeline title DolphinScheduler 发展史 2017 : 易观开发 EasyScheduler 2019 : 捐献给 Apache 孵化 2019 : 更名为 DolphinScheduler 2021 : 成为 Apache 顶级项目 2023 : 3.0 版本发布(重大升级) 2024 : 持续迭代,社区活跃

2.3 核心特性

mindmap root((DolphinScheduler)) 去中心化 无单点故障 Master/Worker对等 自动故障转移 可视化 拖拽式DAG编辑 实时任务监控 日志在线查看 云原生 支持K8s部署 容器化任务 弹性伸缩 多租户 租户资源隔离 权限精细控制 队列管理 易扩展 插件化设计 自定义任务类型 丰富的告警渠道

三、架构详解

3.1 整体架构

graph TB subgraph "用户层" UI[Web UI] API[REST API] CLI[命令行] end subgraph "服务层" subgraph "Master集群" M1[Master 1] M2[Master 2] M3[Master 3] end subgraph "Worker集群" W1[Worker 1
group: default] W2[Worker 2
group: default] W3[Worker 3
group: spark] W4[Worker 4
group: flink] end ALERT[Alert Server] LOGGER[Logger Server] end subgraph "注册中心" ZK[(ZooKeeper)] end subgraph "存储层" DB[(MySQL/PostgreSQL)] FS[(HDFS/S3/本地)] end UI & API & CLI --> M1 & M2 & M3 M1 & M2 & M3 --> ZK W1 & W2 & W3 & W4 --> ZK M1 & M2 & M3 --> DB W1 & W2 & W3 & W4 --> FS ALERT --> DB

3.2 核心组件职责

组件职责高可用
MasterServer工作流调度、任务分发、监控多实例,去中心化选举
WorkerServer执行具体任务多实例,分组管理
AlertServer告警服务可多实例
ApiServer提供 REST API可多实例负载均衡
ZooKeeper服务注册与发现、分布式锁ZK 集群
Database存储元数据主从/集群

3.3 去中心化设计

graph TB subgraph "传统中心化" C[中央调度器] --> W1[Worker 1] C --> W2[Worker 2] C --> W3[Worker 3] C --> X[单点故障风险!] end
graph TB subgraph "DolphinScheduler去中心化" M1[Master 1] -.-> ZK[(ZooKeeper)] M2[Master 2] -.-> ZK M3[Master 3] -.-> ZK M1 --> W1[Worker 1] M2 --> W2[Worker 2] M3 --> W3[Worker 3] M1 -->|故障| M2 M2 -->|接管| W1 end

去中心化的优势:

  • 无单点故障:任一 Master 挂掉,其他 Master 自动接管
  • 负载均衡:任务自动分配到空闲 Master
  • 水平扩展:随时增加 Master/Worker 节点

3.4 任务执行流程

sequenceDiagram participant User participant API participant Master participant ZK participant Worker participant DB User->>API: 提交工作流 API->>DB: 创建工作流实例 API->>Master: 触发调度 Master->>DB: 生成任务实例 Master->>ZK: 获取可用Worker Master->>Worker: 分发任务 Worker->>Worker: 执行任务 Worker->>DB: 更新任务状态 Worker->>Master: 任务完成回调 Master->>Master: 检查下游依赖 Master->>Worker: 分发下游任务

四、核心概念

4.1 概念层级图

graph TB subgraph "资源组织" PROJECT[项目 Project] --> WF[工作流 Workflow] WF --> TASK[任务 Task] end subgraph "资源管理" TENANT[租户 Tenant] QUEUE[队列 Queue] ENV[环境 Environment] RESOURCE[资源中心 Resource] end subgraph "运行时" WFI[工作流实例] TI[任务实例] LOG[日志] end WF --> WFI TASK --> TI TI --> LOG TENANT --> QUEUE WF --> ENV TASK --> RESOURCE

4.2 项目与工作流

graph LR subgraph "项目 Project" P1["数仓项目"] P2["报表项目"] P3["ML项目"] end subgraph "工作流 Workflow" P1 --> W1[ODS层ETL] P1 --> W2[DWD层ETL] P1 --> W3[DWS层ETL] P2 --> W4[日报生成] P2 --> W5[周报生成] end

4.3 任务类型

graph TB subgraph "DolphinScheduler任务类型" A[基础任务] --> A1[Shell] A --> A2[Python] A --> A3[SQL] A --> A4[Procedure存储过程] B[大数据任务] --> B1[Spark] B --> B2[Flink] B --> B3[MapReduce] B --> B4[Hive] B --> B5[DataX] C[逻辑任务] --> C1[Dependent依赖] C --> C2[SubProcess子流程] C --> C3[Conditions条件分支] C --> C4[Switch分支] D[云原生] --> D1[K8s] D --> D2[HTTP] D --> D3[SeaTunnel] end

4.4 多租户体系

graph TB subgraph "多租户架构" ADMIN[管理员] --> T1[租户A] ADMIN --> T2[租户B] ADMIN --> T3[租户C] T1 --> U1[用户A1] T1 --> U2[用户A2] T2 --> U3[用户B1] T1 --> Q1[队列default] T2 --> Q2[队列spark] U1 --> P1[项目1] U1 --> P2[项目2] end

租户隔离的好处:

隔离维度说明
资源隔离不同租户使用不同队列,资源互不影响
数据隔离租户只能看到自己的项目和工作流
权限隔离精细的权限控制,按需授权
Linux用户每个租户对应一个 Linux 用户

五、实战:构建数据管道

5.1 创建第一个工作流

flowchart LR A[创建项目] --> B[创建工作流] B --> C[拖拽任务节点] C --> D[配置任务参数] D --> E[设置依赖关系] E --> F[保存并上线] F --> G[运行/定时]

5.2 可视化 DAG 编辑

在 DolphinScheduler 中,你可以通过拖拽来创建工作流:

graph LR subgraph "可视化编辑器" direction TB subgraph "任务面板" T1[Shell] T2[Python] T3[SQL] T4[Spark] end subgraph "画布" N1[数据抽取] --> N2[数据清洗] N2 --> N3[数据聚合] N3 --> N4[数据入库] N3 --> N5[发送报表] end end

5.3 Shell 任务示例

{
  "taskType": "SHELL",
  "name": "extract_data",
  "description": "从MySQL抽取数据",
  "taskParams": {
    "resourceList": [],
    "localParams": [
      {
        "prop": "dt",
        "direct": "IN",
        "type": "VARCHAR",
        "value": "${global_dt}"
      }
    ],
    "rawScript": "#!/bin/bash\necho \"开始抽取数据,日期:${dt}\"\npython3 /scripts/extract.py --date ${dt}\necho \"抽取完成\""
  },
  "flag": "YES",
  "taskPriority": "MEDIUM",
  "workerGroup": "default",
  "failRetryTimes": 3,
  "failRetryInterval": 1,
  "timeoutFlag": "OPEN",
  "timeout": 7200
}

5.4 SQL 任务示例

{
  "taskType": "SQL",
  "name": "aggregate_daily_stats",
  "description": "计算每日统计",
  "taskParams": {
    "type": "MYSQL",
    "datasource": 1,
    "sql": "INSERT INTO daily_stats (date, total_orders, total_amount)\nSELECT \n  '${dt}' as date,\n  COUNT(*) as total_orders,\n  SUM(amount) as total_amount\nFROM orders\nWHERE DATE(create_time) = '${dt}'\nON DUPLICATE KEY UPDATE\n  total_orders = VALUES(total_orders),\n  total_amount = VALUES(total_amount);",
    "sqlType": "1",
    "preStatements": [],
    "postStatements": [],
    "displayRows": 10
  },
  "workerGroup": "default"
}

5.5 Spark 任务示例

{
  "taskType": "SPARK",
  "name": "spark_etl_job",
  "taskParams": {
    "programType": "SCALA",
    "sparkVersion": "SPARK2",
    "deployMode": "cluster",
    "mainClass": "com.company.etl.DailyETL",
    "mainJar": {
      "resourceName": "/resources/spark-jobs/etl-job.jar"
    },
    "appName": "daily_etl_${dt}",
    "driverCores": 2,
    "driverMemory": "4G",
    "numExecutors": 10,
    "executorMemory": "8G",
    "executorCores": 4,
    "mainArgs": "--date ${dt} --mode full",
    "others": "--conf spark.sql.shuffle.partitions=200"
  },
  "workerGroup": "spark"
}

5.6 依赖任务:跨工作流依赖

graph TB subgraph "工作流A - ODS层" A1[抽取用户表] --> A2[抽取订单表] A2 --> A3[完成标记] end subgraph "工作流B - DWD层" B0[等待ODS完成] --> B1[清洗用户数据] B0 --> B2[清洗订单数据] B1 --> B3[合并数据] B2 --> B3 end A3 -.->|依赖| B0
{
  "taskType": "DEPENDENT",
  "name": "wait_for_ods",
  "description": "等待ODS层工作流完成",
  "taskParams": {
    "dependence": {
      "relation": "AND",
      "dependTaskList": [
        {
          "relation": "AND",
          "dependItemList": [
            {
              "projectCode": 123456,
              "definitionCode": 789012,
              "depTaskCode": 0,
              "cycle": "day",
              "dateValue": "today"
            }
          ]
        }
      ]
    }
  }
}

5.7 条件分支任务

graph TD A[检查数据量] --> B{数据量判断} B -->|> 100万| C[Spark处理] B -->|<= 100万| D[单机Python处理] C --> E[入库] D --> E
{
  "taskType": "CONDITIONS",
  "name": "check_data_size",
  "taskParams": {
    "dependence": {
      "relation": "AND",
      "dependTaskList": [
        {
          "relation": "AND", 
          "dependItemList": [
            {
              "depTaskCode": "count_task",
              "status": "SUCCESS"
            }
          ]
        }
      ]
    },
    "conditionResult": {
      "successNode": ["spark_process"],
      "failedNode": ["python_process"]
    }
  }
}

六、参数系统详解

6.1 参数类型

graph TB subgraph "DolphinScheduler参数体系" A[全局参数] --> A1["工作流级别
所有任务共享"] B[局部参数] --> B1["任务级别
任务内部使用"] C[内置参数] --> C1["系统变量
时间/实例ID等"] D[传递参数] --> D1["任务间传递
类似XCom"] end

6.2 内置时间参数

参数说明示例值
${system.biz.date}业务日期2024-01-15
${system.biz.curdate}当前日期2024-01-16
${system.datetime}当前时间20240116120000

6.3 时间函数

# 基准时间
$[yyyy-MM-dd]           # 2024-01-15

# 加减运算
$[yyyy-MM-dd+7]         # 2024-01-22(加7天)
$[yyyy-MM-dd-1]         # 2024-01-14(减1天)
$[yyyyMMdd-1]           # 20240114

# 月份操作
$[add_months(yyyy-MM-dd, -1)]  # 2023-12-15(上个月)

# 自定义格式
$[yyyyMMddHHmmss]       # 20240115120000

6.4 参数传递示例

sequenceDiagram participant Task1 participant DS participant Task2 Task1->>Task1: 执行脚本 Task1->>DS: echo "row_count=1000" Note over DS: 解析输出中的
key=value格式 DS->>Task2: 注入 ${row_count} Task2->>Task2: 使用 ${row_count}
# Task1 脚本:输出参数
#!/bin/bash
count=$(mysql -e "SELECT COUNT(*) FROM orders WHERE date='${dt}'" | tail -1)
echo "row_count=${count}"  # DolphinScheduler会解析这个输出

# Task2 脚本:使用参数
#!/bin/bash
echo "上游任务处理了 ${row_count} 条数据"
if [ ${row_count} -gt 10000 ]; then
    echo "数据量较大,启用并行处理"
fi

七、告警配置

7.1 告警渠道

graph TB subgraph "DolphinScheduler告警渠道" A[告警服务] --> B[Email邮件] A --> C[DingTalk钉钉] A --> D[WeChat企业微信] A --> E[Feishu飞书] A --> F[PagerDuty] A --> G[HTTP Webhook] A --> H[Script脚本] end

7.2 告警组配置

{
  "alertGroupName": "数据开发组",
  "alertInstanceIds": "1,2,3",
  "description": "数据开发团队告警",
  "alertInstances": [
    {
      "instanceType": "DingTalk",
      "params": {
        "webhook": "https://oapi.dingtalk.com/robot/send?access_token=xxx",
        "keyword": "数据告警",
        "isAtAll": false,
        "atMobiles": "13800138000,13900139000"
      }
    },
    {
      "instanceType": "Email",
      "params": {
        "receivers": "team@company.com",
        "serverHost": "smtp.company.com",
        "serverPort": 465
      }
    }
  ]
}

7.3 告警策略

graph TD A[工作流执行] --> B{执行结果} B -->|成功| C{告警策略} B -->|失败| D[发送失败告警] B -->|超时| E[发送超时告警] C -->|成功也告警| F[发送成功通知] C -->|仅失败告警| G[不发送]

八、资源中心

8.1 资源类型

graph TB subgraph "资源中心" A[文件资源] --> A1[脚本文件 .sh/.py] A --> A2[配置文件 .conf/.yaml] A --> A3[JAR包] B[UDF资源] --> B1[Hive UDF] B --> B2[自定义函数] C[存储后端] --> C1[本地文件系统] C --> C2[HDFS] C --> C3[S3/OSS] end

8.2 资源使用

# 在Shell任务中引用资源
#!/bin/bash

# 引用资源中心的Python脚本
python3 /资源路径/process_data.py --date ${dt}

# 引用资源中心的配置文件
source /资源路径/config.sh

九、高级特性

9.1 Worker 分组

graph TB subgraph "Worker分组策略" MASTER[Master] --> G1[default组] MASTER --> G2[spark组] MASTER --> G3[flink组] MASTER --> G4[ml组] G1 --> W1[Worker1] G1 --> W2[Worker2] G2 --> W3[Worker3
大内存] G2 --> W4[Worker4
大内存] G3 --> W5[Worker5] G4 --> W6[Worker6
GPU] end

分组的应用场景:

Worker 组配置特点适用任务
default通用配置Shell/SQL/简单任务
spark大内存、多核Spark 任务
flink高网络带宽Flink 任务
mlGPU 服务器机器学习任务

9.2 补数功能

sequenceDiagram participant User participant DS participant Worker User->>DS: 补数请求
日期范围: 01-01 到 01-15 DS->>DS: 生成15个工作流实例 loop 每个日期 DS->>Worker: 执行 date=01-01 Worker-->>DS: 完成 DS->>Worker: 执行 date=01-02 Worker-->>DS: 完成 end DS->>User: 补数完成

补数配置:

{
  "scheduleTime": "2024-01-01 00:00:00,2024-01-15 00:00:00",
  "failureStrategy": "CONTINUE",
  "warningType": "FAILURE",
  "warningGroupId": 1,
  "execType": "COMPLEMENT_DATA",
  "runMode": "RUN_MODE_SERIAL",
  "processInstancePriority": "MEDIUM",
  "workerGroup": "default",
  "expectedParallelismNumber": 4
}

9.3 版本管理

graph LR subgraph "工作流版本" V1[版本1
2024-01-01] --> V2[版本2
2024-01-15] V2 --> V3[版本3
2024-02-01] V3 --> V4[版本4
当前版本] end V2 -->|回滚| ROLLBACK[回滚操作]

十、与 Airflow 对比

10.1 功能对比

功能DolphinSchedulerAirflow
DAG 定义可视化拖拽Python 代码
学习曲线平缓需要 Python
去中心化✅ 原生支持❌ 需要配置
多租户✅ 原生支持❌ 需要定制
中文支持✅ 完善一般
补数功能✅ 原生支持✅ catchup
资源中心✅ 内置❌ 需要外部
Worker 分组✅ 原生支持✅ Queue/Pool
Operator 丰富度中等非常丰富
社区生态国内强国际强

10.2 架构对比

graph TB subgraph "Airflow架构" A1[Scheduler
单点/需HA配置] --> A2[Worker] A1 --> A3[Worker] A4[Webserver] --> A5[(Database)] A1 --> A5 end
graph TB subgraph "DolphinScheduler架构" B1[Master 1] --> B4[Worker] B2[Master 2] --> B5[Worker] B3[Master 3] --> B6[Worker] B1 & B2 & B3 --> B7[(ZooKeeper)] B1 & B2 & B3 --> B8[(Database)] end

10.3 选型建议

flowchart TD A{你的团队} --> B{Python技术栈?} B -->|精通Python
喜欢代码控制| C[Airflow] B -->|不想写代码
偏好可视化| D[DolphinScheduler] A --> E{运维能力?} E -->|强,能搞定HA| C E -->|一般,要求简单| D A --> F{国内/国际?} F -->|国内团队| D F -->|国际团队| C style C fill:#ff6b6b style D fill:#4ecdc4

十一、生产部署

11.1 部署架构

graph TB subgraph "生产环境部署" LB[负载均衡] --> API1[API Server 1] LB --> API2[API Server 2] subgraph "Master集群" M1[Master 1] M2[Master 2] M3[Master 3] end subgraph "Worker集群" W1[Worker 1-3
default组] W2[Worker 4-6
spark组] end ALERT[Alert Server] ZK1[ZK1] --- ZK2[ZK2] --- ZK3[ZK3] DB[(MySQL主从)] HDFS[(HDFS集群)] end API1 & API2 --> M1 & M2 & M3 M1 & M2 & M3 --> ZK1 & ZK2 & ZK3 W1 & W2 --> ZK1 & ZK2 & ZK3 M1 & M2 & M3 --> DB W1 & W2 --> HDFS

11.2 资源规划

组件最小配置生产配置说明
Master2节点 x 4C8G3节点 x 8C16G建议奇数个
Worker2节点 x 4C8G按需 x 8C32G根据任务量扩展
API1节点 x 2C4G2节点 x 4C8G负载均衡
Alert1节点 x 2C4G2节点 x 2C4G可选 HA
ZooKeeper3节点 x 2C4G3节点 x 4C8G必须奇数个
MySQL1节点 x 4C8G主从 x 8C16G建议 SSD

11.3 K8s 部署

# values.yaml 示例
master:
  replicas: 3
  resources:
    requests:
      memory: "4Gi"
      cpu: "2"
    limits:
      memory: "8Gi"
      cpu: "4"

worker:
  replicas: 5
  resources:
    requests:
      memory: "8Gi"
      cpu: "4"
    limits:
      memory: "16Gi"
      cpu: "8"

alert:
  replicas: 2

api:
  replicas: 2

zookeeper:
  replicas: 3

postgresql:
  enabled: true
  primary:
    persistence:
      size: 50Gi

十二、常见问题

12.1 FAQ

flowchart TD A[常见问题] --> B[任务一直排队?] A --> C[任务执行失败?] A --> D[工作流不调度?] A --> E[日志看不到?] B --> B1["检查Worker是否存活
检查Worker分组配置
检查队列容量"] C --> C1["查看任务日志
检查资源文件权限
检查数据源连接"] D --> D1["检查工作流是否上线
检查定时配置
检查Master状态"] E --> E1["检查日志目录权限
检查磁盘空间
检查Logger配置"]

12.2 性能优化

mindmap root((性能优化)) 数据库 连接池配置 慢SQL优化 定期清理历史 Master 调整线程池 任务分发策略 减少状态轮询 Worker 增加并发数 分组负载均衡 资源隔离 ZooKeeper 会话超时配置 监控ZK延迟

十三、总结

13.1 DolphinScheduler 核心优势

graph LR A[DolphinScheduler] --> B[去中心化] A --> C[可视化] A --> D[多租户] A --> E[云原生] B --> B1["无单点故障
天然高可用"] C --> C1["拖拽式编辑
低代码体验"] D --> D1["资源隔离
权限控制"] E --> E1["K8s原生支持
弹性伸缩"]

13.2 适用场景

场景推荐度说明
国内企业数据平台⭐⭐⭐⭐⭐中文文档完善,社区活跃
中小团队⭐⭐⭐⭐⭐上手快,运维简单
大数据 ETL⭐⭐⭐⭐原生支持 Spark/Flink/Hive
多租户平台⭐⭐⭐⭐⭐原生多租户支持
需要复杂逻辑⭐⭐⭐建议配合代码使用

13.3 一句话总结

DolphinScheduler = 去中心化 + 可视化 + 多租户 + 云原生

如果你的团队不想写太多代码,需要一个开箱即用的调度平台,DolphinScheduler 是国内最佳选择之一。


如果这篇文章对你有帮助,欢迎:

  • 👍 点赞:让更多人看到
  • ⭐ 收藏:方便日后查阅
  • 💬 评论:分享你的 DolphinScheduler 使用经验
评论区
暂无评论
avatar