1. Preface: Why Do We Need Workflow Scheduling?
Remember how you ran scheduled jobs when you first started out?
# The rookie approach: crontab -e
0 2 * * * /home/scripts/etl_step1.sh
30 2 * * * /home/scripts/etl_step2.sh # assumes step1 takes 30 minutes
0 3 * * * /home/scripts/etl_step3.sh # assumes step2 takes 30 minutes
# Then one morning at 3 AM the phone rings:
# "The report numbers are wrong!"
# It turns out step1 took 40 minutes, and step3 started before step2 had finished...
The fatal flaws of crontab:
graph TD
A[Crontab pain points] --> B[No dependency management]
A --> C[No retry on failure]
A --> D[No visual monitoring]
A --> E[No run history]
B --> B1["Task2 has no idea whether Task1 finished"]
C --> C1["Job failed? Rerun it by hand"]
D --> D1["Where is the job at? Go dig through the logs"]
E --> E1["How did last week's runs go? Nobody knows"]
What a workflow scheduler buys you:
graph LR
subgraph "A world with a scheduler"
A[Task1] -->|on success| B[Task2]
B -->|on success| C[Task3]
A -->|on failure| D[Alert + automatic retry]
B -->|on failure| D
end
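To make the contrast concrete, here is a minimal sketch of the same three-step chain as an Airflow DAG (all of the concepts are explained in detail later in this article). The script paths and the 2 AM schedule come from the crontab example above; the dag_id, retry settings, and alert address are illustrative, and email alerts assume SMTP is configured.

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="etl_with_dependencies",           # illustrative name
    schedule_interval="0 2 * * *",            # same 2 AM schedule as the crontab example
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    step1 = BashOperator(
        task_id="etl_step1",
        # trailing space stops Jinja from treating the .sh path as a template file
        bash_command="/home/scripts/etl_step1.sh ",
        retries=3,                             # automatic retry on failure
        retry_delay=timedelta(minutes=5),
        email_on_failure=True,                 # an alert instead of a 3 AM phone call
        email=["alert@company.com"],           # illustrative address
    )
    step2 = BashOperator(task_id="etl_step2", bash_command="/home/scripts/etl_step2.sh ")
    step3 = BashOperator(task_id="etl_step3", bash_command="/home/scripts/etl_step3.sh ")

    # step2 starts only after step1 succeeds, no matter how long step1 runs
    step1 >> step2 >> step3
```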
2. What Is Airflow?
2.1 One-sentence definition
Airflow = a distributed task-scheduling platform whose workflows (DAGs) are defined in Python code
2.2 A brief history
timeline
title Airflow history
2014 : internal project at Airbnb
2015 : open-sourced
2016 : entered the Apache Incubator
2019 : became an Apache top-level project
2020 : Airflow 2.0 released (major upgrade)
2023 : Airflow 2.7+ continues to iterate
2.3 Why choose Airflow?
graph TB
A[Airflow's strengths] --> B[Workflows as code]
A --> C[Rich set of Operators]
A --> D[Strong community]
A --> E[Solid monitoring]
A --> F[Flexible extensibility]
B --> B1["DAGs defined in Python<br/>version-control friendly<br/>code review"]
C --> C1["200+ built-in Operators<br/>covering all mainstream systems"]
D --> D1["Apache top-level project<br/>backed by major companies"]
E --> E1["Web UI<br/>log tracing<br/>alert integrations"]
F --> F1["Custom Operators<br/>plugin mechanism"]
3. Core Concepts in Detail
3.1 Concept overview
graph TB
subgraph "Airflow core concepts"
DAG["DAG<br/>directed acyclic graph"]
Task["Task<br/>a node in the graph"]
Operator["Operator<br/>task type"]
Sensor["Sensor<br/>waits for a condition"]
Hook["Hook<br/>external connection wrapper"]
Connection["Connection<br/>connection configuration"]
Variable["Variable<br/>global variable"]
XCom["XCom<br/>cross-task communication"]
end
DAG --> Task
Task --> Operator
Task --> Sensor
Operator --> Hook
Sensor --> Hook
Hook --> Connection
DAG --> Variable
Task --> XCom
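Most of these concepts appear in code throughout the rest of this article; Hook, Connection, and Variable are the quieter ones, so here is a minimal sketch of how they typically show up inside a task callable. The connection id `mysql_source`, the variable name `etl_batch_size`, and the table name are illustrative.

```python
from airflow.models import Variable
from airflow.providers.mysql.hooks.mysql import MySqlHook

def pull_daily_rows(**context):
    # Variable: a global key/value setting managed in the Web UI or CLI
    batch_size = int(Variable.get("etl_batch_size", default_var=1000))

    # Hook: wraps the client library; the Connection "mysql_source"
    # (host/port/credentials) is configured under Admin -> Connections
    hook = MySqlHook(mysql_conn_id="mysql_source")
    rows = hook.get_records(
        "SELECT * FROM source_table WHERE date = %s LIMIT %s",
        parameters=(context["ds"], batch_size),
    )
    return len(rows)   # small return values like this go to XCom automatically
```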
3.2 DAG: Directed Acyclic Graph
graph LR
subgraph "A typical ETL DAG"
A[extract_data] --> B[transform_data]
B --> C[load_to_warehouse]
C --> D[generate_report]
C --> E[send_notification]
end
The basic definition of a DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# DAG default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['alert@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
with DAG(
    dag_id='etl_daily_pipeline',
    default_args=default_args,
    description='Daily ETL data pipeline',
    schedule_interval='0 2 * * *',  # every day at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill historical runs
    tags=['etl', 'daily'],
) as dag:
    # define tasks here...
    pass
3.3 Operator: Task Types
graph TB
subgraph "Common Operator categories"
A[Action Operators] --> A1[BashOperator]
A --> A2[PythonOperator]
A --> A3[EmailOperator]
B[Transfer Operators] --> B1[S3ToRedshiftOperator]
B --> B2[MySqlToHiveOperator]
B --> B3[GCSToGCSOperator]
C[Sensor Operators] --> C1[FileSensor]
C --> C2[HttpSensor]
C --> C3[SqlSensor]
D[Provider Operators] --> D1[SparkSubmitOperator]
D --> D2[KubernetesPodOperator]
D --> D3[SnowflakeOperator]
end
Examples of commonly used Operators:
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

# 1. BashOperator - run a shell command
extract_task = BashOperator(
    task_id='extract_data',
    bash_command='python /scripts/extract.py --date {{ ds }}',
)

# 2. PythonOperator - run a Python function
def transform_data(**context):
    execution_date = context['ds']
    # data transformation logic goes here
    return f"Transformed data for {execution_date}"

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,  # in Airflow 2.x the context is passed automatically
)

# 3. MySqlOperator - run SQL
load_task = MySqlOperator(
    task_id='load_to_mysql',
    mysql_conn_id='mysql_warehouse',
    sql="""
        INSERT INTO daily_stats (date, metric, value)
        SELECT '{{ ds }}', metric, SUM(value)
        FROM raw_data
        WHERE date = '{{ ds }}'
        GROUP BY metric
    """,
)

# 4. SimpleHttpOperator - call an API
notify_task = SimpleHttpOperator(
    task_id='notify_api',
    http_conn_id='notification_api',
    endpoint='/webhook/airflow',
    method='POST',
    data='{"status": "completed", "date": "{{ ds }}"}',
)
3.4 Sensor: Waiting for a Condition
sequenceDiagram
participant Scheduler
participant Sensor
participant External
Scheduler->>Sensor: start the sensor task
loop check every poke_interval
Sensor->>External: is the condition met?
External-->>Sensor: not yet
Sensor->>Sensor: wait poke_interval
end
Sensor->>External: check the condition
External-->>Sensor: condition met!
Sensor->>Scheduler: task succeeded, continue downstream
from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.sql import SqlSensor

# 1. FileSensor - wait until a file shows up
wait_for_file = FileSensor(
    task_id='wait_for_data_file',
    filepath='/data/input/{{ ds }}/data.csv',
    poke_interval=60,   # check every 60 seconds
    timeout=3600,       # wait at most 1 hour
    mode='poke',        # poke mode keeps holding a worker slot
)

# 2. SqlSensor - wait until upstream data is ready
wait_for_data = SqlSensor(
    task_id='wait_for_upstream_data',
    conn_id='mysql_source',
    sql="""
        SELECT COUNT(*) FROM source_table
        WHERE date = '{{ ds }}' AND status = 'ready'
    """,
    poke_interval=300,
    timeout=7200,
    mode='reschedule',  # reschedule mode releases the slot between checks
)

# 3. HttpSensor - wait until an API reports success
wait_for_api = HttpSensor(
    task_id='wait_for_upstream_api',
    http_conn_id='upstream_service',
    endpoint='/status/{{ ds }}',
    response_check=lambda response: response.json()['status'] == 'ready',
    poke_interval=120,
    timeout=3600,
)
3.5 XCom: Cross-Task Communication
graph LR
A[Task A] -->|push XCom| X[(XCom store)]
X -->|pull XCom| B[Task B]
subgraph "XCom data flow"
A1["return 'value'<br/>or xcom_push()"] --> X
X --> B1["xcom_pull()<br/>or a Jinja template"]
end
# Approach 1: push automatically via the return value
def extract_data(**context):
    data = {'count': 1000, 'status': 'success'}
    return data  # automatically pushed to XCom

def transform_data(**context):
    # pull the XCom pushed by the upstream task
    ti = context['ti']
    upstream_data = ti.xcom_pull(task_ids='extract_data')
    print(f"Received: {upstream_data}")  # {'count': 1000, 'status': 'success'}

# Approach 2: push/pull manually
def manual_push(**context):
    ti = context['ti']
    ti.xcom_push(key='my_key', value='my_value')

def manual_pull(**context):
    ti = context['ti']
    value = ti.xcom_pull(task_ids='manual_push', key='my_key')

# Approach 3: inside a Jinja template
bash_task = BashOperator(
    task_id='use_xcom',
    bash_command='echo "Count: {{ ti.xcom_pull(task_ids="extract_data")["count"] }}"',
)
4. Airflow Architecture in Detail
4.1 Overall architecture
graph TB
subgraph "Airflow architecture"
    subgraph "Web layer"
        WS["Webserver<br/>Flask app"]
    end
    subgraph "Scheduling layer"
        SCH["Scheduler"]
        TRIG["Triggerer<br/>async triggers"]
    end
    subgraph "Execution layer"
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker N]
    end
    subgraph "Storage layer"
        META[("Metadata DB<br/>PostgreSQL/MySQL")]
        LOG[("Log storage<br/>local/S3/GCS")]
    end
    subgraph "Message queue (optional)"
        MQ[Redis/RabbitMQ]
    end
end
WS --> META
SCH --> META
SCH --> MQ
MQ --> W1 & W2 & W3
W1 & W2 & W3 --> META
W1 & W2 & W3 --> LOG
TRIG --> META
4.2 Core component responsibilities
| Component | Responsibility | Deployment advice |
|---|---|---|
| Webserver | Serves the Web UI, shows DAG status | Can run multiple instances behind a load balancer |
| Scheduler | Parses DAGs, schedules tasks | 2.0+ supports multi-instance HA |
| Worker | Executes the actual tasks | Scale out horizontally as needed |
| Triggerer | Handles Deferrable Operators | Added in 2.2+, optional |
| Metadata DB | Stores DAG/task/connection metadata | PostgreSQL recommended |
4.3 Executor types
graph TB
A[Executor types] --> B[SequentialExecutor]
A --> C[LocalExecutor]
A --> D[CeleryExecutor]
A --> E[KubernetesExecutor]
B --> B1["runs tasks one at a time<br/>testing only"]
C --> C1["local multi-process<br/>single-machine deployments"]
D --> D1["distributed<br/>Celery + Redis/RabbitMQ"]
E --> E1["runs each task in a K8s Pod<br/>elastic scaling"]
flowchart TD
A{Choosing an Executor} --> B{How much work?}
B -->|very little, testing| C[SequentialExecutor]
B -->|moderate, single machine| D[LocalExecutor]
B -->|heavy, distributed| E{What infrastructure?}
E -->|Kubernetes available| F[KubernetesExecutor]
E -->|no Kubernetes| G[CeleryExecutor]
style C fill:#ff6b6b
style D fill:#ffeaa7
style F fill:#4ecdc4
style G fill:#4ecdc4
4.4 Task lifecycle
stateDiagram-v2
[*] --> scheduled: picked up by the Scheduler
scheduled --> queued: dependencies met
queued --> running: claimed by a Worker
running --> success: finished successfully
running --> failed: execution failed
running --> up_for_retry: failed but retries remain
up_for_retry --> scheduled: retry
failed --> [*]
success --> [*]
scheduled --> upstream_failed: an upstream task failed
scheduled --> skipped: skipped by a condition
upstream_failed --> [*]
skipped --> [*]
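The operator parameters below are what drive most of these transitions. A minimal, hedged sketch with illustrative values (the script path is made up):

```python
from datetime import timedelta
from airflow.operators.bash import BashOperator

report = BashOperator(
    task_id="build_report",
    bash_command="python /scripts/build_report.py ",   # illustrative script
    retries=2,                              # failed -> up_for_retry -> scheduled, at most twice
    retry_delay=timedelta(minutes=10),      # how long to wait before the retry is scheduled
    retry_exponential_backoff=True,         # 10 min, 20 min, ... between attempts
    execution_timeout=timedelta(hours=1),   # running -> failed if the task exceeds 1 hour
    trigger_rule="all_success",             # default rule: any upstream failure -> upstream_failed
)
```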
5. Hands-On: Building a Complete ETL Pipeline
5.1 Project layout
airflow_project/
├── dags/
│   ├── __init__.py
│   ├── etl_daily_pipeline.py    # DAG definitions
│   └── common/
│       ├── __init__.py
│       └── utils.py             # shared helper functions
├── plugins/
│   └── custom_operators/        # custom Operators
├── tests/
│   └── test_dags.py             # DAG tests
├── docker-compose.yaml
└── requirements.txt
5.2 Complete DAG example
"""
ETL Daily Pipeline
每日从MySQL抽取数据,转换后加载到数据仓库
"""
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.sensors.sql import SqlSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
# ============ 默认参数 ============
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email': ['data-alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
# ============ Python callables ============
def check_data_quality(**context):
    """Data quality check."""
    ds = context['ds']
    # run the actual data quality checks here
    record_count = 1000  # simulated record count
    if record_count < 100:
        raise ValueError(f"Abnormal data volume: {record_count} < 100")
    logger.info(f"Data quality check passed, record count: {record_count}")
    return {'record_count': record_count, 'status': 'passed'}

def decide_branch(**context):
    """Decide which branch to run."""
    ti = context['ti']
    qc_result = ti.xcom_pull(task_ids='quality_check')
    if qc_result['record_count'] > 10000:
        return 'process_large_data'
    else:
        return 'process_small_data'

def process_data(data_size: str, **context):
    """Process the data."""
    ds = context['ds']
    logger.info(f"Processing the {data_size} dataset for {ds}")
    # actual processing logic goes here
    return f"Processed {data_size} data for {ds}"

def send_notification(**context):
    """Send a completion notification."""
    ti = context['ti']
    ds = context['ds']
    # collect results from upstream tasks here if needed
    logger.info(f"Sending completion notification for {ds}")
# ============ DAG definition ============
with DAG(
    dag_id='etl_daily_pipeline_v2',
    default_args=default_args,
    description='Production-grade ETL pipeline example',
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production', 'daily'],
    doc_md="""
    ## ETL Daily Pipeline
    ### What it does
    1. Wait for upstream data to be ready
    2. Extract data from MySQL
    3. Run data quality checks
    4. Choose a processing strategy based on data volume
    5. Load into the data warehouse
    6. Send a notification
    ### Dependencies
    - MySQL source database
    - Data warehouse
    - Alerting service
    """,
) as dag:
    # ===== Start node =====
    start = EmptyOperator(task_id='start')

    # ===== Wait for upstream data =====
    wait_for_source = SqlSensor(
        task_id='wait_for_source_data',
        conn_id='mysql_source',
        sql="""
            SELECT COUNT(*) > 0
            FROM etl_status
            WHERE date = '{{ ds }}' AND status = 'ready'
        """,
        poke_interval=300,
        timeout=7200,
        mode='reschedule',
    )

    # ===== Extract data (task group) =====
    with TaskGroup(group_id='extract_group') as extract_group:
        extract_users = BashOperator(
            task_id='extract_users',
            bash_command='python /scripts/extract.py --table users --date {{ ds }}',
        )
        extract_orders = BashOperator(
            task_id='extract_orders',
            bash_command='python /scripts/extract.py --table orders --date {{ ds }}',
        )
        extract_products = BashOperator(
            task_id='extract_products',
            bash_command='python /scripts/extract.py --table products --date {{ ds }}',
        )
        # the three extract tasks run in parallel within the group
        [extract_users, extract_orders, extract_products]
    # ===== Data quality check =====
    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=check_data_quality,
    )

    # ===== Branch decision =====
    branch = BranchPythonOperator(
        task_id='branch_by_data_size',
        python_callable=decide_branch,
    )

    # ===== Processing branches =====
    process_large = PythonOperator(
        task_id='process_large_data',
        python_callable=process_data,
        op_kwargs={'data_size': 'large'},
    )
    process_small = PythonOperator(
        task_id='process_small_data',
        python_callable=process_data,
        op_kwargs={'data_size': 'small'},
    )

    # ===== Merge the branches =====
    merge = EmptyOperator(
        task_id='merge_branches',
        trigger_rule='none_failed_min_one_success',  # at least one branch succeeded and none failed
    )
    # ===== Load into the warehouse =====
    with TaskGroup(group_id='load_group') as load_group:
        load_to_dw = MySqlOperator(
            task_id='load_to_warehouse',
            mysql_conn_id='mysql_warehouse',
            sql='/sql/load_warehouse.sql',
        )
        update_stats = MySqlOperator(
            task_id='update_statistics',
            mysql_conn_id='mysql_warehouse',
            sql="""
                INSERT INTO etl_statistics (date, table_name, row_count, updated_at)
                VALUES ('{{ ds }}', 'daily_summary',
                        (SELECT COUNT(*) FROM daily_summary WHERE date = '{{ ds }}'),
                        NOW())
                ON DUPLICATE KEY UPDATE
                    row_count = VALUES(row_count),
                    updated_at = NOW()
            """,
        )
        load_to_dw >> update_stats

    # ===== Send notification =====
    notify = PythonOperator(
        task_id='send_notification',
        python_callable=send_notification,
        trigger_rule='all_done',  # runs whether upstream succeeded or failed
    )

    # ===== End node =====
    end = EmptyOperator(task_id='end')

    # ===== Dependencies =====
    start >> wait_for_source >> extract_group >> quality_check >> branch
    branch >> [process_large, process_small] >> merge
    merge >> load_group >> notify >> end
5.3 DAG visualization
graph LR
start[start] --> wait[wait_for_source_data]
wait --> extract[extract_group]
subgraph extract[extract_group]
e1[extract_users]
e2[extract_orders]
e3[extract_products]
end
extract --> qc[quality_check]
qc --> branch[branch_by_data_size]
branch -->|large dataset| large[process_large_data]
branch -->|small dataset| small[process_small_data]
large --> merge[merge_branches]
small --> merge
merge --> load[load_group]
subgraph load[load_group]
l1[load_to_warehouse] --> l2[update_statistics]
end
load --> notify[send_notification]
notify --> end_node[end]
6. Advanced Features
6.1 Dynamic DAG generation
"""
根据配置动态生成多个相似的DAG
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 配置:每个数据源一个DAG
data_sources = [
{'name': 'mysql_db1', 'schedule': '0 1 * * *', 'conn_id': 'mysql_db1'},
{'name': 'mysql_db2', 'schedule': '0 2 * * *', 'conn_id': 'mysql_db2'},
{'name': 'postgres_db1', 'schedule': '0 3 * * *', 'conn_id': 'pg_db1'},
]
def create_dag(source_config):
"""工厂函数:创建DAG"""
dag_id = f"sync_{source_config['name']}"
dag = DAG(
dag_id=dag_id,
schedule_interval=source_config['schedule'],
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['dynamic', 'sync'],
)
def sync_data(conn_id, **context):
print(f"Syncing data from {conn_id}")
with dag:
sync_task = PythonOperator(
task_id='sync_data',
python_callable=sync_data,
op_kwargs={'conn_id': source_config['conn_id']},
)
return dag
# 动态创建DAG并注册到全局命名空间
for source in data_sources:
dag_id = f"sync_{source['name']}"
globals()[dag_id] = create_dag(source)6.2 TaskFlow API (Airflow 2.0+)
"""
使用TaskFlow API简化DAG编写
更Pythonic,更简洁
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id='taskflow_example',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['taskflow'],
)
def taskflow_pipeline():
"""使用装饰器定义DAG"""
@task
def extract() -> dict:
"""抽取数据"""
return {'data': [1, 2, 3, 4, 5], 'count': 5}
@task
def transform(raw_data: dict) -> dict:
"""转换数据"""
data = raw_data['data']
transformed = [x * 2 for x in data]
return {'data': transformed, 'count': len(transformed)}
@task
def load(transformed_data: dict) -> str:
"""加载数据"""
print(f"Loading {transformed_data['count']} records")
return 'success'
@task
def notify(status: str):
"""发送通知"""
print(f"Pipeline finished with status: {status}")
# 自动推断依赖关系!
raw = extract()
transformed = transform(raw)
status = load(transformed)
notify(status)
# 实例化DAG
taskflow_dag = taskflow_pipeline()6.3 Deferrable Operators (Airflow 2.2+)
graph LR
subgraph "Traditional Sensor"
A1[sensor task] -->|holds a worker slot| A2[polls in a loop]
A2 --> A3[wastes resources]
end
graph LR
subgraph "Deferrable Operator"
B1[task starts] -->|defers to an async trigger| B2[handled by the Triggerer]
B2 -->|condition met| B3[task resumes]
B1 -->|worker slot released| B4[free for other tasks]
end
from airflow.sensors.filesystem import FileSensor

# Traditional poke mode: holds a worker slot the whole time
file_sensor_poke = FileSensor(
    task_id='wait_file_poke',
    filepath='/data/file.csv',
    mode='poke',  # keeps occupying the slot
)

# Reschedule mode: the slot is released between checks
file_sensor_reschedule = FileSensor(
    task_id='wait_file_reschedule',
    filepath='/data/file.csv',
    mode='reschedule',  # released after every check
)

# Deferrable mode: the most efficient (requires the Triggerer component
# and a recent Amazon provider package)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

s3_sensor_async = S3KeySensor(
    task_id='wait_s3_file',
    bucket_key='s3://bucket/path/file.csv',
    deferrable=True,  # waits asynchronously in the Triggerer
)
6.4 Data-Aware Scheduling
"""
Airflow 2.4+ Dataset感知调度
上游DAG产出数据集,下游DAG自动触发
"""
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from datetime import datetime
# 定义数据集
orders_dataset = Dataset('s3://bucket/orders/')
users_dataset = Dataset('s3://bucket/users/')
# 生产者DAG
with DAG(
dag_id='producer_orders',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
) as producer_dag:
@task(outlets=[orders_dataset]) # 声明产出数据集
def produce_orders():
# 生成订单数据
pass
# 消费者DAG - 当orders和users都更新时自动触发
with DAG(
dag_id='consumer_report',
schedule=[orders_dataset, users_dataset], # 数据集触发
start_date=datetime(2024, 1, 1),
) as consumer_dag:
@task
def generate_report():
# 生成报表
pass七、运维与监控
7.1 Deployment architecture (production)
graph TB
subgraph "Load balancing"
LB[Nginx/ALB]
end
subgraph "Web layer"
WS1[Webserver 1]
WS2[Webserver 2]
end
subgraph "Scheduling layer"
SCH1[Scheduler 1]
SCH2[Scheduler 2]
TRIG1[Triggerer 1]
TRIG2[Triggerer 2]
end
subgraph "Execution layer"
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
W4[Worker N]
end
subgraph "Storage layer"
REDIS[(Redis)]
PG[(PostgreSQL HA)]
S3[(S3 logs)]
end
LB --> WS1 & WS2
WS1 & WS2 --> PG
SCH1 & SCH2 --> PG
SCH1 & SCH2 --> REDIS
REDIS --> W1 & W2 & W3 & W4
W1 & W2 & W3 & W4 --> PG
W1 & W2 & W3 & W4 --> S3
TRIG1 & TRIG2 --> PG
7.2 Key monitoring metrics
graph TB
subgraph "Monitoring metrics"
A[DAG metrics] --> A1[DAG run duration]
A --> A2[DAG success rate]
A --> A3[DAG run backlog]
B[Task metrics] --> B1[task queue length]
B --> B2[task failure rate]
B --> B3[average task duration]
C[System metrics] --> C1[Scheduler heartbeat]
C --> C2[number of active Workers]
C --> C3[DB connection pool usage]
end
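Beyond dashboards, most teams also wire alerts directly into their DAGs. A minimal sketch of a failure callback plugged into `default_args`; the logging-only body stands in for whatever webhook (Slack, DingTalk, PagerDuty, ...) you actually use:

```python
import logging
from datetime import timedelta

logger = logging.getLogger(__name__)

def alert_on_failure(context):
    """Called by Airflow with the task context when a task instance fails."""
    ti = context["task_instance"]
    logger.error(
        "Task failed: dag=%s task=%s run=%s try=%s",
        ti.dag_id, ti.task_id, context["run_id"], ti.try_number,
    )
    # post to your alerting webhook here

default_args = {
    "on_failure_callback": alert_on_failure,  # per-task failure alerting
    "sla": timedelta(hours=3),                # SLA misses also show up in the Web UI
}
```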
7.3 Troubleshooting common problems
flowchart TD
A[Troubleshooting] --> B{DAG not running?}
B -->|yes| B1["Is the Scheduler running?"]
B1 --> B2["Is the DAG paused?"]
B2 --> B3["Check schedule_interval"]
B3 --> B4["Check start_date"]
A --> C{Tasks stuck in Queued?}
C -->|yes| C1["Are Workers running?"]
C1 --> C2["Check concurrency settings"]
C2 --> C3["Check Pool capacity"]
A --> D{Tasks failing?}
D -->|yes| D1["Read the task logs"]
D1 --> D2["Check Connection configuration"]
D2 --> D3["Are resources sufficient?"]
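Many "DAG not running" cases in the first branch are simply import errors in the DAG file. A small `DagBag`-based test, such as the `tests/test_dags.py` mentioned in the project layout above, catches them before deployment. A minimal sketch, assuming DAG files live under `dags/`:

```python
# tests/test_dags.py -- minimal sanity checks for the DAG folder
from airflow.models import DagBag

def test_no_import_errors():
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    # any syntax error or missing dependency in a DAG file shows up here,
    # which is the usual reason a DAG never appears in the UI
    assert dag_bag.import_errors == {}, dag_bag.import_errors

def test_every_dag_has_tags():
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"{dag_id} has no tags"
```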
7.4 Configuration tuning
# airflow.cfg key settings
[core]
# maximum number of active runs per DAG
max_active_runs_per_dag = 16
# maximum number of task instances running across the whole installation
parallelism = 32
[scheduler]
# how often (seconds) to scan the DAG folder for new files
dag_dir_list_interval = 300
# scheduler heartbeat interval (seconds)
scheduler_heartbeat_sec = 5
# maximum number of DAG runs to create per scheduler loop
max_dagruns_to_create_per_loop = 10
# maximum number of DAG runs examined for scheduling per loop
max_dagruns_per_loop_to_schedule = 20
[celery]
# number of tasks each Worker runs concurrently
worker_concurrency = 16
[webserver]
# number of DAG runs shown per DAG in the Web UI
default_dag_run_display_number = 25
8. Best Practices
8.1 DAG coding conventions
mindmap
  root((DAG best practices))
    Naming
      snake_case dag_ids
      task_ids with clear semantics
      shared prefixes make DAGs easy to find
    Idempotency
      tasks must be safe to re-run
      use INSERT OVERWRITE / upserts
      avoid duplicate inserts
    Error handling
      set sensible retries
      always set timeouts
      notify on failure
    Performance
      no heavy work at the DAG top level
      organize tasks with TaskGroup
      tune concurrency deliberately
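For the idempotency point above, the usual trick is to make each run own its date partition: clear (or overwrite) the data for `{{ ds }}` before writing it, so re-running the task never duplicates rows. A minimal sketch reusing the warehouse connection style from earlier; the table names are illustrative:

```python
from airflow.providers.mysql.operators.mysql import MySqlOperator

idempotent_load = MySqlOperator(
    task_id="load_daily_summary",
    mysql_conn_id="mysql_warehouse",       # illustrative connection id
    sql=[
        # wipe this run's partition first, so reruns are safe
        "DELETE FROM daily_summary WHERE date = '{{ ds }}'",
        """
        INSERT INTO daily_summary (date, metric, value)
        SELECT '{{ ds }}', metric, SUM(value)
        FROM raw_data
        WHERE date = '{{ ds }}'
        GROUP BY metric
        """,
    ],
)
```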
8.2 Common anti-patterns
# ❌ Anti-pattern 1: heavy computation at the top level of the DAG file
import pandas as pd
df = pd.read_csv('large_file.csv')  # runs every time the DAG file is parsed!

# ✅ Instead: do it inside the task callable
def process_data():
    df = pd.read_csv('large_file.csv')
    # ...

# ❌ Anti-pattern 2: hard-coded dates
sql = "SELECT * FROM table WHERE date = '2024-01-15'"

# ✅ Instead: use template variables
sql = "SELECT * FROM table WHERE date = '{{ ds }}'"

# ❌ Anti-pattern 3: pushing large payloads through XCom
def return_large_data():
    return pd.read_csv('huge_file.csv').to_dict()  # can be hundreds of MB

# ✅ Instead: pass a reference through XCom
def return_data_path():
    df = pd.read_csv('huge_file.csv')
    path = '/tmp/processed_data.parquet'
    df.to_parquet(path)
    return path  # only the path travels through XCom

# ❌ Anti-pattern 4: no timeout
task = BashOperator(
    task_id='might_hang',
    bash_command='python long_running.py',
    # without execution_timeout this can hang forever
)

# ✅ Instead: always set a timeout
task = BashOperator(
    task_id='with_timeout',
    bash_command='python long_running.py',
    execution_timeout=timedelta(hours=2),
)
9. Summary
9.1 Airflow's core value
graph LR
A[Airflow's core value] --> B[Workflows as code]
A --> C[Powerful scheduling]
A --> D[Rich ecosystem]
A --> E[Solid monitoring]
B --> B1["DAGs under version control<br/>code review<br/>CI/CD integration"]
C --> C1["complex dependencies<br/>conditional branches<br/>dynamic tasks"]
D --> D1["200+ Operators<br/>covering all mainstream systems"]
E --> E1["Web UI<br/>log tracing<br/>alert integrations"]
9.2 Where it fits
| Scenario | Good fit? | Notes |
|---|---|---|
| Complex ETL pipelines | ✅ Excellent fit | Airflow's home turf |
| ML pipelines | ✅ Good fit | Pair it with MLflow/Kubeflow |
| Scheduled-job orchestration | ✅ Good fit | Far more capable than crontab |
| Real-time stream processing | ❌ Not suitable | Use Flink/Spark Streaming |
| A handful of simple cron jobs | ⚠️ It depends | May be overkill |
9.3 One-sentence summary
Airflow = workflows defined in Python + powerful scheduling + a rich ecosystem + solid monitoring.
If your team knows Python and needs to manage complex data pipelines, Airflow is one of the best choices available.
If this article helped you, feel free to:
- 👍 Like it, so more people see it
- ⭐ Bookmark it for later reference
- 💬 Comment and share your own Airflow experience