
Airflow, From Getting Started to Giving Up: The Appeal of Defining Workflows as Code


1. Preface: Why Do We Need Workflow Scheduling?

Remember how you ran scheduled jobs when you first started out?

# The green, naive you: crontab -e
0 2 * * * /home/scripts/etl_step1.sh
30 2 * * * /home/scripts/etl_step2.sh  # assuming step1 takes 30 minutes
0 3 * * * /home/scripts/etl_step3.sh   # assuming step2 takes 30 minutes

# Then one night your phone rings at 3 a.m.
# "The report numbers are wrong!"
# It turns out step1 took 40 minutes, and step3 kicked off before step2 had finished...

Crontab's fatal flaws:

graph TD
    A[Crontab pain points] --> B[No dependency management]
    A --> C[No failure retries]
    A --> D[No visual monitoring]
    A --> E[No run history]
    B --> B1["Task2 has no idea whether Task1 finished"]
    C --> C1["Failed? Rerun it by hand"]
    D --> D1["Where is the job at? Go dig through logs"]
    E --> E1["How did last week's runs go? Nobody knows"]

What a workflow scheduler buys you:

graph LR subgraph "有调度器的世界" A[Task1] -->|成功后| B[Task2] B -->|成功后| C[Task3] A -->|失败| D[告警+自动重试] B -->|失败| D end

2. What Is Airflow?

2.1 One-Sentence Definition

Airflow = a distributed task-scheduling platform where workflows (DAGs) are defined as Python code

2.2 A Brief History

timeline
    title Airflow timeline
    2014 : Internal project at Airbnb
    2015 : Open-sourced
    2016 : Enters the Apache Incubator
    2019 : Becomes an Apache top-level project
    2020 : Airflow 2.0 released (major upgrade)
    2023 : Airflow 2.7+ under active iteration

2.3 Why Choose Airflow?

graph TB
    A[Airflow's strengths] --> B[Configuration as code]
    A --> C[Rich set of Operators]
    A --> D[Strong community]
    A --> E[Solid monitoring]
    A --> F[Flexible extensibility]
    B --> B1["DAGs defined in Python, version-control friendly, code review"]
    C --> C1["200+ built-in Operators covering mainstream systems"]
    D --> D1["Apache top-level project, backed by major companies"]
    E --> E1["Web UI, log tracing, alert integrations"]
    F --> F1["Custom Operators, plugin mechanism"]

3. Core Concepts in Detail

3.1 Concept Map

graph TB subgraph "Airflow核心概念" DAG[DAG
有向无环图] Task[Task
任务节点] Operator[Operator
任务类型] Sensor[Sensor
等待条件] Hook[Hook
外部连接] Connection[Connection
连接配置] Variable[Variable
全局变量] XCom[XCom
任务通信] end DAG --> Task Task --> Operator Task --> Sensor Operator --> Hook Sensor --> Hook Hook --> Connection DAG --> Variable Task --> XCom

3.2 DAG: Directed Acyclic Graph

graph LR subgraph "一个典型的ETL DAG" A[extract_data] --> B[transform_data] B --> C[load_to_warehouse] C --> D[generate_report] C --> E[send_notification] end

Basic DAG definition:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Default arguments shared by all tasks in the DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['alert@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
with DAG(
    dag_id='etl_daily_pipeline',
    default_args=default_args,
    description='Daily ETL data pipeline',
    schedule_interval='0 2 * * *',  # 2:00 a.m. every day
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill past runs
    tags=['etl', 'daily'],
) as dag:
    
    # define tasks here...
    pass
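
Once the file is saved under dags/, it can be sanity-checked from the command line before the scheduler ever picks it up. A minimal sketch for a local Airflow 2.x install; the task id below is a placeholder, since the DAG above defines no tasks yet:

# Does the file parse, and is the DAG registered?
airflow dags list | grep etl_daily_pipeline

# Dry-run a single task for one logical date (no scheduler or backfill involved)
airflow tasks test etl_daily_pipeline some_task_id 2024-01-01

# Run the whole DAG once for a logical date
airflow dags test etl_daily_pipeline 2024-01-01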

3.3 Operator: Task Types

graph TB subgraph "常用Operator分类" A[Action Operators] --> A1[BashOperator] A --> A2[PythonOperator] A --> A3[EmailOperator] B[Transfer Operators] --> B1[S3ToRedshiftOperator] B --> B2[MySqlToHiveOperator] B --> B3[GCSToGCSOperator] C[Sensor Operators] --> C1[FileSensor] C --> C2[HttpSensor] C --> C3[SqlSensor] D[Provider Operators] --> D1[SparkSubmitOperator] D --> D2[KubernetesPodOperator] D --> D3[SnowflakeOperator] end

Examples of commonly used Operators:

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

# 1. BashOperator - run a shell command
extract_task = BashOperator(
    task_id='extract_data',
    bash_command='python /scripts/extract.py --date {{ ds }}',
)

# 2. PythonOperator - run a Python function
def transform_data(**context):
    execution_date = context['ds']
    # data transformation logic goes here
    return f"Transformed data for {execution_date}"

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    # provide_context is gone in Airflow 2: the context is passed automatically
)

# 3. MySqlOperator - run SQL
load_task = MySqlOperator(
    task_id='load_to_mysql',
    mysql_conn_id='mysql_warehouse',
    sql="""
        INSERT INTO daily_stats (date, metric, value)
        SELECT '{{ ds }}', metric, SUM(value)
        FROM raw_data
        WHERE date = '{{ ds }}'
        GROUP BY metric
    """,
)

# 4. SimpleHttpOperator - call an API
notify_task = SimpleHttpOperator(
    task_id='notify_api',
    http_conn_id='notification_api',
    endpoint='/webhook/airflow',
    method='POST',
    data='{"status": "completed", "date": "{{ ds }}"}',
)

3.4 Sensor: Waiting for a Condition

sequenceDiagram
    participant Scheduler
    participant Sensor
    participant External
    Scheduler->>Sensor: start the Sensor task
    loop check every poke_interval
        Sensor->>External: is the condition met?
        External-->>Sensor: not yet
        Sensor->>Sensor: wait poke_interval
    end
    Sensor->>External: check the condition
    External-->>Sensor: condition met!
    Sensor->>Scheduler: task succeeded, continue downstream

from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.sql import SqlSensor

# 1. FileSensor - wait for a file to appear
wait_for_file = FileSensor(
    task_id='wait_for_data_file',
    filepath='/data/input/{{ ds }}/data.csv',
    poke_interval=60,  # check every 60 seconds
    timeout=3600,      # wait at most 1 hour
    mode='poke',       # poke mode holds a worker slot the whole time
)

# 2. SqlSensor - wait until upstream data is ready
wait_for_data = SqlSensor(
    task_id='wait_for_upstream_data',
    conn_id='mysql_source',
    sql="""
        SELECT COUNT(*) FROM source_table 
        WHERE date = '{{ ds }}' AND status = 'ready'
    """,
    poke_interval=300,
    timeout=7200,
    mode='reschedule',  # reschedule mode frees the slot between checks
)

# 3. HttpSensor - wait for an API to report success
wait_for_api = HttpSensor(
    task_id='wait_for_upstream_api',
    http_conn_id='upstream_service',
    endpoint='/status/{{ ds }}',
    response_check=lambda response: response.json()['status'] == 'ready',
    poke_interval=120,
    timeout=3600,
)
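
If none of the built-in sensors fits, you can write your own by subclassing BaseSensorOperator and implementing poke(), which is called once per poke_interval and returns True when the condition holds. A minimal sketch; the marker-file convention and paths are made up for illustration:

import os

from airflow.sensors.base import BaseSensorOperator


class PartitionMarkerSensor(BaseSensorOperator):
    """Hypothetical sensor: wait until a _SUCCESS marker exists for the run date."""

    def __init__(self, base_path: str, **kwargs):
        super().__init__(**kwargs)
        self.base_path = base_path

    def poke(self, context) -> bool:
        # called every poke_interval; return True once the condition is met
        marker = os.path.join(self.base_path, context['ds'], '_SUCCESS')
        self.log.info("Checking for marker file %s", marker)
        return os.path.exists(marker)


wait_for_partition = PartitionMarkerSensor(
    task_id='wait_for_partition_marker',
    base_path='/data/input',
    poke_interval=60,
    timeout=3600,
    mode='reschedule',  # free the worker slot between checks
)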

3.5 XCom: Cross-Task Communication

graph LR
    A[Task A] -->|push XCom| X[(XCom store)]
    X -->|pull XCom| B[Task B]
    subgraph "XCom data flow"
        A1["return value or xcom_push()"] --> X
        X --> B1["xcom_pull() or Jinja template"]
    end

# Option 1: push automatically via the return value
def extract_data(**context):
    data = {'count': 1000, 'status': 'success'}
    return data  # automatically pushed to XCom

def transform_data(**context):
    # pull the XCom from the upstream task
    ti = context['ti']
    upstream_data = ti.xcom_pull(task_ids='extract_data')
    print(f"Received: {upstream_data}")  # {'count': 1000, 'status': 'success'}

# Option 2: push/pull explicitly
def manual_push(**context):
    ti = context['ti']
    ti.xcom_push(key='my_key', value='my_value')

def manual_pull(**context):
    ti = context['ti']
    value = ti.xcom_pull(task_ids='manual_push', key='my_key')

# Option 3: use XCom inside a Jinja template
bash_task = BashOperator(
    task_id='use_xcom',
    bash_command='echo "Count: {{ ti.xcom_pull(task_ids="extract_data")["count"] }}"',
)
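
For completeness, here is how the callables above might be wired into PythonOperators inside a DAG so that the task_ids match the xcom_pull() calls; a sketch, not part of the original example:

extract = PythonOperator(task_id='extract_data', python_callable=extract_data)
transform = PythonOperator(task_id='transform_data', python_callable=transform_data)
push = PythonOperator(task_id='manual_push', python_callable=manual_push)
pull = PythonOperator(task_id='manual_pull', python_callable=manual_pull)

# transform_data and bash_task pull the value returned by extract_data;
# manual_pull pulls the key pushed by manual_push
extract >> transform >> bash_task
push >> pull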

4. Airflow Architecture in Detail

4.1 Overall Architecture

graph TB subgraph "Airflow架构" subgraph "Web层" WS[Webserver
Flask应用] end subgraph "调度层" SCH[Scheduler
调度器] TRIG[Triggerer
异步触发器] end subgraph "执行层" W1[Worker 1] W2[Worker 2] W3[Worker N] end subgraph "存储层" META[(Metadata DB
PostgreSQL/MySQL)] LOG[(日志存储
本地/S3/GCS)] end subgraph "消息队列 可选" MQ[Redis/RabbitMQ] end end WS --> META SCH --> META SCH --> MQ MQ --> W1 & W2 & W3 W1 & W2 & W3 --> META W1 & W2 & W3 --> LOG TRIG --> META

4.2 Core Component Responsibilities

Component | Responsibility | Deployment advice
Webserver | Serves the Web UI and shows DAG status | Multiple instances behind a load balancer
Scheduler | Parses DAGs and schedules tasks | Multi-instance HA supported since 2.0+
Worker | Executes the actual tasks | Scale out horizontally as needed
Triggerer | Runs Deferrable Operators | Added in 2.2+, optional
Metadata DB | Stores DAG/task/connection metadata | PostgreSQL recommended
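
Each of these components maps to its own airflow CLI entry point; a rough sketch of bringing them up by hand on an Airflow 2.x install (in production you would run them under systemd, Docker Compose, or the official Helm chart):

# initialize / upgrade the metadata database first
airflow db init

# one long-running process per component
airflow webserver --port 8080
airflow scheduler
airflow triggerer        # only needed for deferrable operators (2.2+)
airflow celery worker    # only when using the CeleryExecutor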

4.3 Executor Types

graph TB
    A[Executor types] --> B[SequentialExecutor]
    A --> C[LocalExecutor]
    A --> D[CeleryExecutor]
    A --> E[KubernetesExecutor]
    B --> B1["Sequential execution, testing only"]
    C --> C1["Local multiprocessing, single-machine deployments"]
    D --> D1["Distributed: Celery + Redis/RabbitMQ"]
    E --> E1["Runs each task in a K8s Pod, elastic scaling"]

flowchart TD
    A{Choosing an Executor} --> B{Workload size?}
    B -->|Tiny, for testing| C[SequentialExecutor]
    B -->|Moderate, single node| D[LocalExecutor]
    B -->|Large, distributed| E{Infrastructure?}
    E -->|Kubernetes available| F[KubernetesExecutor]
    E -->|No Kubernetes| G[CeleryExecutor]
    style C fill:#ff6b6b
    style D fill:#ffeaa7
    style F fill:#4ecdc4
    style G fill:#4ecdc4
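
The executor is picked in airflow.cfg (or via the matching AIRFLOW__CORE__EXECUTOR environment variable); a sketch of a CeleryExecutor setup, with placeholder hosts and credentials:

[core]
executor = CeleryExecutor

[celery]
# placeholder connection strings
broker_url = redis://redis-host:6379/0
result_backend = db+postgresql://airflow:airflow@pg-host:5432/airflow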

4.4 Task Lifecycle

stateDiagram-v2
    [*] --> scheduled: picked up by the Scheduler
    scheduled --> queued: dependencies met
    queued --> running: claimed by a Worker
    running --> success: finished successfully
    running --> failed: failed
    running --> up_for_retry: failed but retryable
    up_for_retry --> scheduled: retry
    failed --> [*]
    success --> [*]
    scheduled --> upstream_failed: upstream failed
    scheduled --> skipped: skipped by a condition
    upstream_failed --> [*]
    skipped --> [*]

5. Hands-On: Building a Complete ETL Pipeline

5.1 Project Layout

airflow_project/
├── dags/
│   ├── __init__.py
│   ├── etl_daily_pipeline.py      # DAG definition
│   └── common/
│       ├── __init__.py
│       └── utils.py               # shared helpers
├── plugins/
│   └── custom_operators/          # custom Operators
├── tests/
│   └── test_dags.py              # DAG tests
├── docker-compose.yaml
└── requirements.txt
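
tests/test_dags.py can stay tiny and still catch most deploy-breaking mistakes: load every DAG through DagBag and fail on import errors. A sketch using pytest; the owner/tags checks simply enforce the conventions used in this article:

"""Basic DAG integrity tests (sketch for tests/test_dags.py)."""
from airflow.models import DagBag


def test_no_import_errors():
    # include_examples=False so only our own dags/ folder is loaded
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"


def test_dags_follow_team_conventions():
    dag_bag = DagBag(include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"{dag_id} has no tags"
        assert dag.default_args.get('owner'), f"{dag_id} has no owner in default_args"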

5.2 A Complete DAG Example

"""
ETL Daily Pipeline
Extracts data from MySQL every day, transforms it, and loads it into the data warehouse.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.sensors.sql import SqlSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

# ============ Default arguments ============
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['data-alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# ============ Python callables ============
def check_data_quality(**context):
    """Data quality check."""
    ds = context['ds']
    # run the real data-quality checks here
    record_count = 1000  # pretend we fetched the record count

    if record_count < 100:
        raise ValueError(f"Abnormal data volume: {record_count} < 100")

    logger.info(f"Data quality check passed, record count: {record_count}")
    return {'record_count': record_count, 'status': 'passed'}

def decide_branch(**context):
    """决定执行哪个分支"""
    ti = context['ti']
    qc_result = ti.xcom_pull(task_ids='quality_check')
    
    if qc_result['record_count'] > 10000:
        return 'process_large_data'
    else:
        return 'process_small_data'

def process_data(data_size: str, **context):
    """Process the data."""
    ds = context['ds']
    logger.info(f"Processing the {data_size} dataset for {ds}")
    # real processing logic goes here
    return f"Processed {data_size} data for {ds}"

def send_notification(**context):
    """Send a completion notification."""
    ti = context['ti']
    ds = context['ds']
    # collect the results of upstream tasks here if needed
    logger.info(f"Sending completion notification for {ds}")

# ============ DAG definition ============
with DAG(
    dag_id='etl_daily_pipeline_v2',
    default_args=default_args,
    description='Production-grade ETL pipeline example',
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production', 'daily'],
    doc_md="""
    ## ETL Daily Pipeline

    ### What it does
    1. Wait for upstream data to be ready
    2. Extract data from MySQL
    3. Run data quality checks
    4. Pick a processing strategy based on data volume
    5. Load into the data warehouse
    6. Send a notification

    ### Dependencies
    - MySQL source database
    - Data warehouse
    - Alerting service
    """,
) as dag:
    
    # ===== Start node =====
    start = EmptyOperator(task_id='start')
    
    # ===== Wait for upstream data =====
    wait_for_source = SqlSensor(
        task_id='wait_for_source_data',
        conn_id='mysql_source',
        sql="""
            SELECT COUNT(*) > 0 
            FROM etl_status 
            WHERE date = '{{ ds }}' AND status = 'ready'
        """,
        poke_interval=300,
        timeout=7200,
        mode='reschedule',
    )
    
    # ===== Extract data (task group) =====
    with TaskGroup(group_id='extract_group') as extract_group:
        extract_users = BashOperator(
            task_id='extract_users',
            bash_command='python /scripts/extract.py --table users --date {{ ds }}',
        )
        
        extract_orders = BashOperator(
            task_id='extract_orders',
            bash_command='python /scripts/extract.py --table orders --date {{ ds }}',
        )
        
        extract_products = BashOperator(
            task_id='extract_products',
            bash_command='python /scripts/extract.py --table products --date {{ ds }}',
        )
        
        # no dependencies between them, so the three extracts run in parallel
    
    # ===== Data quality check =====
    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=check_data_quality,
    )
    
    # ===== Branch decision =====
    branch = BranchPythonOperator(
        task_id='branch_by_data_size',
        python_callable=decide_branch,
    )
    
    # ===== Processing branches =====
    process_large = PythonOperator(
        task_id='process_large_data',
        python_callable=process_data,
        op_kwargs={'data_size': 'large'},
    )
    
    process_small = PythonOperator(
        task_id='process_small_data',
        python_callable=process_data,
        op_kwargs={'data_size': 'small'},
    )
    
    # ===== Merge the branches =====
    merge = EmptyOperator(
        task_id='merge_branches',
        trigger_rule='none_failed_min_one_success',  # at least one branch succeeded
    )
    
    # ===== Load into the warehouse =====
    with TaskGroup(group_id='load_group') as load_group:
        load_to_dw = MySqlOperator(
            task_id='load_to_warehouse',
            mysql_conn_id='mysql_warehouse',
            sql='/sql/load_warehouse.sql',
        )
        
        update_stats = MySqlOperator(
            task_id='update_statistics',
            mysql_conn_id='mysql_warehouse',
            sql="""
                INSERT INTO etl_statistics (date, table_name, row_count, updated_at)
                VALUES ('{{ ds }}', 'daily_summary', 
                        (SELECT COUNT(*) FROM daily_summary WHERE date = '{{ ds }}'),
                        NOW())
                ON DUPLICATE KEY UPDATE 
                    row_count = VALUES(row_count),
                    updated_at = NOW()
            """,
        )
        
        load_to_dw >> update_stats
    
    # ===== Send notification =====
    notify = PythonOperator(
        task_id='send_notification',
        python_callable=send_notification,
        trigger_rule='all_done',  # run whether upstream succeeded or failed
    )
    
    # ===== End node =====
    end = EmptyOperator(task_id='end')
    
    # ===== Wire up the dependencies =====
    start >> wait_for_source >> extract_group >> quality_check >> branch
    branch >> [process_large, process_small] >> merge
    merge >> load_group >> notify >> end

5.3 DAG Visualization

graph LR
    start[start] --> wait[wait_for_source_data]
    wait --> extract[extract_group]
    subgraph extract[extract_group]
        e1[extract_users]
        e2[extract_orders]
        e3[extract_products]
    end
    extract --> qc[quality_check]
    qc --> branch[branch_by_data_size]
    branch -->|large volume| large[process_large_data]
    branch -->|small volume| small[process_small_data]
    large --> merge[merge_branches]
    small --> merge
    merge --> load[load_group]
    subgraph load[load_group]
        l1[load_to_warehouse] --> l2[update_statistics]
    end
    load --> notify[send_notification]
    notify --> end_node[end]

6. Advanced Features

6.1 Dynamic DAG Generation

"""
Dynamically generate several similar DAGs from a configuration list
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Config: one DAG per data source
data_sources = [
    {'name': 'mysql_db1', 'schedule': '0 1 * * *', 'conn_id': 'mysql_db1'},
    {'name': 'mysql_db2', 'schedule': '0 2 * * *', 'conn_id': 'mysql_db2'},
    {'name': 'postgres_db1', 'schedule': '0 3 * * *', 'conn_id': 'pg_db1'},
]

def create_dag(source_config):
    """工厂函数:创建DAG"""
    dag_id = f"sync_{source_config['name']}"
    
    dag = DAG(
        dag_id=dag_id,
        schedule_interval=source_config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['dynamic', 'sync'],
    )
    
    def sync_data(conn_id, **context):
        print(f"Syncing data from {conn_id}")
    
    with dag:
        sync_task = PythonOperator(
            task_id='sync_data',
            python_callable=sync_data,
            op_kwargs={'conn_id': source_config['conn_id']},
        )
    
    return dag

# Create the DAGs dynamically and register them in the module's global namespace
for source in data_sources:
    dag_id = f"sync_{source['name']}"
    globals()[dag_id] = create_dag(source)

6.2 TaskFlow API (Airflow 2.0+)

"""
Simplify DAG authoring with the TaskFlow API.
More Pythonic, more concise.
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='taskflow_example',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow'],
)
def taskflow_pipeline():
    """使用装饰器定义DAG"""
    
    @task
    def extract() -> dict:
        """抽取数据"""
        return {'data': [1, 2, 3, 4, 5], 'count': 5}
    
    @task
    def transform(raw_data: dict) -> dict:
        """转换数据"""
        data = raw_data['data']
        transformed = [x * 2 for x in data]
        return {'data': transformed, 'count': len(transformed)}
    
    @task
    def load(transformed_data: dict) -> str:
        """加载数据"""
        print(f"Loading {transformed_data['count']} records")
        return 'success'
    
    @task
    def notify(status: str):
        """发送通知"""
        print(f"Pipeline finished with status: {status}")
    
    # dependencies are inferred automatically!
    raw = extract()
    transformed = transform(raw)
    status = load(transformed)
    notify(status)

# Instantiate the DAG
taskflow_dag = taskflow_pipeline()

6.3 Deferrable Operators (Airflow 2.2+)

graph LR subgraph "传统Sensor" A1[Sensor任务] -->|占用Worker| A2[循环检查] A2 --> A3[浪费资源] end
graph LR subgraph "Deferrable Operator" B1[任务启动] -->|触发异步| B2[Triggerer处理] B2 -->|条件满足| B3[唤醒任务] B1 -->|释放Worker| B4[其他任务可用] end
from airflow.sensors.filesystem import FileSensor

# Traditional poke mode: holds a worker slot for the whole wait
file_sensor_poke = FileSensor(
    task_id='wait_file_poke',
    filepath='/data/file.csv',
    mode='poke',  # keeps the slot while polling
)

# Reschedule mode: the slot is released between checks
file_sensor_reschedule = FileSensor(
    task_id='wait_file_reschedule',
    filepath='/data/file.csv',
    mode='reschedule',  # released after each check
)

# Deferrable mode: most efficient (requires the Triggerer component
# and a recent apache-airflow-providers-amazon release)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

s3_sensor_async = S3KeySensor(
    task_id='wait_s3_file',
    bucket_key='s3://bucket/path/file.csv',
    deferrable=True,  # wait asynchronously in the Triggerer
)
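
Under the hood, a deferrable operator calls self.defer() with a trigger; the Triggerer runs the trigger's async code and wakes the task up through the named callback method. A toy sketch built on the built-in TimeDeltaTrigger; the operator itself is invented for illustration:

from datetime import timedelta

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import TimeDeltaTrigger


class WaitThenContinueOperator(BaseOperator):
    """Hypothetical operator: frees its worker slot while waiting a fixed delay."""

    def __init__(self, delay_minutes: int = 10, **kwargs):
        super().__init__(**kwargs)
        self.delay_minutes = delay_minutes

    def execute(self, context):
        # suspend the task: the worker slot is released and the Triggerer
        # takes over until the trigger fires
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(minutes=self.delay_minutes)),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # resumed on a worker once the trigger has fired
        self.log.info("Delay elapsed, continuing with the actual work")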

6.4 Data-Aware Scheduling

"""
Dataset-aware scheduling, Airflow 2.4+:
an upstream DAG produces a dataset, and downstream DAGs are triggered automatically.
"""
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime

# Define the datasets
orders_dataset = Dataset('s3://bucket/orders/')
users_dataset = Dataset('s3://bucket/users/')

# Producer DAG
with DAG(
    dag_id='producer_orders',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
) as producer_dag:

    @task(outlets=[orders_dataset])  # declare that this task updates the dataset
    def produce_orders():
        # produce the orders data
        pass

    produce_orders()

# Consumer DAG - triggered automatically once both orders and users have been updated
with DAG(
    dag_id='consumer_report',
    schedule=[orders_dataset, users_dataset],  # dataset-driven schedule
    start_date=datetime(2024, 1, 1),
) as consumer_dag:

    @task
    def generate_report():
        # build the report
        pass

    generate_report()

7. Operations and Monitoring

7.1 Deployment Architecture (Production)

graph TB subgraph "负载均衡" LB[Nginx/ALB] end subgraph "Web层" WS1[Webserver 1] WS2[Webserver 2] end subgraph "调度层" SCH1[Scheduler 1] SCH2[Scheduler 2] TRIG1[Triggerer 1] TRIG2[Triggerer 2] end subgraph "执行层" W1[Worker 1] W2[Worker 2] W3[Worker 3] W4[Worker N] end subgraph "存储层" REDIS[(Redis)] PG[(PostgreSQL HA)] S3[(S3 日志)] end LB --> WS1 & WS2 WS1 & WS2 --> PG SCH1 & SCH2 --> PG SCH1 & SCH2 --> REDIS REDIS --> W1 & W2 & W3 & W4 W1 & W2 & W3 & W4 --> PG W1 & W2 & W3 & W4 --> S3 TRIG1 & TRIG2 --> PG

7.2 Key Monitoring Metrics

graph TB subgraph "监控指标" A[DAG指标] --> A1[DAG运行时长] A --> A2[DAG成功率] A --> A3[DAG积压数量] B[Task指标] --> B1[Task队列长度] B --> B2[Task失败率] B --> B3[Task平均耗时] C[系统指标] --> C1[Scheduler心跳] C --> C2[Worker活跃数] C --> C3[DB连接池] end

7.3 Troubleshooting Common Issues

flowchart TD
    A[Troubleshooting] --> B{DAG not running?}
    B -->|Yes| B1["Is the Scheduler running?"]
    B1 --> B2["Is the DAG paused?"]
    B2 --> B3["Check schedule_interval"]
    B3 --> B4["Check start_date"]
    A --> C{Task stuck in Queued?}
    C -->|Yes| C1["Are Workers running?"]
    C1 --> C2["Check concurrency settings"]
    C2 --> C3["Check Pool capacity"]
    A --> D{Task failing?}
    D -->|Yes| D1["Read the task log"]
    D1 --> D2["Check the Connection config"]
    D2 --> D3["Are resources sufficient?"]
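
A few CLI commands that speed up the checks in the flowchart above, assuming Airflow 2.x:

# does every DAG file import cleanly?
airflow dags list-import-errors

# is the scheduler alive? (non-zero exit code if there is no recent heartbeat)
airflow jobs check --job-type SchedulerJob

# which config value is actually in effect?
airflow config get-value core executor

# inspect registered DAGs and their tasks
airflow dags list
airflow tasks list etl_daily_pipeline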

7.4 Configuration Tuning

# Key settings in airflow.cfg

[core]
# Maximum number of active runs per DAG
max_active_runs_per_dag = 16
# Maximum number of task instances running across the whole installation
parallelism = 32

[scheduler]
# How often (in seconds) to scan the DAGs folder for new files
dag_dir_list_interval = 300
# Scheduler heartbeat interval (seconds)
scheduler_heartbeat_sec = 5
# Maximum number of DagRuns to create per scheduler loop
max_dagruns_to_create_per_loop = 10
# Maximum number of DagRuns examined for scheduling per loop
max_dagruns_per_loop_to_schedule = 20

[celery]
# Number of tasks each Celery worker runs concurrently
worker_concurrency = 16

[webserver]
# Number of DAG runs shown per DAG in the UI
default_dag_run_display_number = 25

8. Best Practices

8.1 DAG Authoring Guidelines

mindmap
  root((DAG best practices))
    Naming conventions
      Use snake_case dag_ids
      Make task_ids self-explanatory
      Shared prefixes make DAGs easy to find
    Idempotent design
      Tasks can be rerun safely
      Use INSERT OVERWRITE
      Avoid duplicate inserts
    Error handling
      Set sensible retries
      Set timeouts
      Alert on failure
    Performance
      No heavy work at the top level of the DAG file
      Organize tasks with TaskGroup
      Set concurrency sensibly
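
The "idempotent design" point deserves a concrete shape: let each run own its date partition, so rerunning a day deletes and rewrites that day only instead of appending duplicates. A sketch with MySqlOperator; the table and connection names are placeholders:

from airflow.providers.mysql.operators.mysql import MySqlOperator

# Rerunning this task for the same {{ ds }} always converges to the same result:
# the day's rows are wiped and rewritten, never appended twice.
idempotent_load = MySqlOperator(
    task_id='idempotent_daily_load',
    mysql_conn_id='mysql_warehouse',
    sql=[
        "DELETE FROM daily_summary WHERE date = '{{ ds }}'",
        """
        INSERT INTO daily_summary (date, metric, value)
        SELECT '{{ ds }}', metric, SUM(value)
        FROM raw_data
        WHERE date = '{{ ds }}'
        GROUP BY metric
        """,
    ],
)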

8.2 Common Anti-Patterns

# ❌ Anti-pattern 1: heavy work at the top level of the DAG file
import pandas as pd
df = pd.read_csv('large_file.csv')  # runs every time the DAG file is parsed!

# ✅ Better: do the work inside a task callable
def process_data():
    df = pd.read_csv('large_file.csv')
    # ...

# ❌ Anti-pattern 2: hard-coded dates
sql = "SELECT * FROM table WHERE date = '2024-01-15'"

# ✅ Better: use template variables
sql = "SELECT * FROM table WHERE date = '{{ ds }}'"

# ❌ Anti-pattern 3: pushing large payloads through XCom
def return_large_data():
    return pd.read_csv('huge_file.csv').to_dict()  # could be hundreds of MB

# ✅ Better: pass only a reference through XCom
def return_data_path():
    path = '/tmp/processed_data.parquet'
    df.to_parquet(path)  # write the data somewhere durable
    return path  # only the path goes through XCom

# ❌ Anti-pattern 4: no timeout
task = BashOperator(
    task_id='might_hang',
    bash_command='python long_running.py',
    # without execution_timeout this can hang forever
)

# ✅ Better: always set a timeout (assumes BashOperator and timedelta
# are imported as in the earlier examples)
task = BashOperator(
    task_id='with_timeout',
    bash_command='python long_running.py',
    execution_timeout=timedelta(hours=2),
)

9. Summary

9.1 Airflow's Core Value

graph LR
    A[Airflow's core value] --> B[Configuration as code]
    A --> C[Powerful scheduling]
    A --> D[Rich ecosystem]
    A --> E[Solid monitoring]
    B --> B1["DAGs under version control, code review, CI/CD integration"]
    C --> C1["Complex dependencies, conditional branches, dynamic tasks"]
    D --> D1["200+ Operators covering mainstream systems"]
    E --> E1["Web UI, log tracing, alert integrations"]

9.2 Where It Fits

Scenario | Good fit? | Notes
Complex ETL pipelines | ✅ Excellent fit | Airflow's home turf
ML pipelines | ✅ Good fit | Pair it with MLflow/Kubeflow
Scheduled batch jobs | ✅ Good fit | Far more capable than crontab
Real-time stream processing | ❌ Not suitable | Use Flink/Spark Streaming instead
Simple one-off cron jobs | ⚠️ It depends | May be overkill

9.3 In One Sentence

Airflow = workflows defined in Python + powerful scheduling + a rich ecosystem + solid monitoring

If your team knows Python and needs to manage complex data pipelines, Airflow is one of the best choices available.


If this article helped you, feel free to:

  • 👍 Like it: so more people can see it
  • ⭐ Bookmark it: for easy reference later
  • 💬 Comment: share your own Airflow experience