Automated Data Ingestion Pipelines in Practice


Preface: A Nightmare About "Driving Stick"

I once mentored an intern. Smart guy, but he had one "bad habit": he loved running jobs by hand.

Every morning at 8:00 sharp he opened his laptop and began his "daily ritual":

08:00 - Manually run the DataX sync for the user table
08:15 - Manually run the DataX sync for the order table
08:30 - Manually run the DataX sync for the product table
08:45 - Manually run the Hive ODS-layer cleansing script
09:00 - Manually run the Hive DWD-layer transformation script
09:30 - Manually run the Hive DWS-layer aggregation script
10:00 - Manually run the ADS-layer report script
10:30 - Check the data, discover the order-table sync failed, start over...

One day, he took the day off.

That morning, the boss opened the reporting system and found the numbers were still yesterday's.

Then my phone rang...

"Why isn't this automated?!"

Ever since, I have deeply understood one truth:

Drive stick long enough and you crash; automate long enough and you finally get to slack off.

Today, let's talk about how to build a complete automated ingestion pipeline so the data runs itself.


1. What Is an Automated Ingestion Pipeline?

1.1 The Ingestion Pipeline at a Glance

graph TB
    subgraph Sources["🗄️ Data sources"]
        MySQL[(MySQL)]
        Oracle[(Oracle)]
        API[API endpoints]
        Log[Log files]
        Kafka[Kafka]
    end
    subgraph Ingest["📥 Ingestion layer"]
        DataX["DataX<br/>batch ingestion"]
        FlinkCDC["Flink CDC<br/>real-time ingestion"]
        Flume["Flume<br/>log collection"]
    end
    subgraph Storage["💾 Storage layer"]
        subgraph Lakehouse["Data lake / warehouse"]
            ODS["ODS<br/>raw data"]
            DWD["DWD<br/>detail data"]
            DWS["DWS<br/>aggregated data"]
            ADS["ADS<br/>application data"]
        end
    end
    subgraph Orchestration["⏰ Scheduling layer"]
        Scheduler["Scheduler<br/>Airflow / DolphinScheduler"]
    end
    subgraph Monitoring["👁️ Monitoring layer"]
        Quality[Data quality monitoring]
        Alert[Alerting]
        Lineage[Data lineage]
    end
    subgraph Serving["🖥️ Serving layer"]
        BI[BI reports]
        API2[Data APIs]
        ML[Machine learning]
    end
    Sources --> Ingest
    Ingest --> ODS
    ODS --> DWD --> DWS --> ADS
    ADS --> Serving
    Scheduler --> |schedules| Ingest
    Scheduler --> |schedules| Storage
    Monitoring --> |monitors| Ingest
    Monitoring --> |monitors| Storage
    style Scheduler fill:#ff6b6b
    style Quality fill:#4ecdc4

1.2 Core Elements of Automation

mindmap
  root((Automated ingestion pipeline))
    Orchestration
      Scheduled triggers
      Dependency management
      Retry on failure
      Concurrency control
    Data sync
      Full sync
      Incremental sync
      Real-time sync
    Data processing
      Cleansing
      Transformation
      Aggregation
    Quality assurance
      Validation
      Anomaly detection
      Data repair
    Monitoring and alerting
      Job monitoring
      Data monitoring
      Alert notifications
    Metadata management
      Schema management
      Lineage tracking
      Impact analysis

1.3 Why Automate?

Manual operation                        | Automated pipeline
😫 Repeat the same steps every day      | 😎 Configure once, runs forever
😰 Easy to miss a step                  | ✅ Executes strictly by the defined flow
🤯 Failures are hard to trace           | 📊 Complete execution logs
😭 Nobody covers when you're on leave   | 🤖 Runs 24/7
🐌 Slow serial execution                | 🚀 Fast parallel execution
📞 Failures found by people             | 📱 Automatic alert notifications

2. Choosing a Scheduler

2.1 Mainstream Schedulers

graph TB
    subgraph Schedulers["⏰ Mainstream schedulers"]
        subgraph OpenSource["Open source"]
            Airflow["Apache Airflow<br/>first choice for Python stacks"]
            DS["DolphinScheduler<br/>strong China-born option"]
            Azkaban["Azkaban<br/>from LinkedIn"]
            Oozie["Oozie<br/>Hadoop native"]
        end
        subgraph Commercial["Commercial / cloud"]
            DataWorks["DataWorks<br/>Alibaba Cloud"]
            CDP["CDP<br/>Cloudera"]
            AWS[AWS Step Functions]
        end
    end
    style Airflow fill:#4ecdc4
    style DS fill:#4ecdc4

2.2 Detailed Comparison

Feature            | Airflow             | DolphinScheduler   | Azkaban    | Oozie
Language           | Python              | Java               | Java       | Java
DAG definition     | Python code         | Visual drag & drop | Properties | XML
Learning curve     | Moderate            |                    |            |
Extensibility      | Strong (Operators)  | Strong (plugins)   | Average    |
Community activity | ⭐⭐⭐⭐⭐          | ⭐⭐⭐⭐           | ⭐⭐       |
Visualization      |                     | Very good          | Average    |
Resource isolation | Needs configuration | Native support     |            |
Chinese support    |                     |                    |            |

2.3 Selection Guide

flowchart TB
    Start[Choose a scheduler] --> Q1{Team's main stack?}
    Q1 --> |"Mostly Python"| Airflow1["Airflow<br/>native Python DAGs"]
    Q1 --> |"Mostly Java"| Q2{Need visual editing?}
    Q2 --> |Yes| DS1["DolphinScheduler<br/>drag-and-drop workflows"]
    Q2 --> |No| Q3{Already on Hadoop?}
    Q3 --> |Yes| Oozie1["Oozie<br/>Hadoop native"]
    Q3 --> |No| Azkaban1["Azkaban<br/>lightweight"]
    Airflow1 --> Q4{Company scale?}
    DS1 --> Q4
    Q4 --> |"Large / complex workloads"| Final1["Airflow + Celery<br/>distributed execution"]
    Q4 --> |"Small or medium team"| Final2["DolphinScheduler<br/>works out of the box"]
    style Airflow1 fill:#4ecdc4
    style DS1 fill:#4ecdc4
    style Final1 fill:#ffe66d
    style Final2 fill:#ffe66d

3. Airflow in Practice

3.1 Airflow Architecture

graph TB
    subgraph AirflowArch["🌊 Airflow architecture"]
        subgraph Core["Core components"]
            Webserver["Web Server<br/>UI"]
            Scheduler2["Scheduler"]
            Executor["Executor"]
            MetaDB[("Metadata DB")]
        end
        subgraph ExecModes["Execution modes"]
            Local["LocalExecutor<br/>multi-process, single node"]
            Celery["CeleryExecutor<br/>distributed"]
            K8s["KubernetesExecutor<br/>containerized"]
        end
        subgraph Worker["Worker nodes"]
            W1[Worker 1]
            W2[Worker 2]
            W3[Worker N]
        end
        Webserver --> MetaDB
        Scheduler2 --> MetaDB
        Scheduler2 --> Executor
        Executor --> Worker
    end
    style Scheduler2 fill:#ff6b6b
    style Executor fill:#4ecdc4

3.2 Installation

# Create a virtual environment
python -m venv airflow_venv
source airflow_venv/bin/activate

# Set the Airflow home directory
export AIRFLOW_HOME=~/airflow

# Install Airflow (pinned version with its constraints file)
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Install extra providers
pip install apache-airflow-providers-mysql
pip install apache-airflow-providers-ssh
pip install apache-airflow-providers-http

# Initialize the metadata database
airflow db init

# Create an admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin123

# Start the services
airflow webserver -p 8080 -D
airflow scheduler -D
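
The DAGs later in this post reference connection IDs such as hive_metastore and ssh_hadoop. Connections are normally created in the Airflow UI (Admin → Connections) or with the airflow connections add CLI; the snippet below is a minimal Python sketch using Airflow's metadata session, with hosts and credentials as placeholders.

# Optional: register the connections referenced by the DAGs below.
# Hosts and credentials are placeholders; the UI or CLI works just as well.
from airflow import settings
from airflow.models import Connection

def create_conn(conn_id, conn_type, host, port, login=None, password=None, schema=None):
    session = settings.Session()
    # Only create the connection if it does not exist yet
    if not session.query(Connection).filter(Connection.conn_id == conn_id).first():
        session.add(Connection(conn_id=conn_id, conn_type=conn_type, host=host,
                               port=port, login=login, password=password, schema=schema))
        session.commit()
    session.close()

create_conn('hive_metastore', 'mysql', 'metastore-db.internal', 3306, 'hive', '***', 'hive')
create_conn('ssh_hadoop', 'ssh', 'hadoop-edge.internal', 22, 'etl')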

3.3 DAG Development: The Ingestion Pipeline

# dags/data_warehouse_pipeline.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Default task arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['data-alert@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# DAG definition
with DAG(
    dag_id='data_warehouse_daily_pipeline',
    default_args=default_args,
    description='Daily data warehouse ingestion pipeline',
    schedule_interval='0 2 * * *',  # run at 02:00 every day
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['data_warehouse', 'daily'],
    max_active_runs=1,
) as dag:
    
    # Start node
    start = EmptyOperator(task_id='start')
    
    # ========== ODS layer: data ingestion ==========
    with TaskGroup(group_id='ods_layer') as ods_layer:
        
        # Sync the user table
        sync_user = BashOperator(
            task_id='sync_user_info',
            bash_command='''
                python /opt/datax/bin/datax.py \
                    -p "-Ddt={{ ds }}" \
                    /opt/datax/job/user_info.json
            ''',
        )
        
        # Sync the order table
        sync_order = BashOperator(
            task_id='sync_order_info',
            bash_command='''
                python /opt/datax/bin/datax.py \
                    -p "-Ddt={{ ds }}" \
                    /opt/datax/job/order_info.json
            ''',
        )
        
        # Sync the product table
        sync_product = BashOperator(
            task_id='sync_product_info',
            bash_command='''
                python /opt/datax/bin/datax.py \
                    -p "-Ddt={{ ds }}" \
                    /opt/datax/job/product_info.json
            ''',
        )
        
        # No dependencies between them, so the three syncs run in parallel
        [sync_user, sync_order, sync_product]
    
    # ========== Data quality checks ==========
    with TaskGroup(group_id='data_quality') as data_quality:
        
        def check_record_count(**context):
            """Check partition row counts"""
            from airflow.providers.mysql.hooks.mysql import MySqlHook
            
            ds = context['ds']
            hook = MySqlHook(mysql_conn_id='hive_metastore')
            
            tables = ['ods_user_info', 'ods_order_info', 'ods_product_info']
            for table in tables:
                sql = f"SELECT COUNT(*) FROM {table} WHERE dt = '{ds}'"
                result = hook.get_first(sql)
                count = result[0]
                
                if count == 0:
                    raise ValueError(f"Table {table} has no data for partition {ds}!")
                
                print(f"✅ {table} row count: {count}")
        
        check_count = PythonOperator(
            task_id='check_record_count',
            python_callable=check_record_count,
        )
        
        def check_data_freshness(**context):
            """Check data freshness"""
            # Verify that the source table's latest update time is within the expected window
            pass
        
        check_freshness = PythonOperator(
            task_id='check_data_freshness',
            python_callable=check_data_freshness,
        )
        
        [check_count, check_freshness]
    
    # ========== DWD layer: data cleansing ==========
    with TaskGroup(group_id='dwd_layer') as dwd_layer:
        
        dwd_user = BashOperator(
            task_id='dwd_user_info',
            bash_command='''
                hive -e "
                    SET hive.exec.dynamic.partition=true;
                    SET hive.exec.dynamic.partition.mode=nonstrict;
                    
                    INSERT OVERWRITE TABLE dwd.dwd_user_info PARTITION(dt='{{ ds }}')
                    SELECT 
                        id,
                        user_name,
                        CASE WHEN age < 0 THEN NULL ELSE age END as age,
                        COALESCE(city, 'unknown') as city,
                        create_time,
                        update_time
                    FROM ods.ods_user_info
                    WHERE dt = '{{ ds }}'
                    AND id IS NOT NULL;
                "
            ''',
        )
        
        dwd_order = BashOperator(
            task_id='dwd_order_info',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dwd.dwd_order_info PARTITION(dt='{{ ds }}')
                    SELECT 
                        order_id,
                        user_id,
                        product_id,
                        CASE WHEN amount < 0 THEN 0 ELSE amount END as amount,
                        order_status,
                        create_time,
                        pay_time
                    FROM ods.ods_order_info
                    WHERE dt = '{{ ds }}'
                    AND order_id IS NOT NULL;
                "
            ''',
        )
        
        [dwd_user, dwd_order]
    
    # ========== DWS layer: aggregation ==========
    with TaskGroup(group_id='dws_layer') as dws_layer:
        
        dws_user_daily = BashOperator(
            task_id='dws_user_daily_stats',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dws.dws_user_daily_stats PARTITION(dt='{{ ds }}')
                    SELECT 
                        u.id as user_id,
                        u.user_name,
                        u.city,
                        COUNT(o.order_id) as order_count,
                        COALESCE(SUM(o.amount), 0) as total_amount,
                        MAX(o.create_time) as last_order_time
                    FROM dwd.dwd_user_info u
                    LEFT JOIN dwd.dwd_order_info o 
                        ON u.id = o.user_id AND o.dt = '{{ ds }}'
                    WHERE u.dt = '{{ ds }}'
                    GROUP BY u.id, u.user_name, u.city;
                "
            ''',
        )
        
        dws_user_daily
    
    # ========== ADS layer: application data ==========
    with TaskGroup(group_id='ads_layer') as ads_layer:
        
        ads_daily_report = BashOperator(
            task_id='ads_daily_report',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE ads.ads_daily_report PARTITION(dt='{{ ds }}')
                    SELECT 
                        '{{ ds }}' as report_date,
                        COUNT(DISTINCT user_id) as active_users,
                        SUM(order_count) as total_orders,
                        SUM(total_amount) as total_gmv,
                        SUM(total_amount) / COUNT(DISTINCT user_id) as arpu
                    FROM dws.dws_user_daily_stats
                    WHERE dt = '{{ ds }}';
                "
            ''',
        )
        
        ads_daily_report
    
    # ========== Data export ==========
    with TaskGroup(group_id='data_export') as data_export:
        
        export_to_mysql = BashOperator(
            task_id='export_to_mysql',
            bash_command='''
                python /opt/datax/bin/datax.py \
                    -p "-Ddt={{ ds }}" \
                    /opt/datax/job/export_daily_report.json
            ''',
        )
        
        export_to_mysql
    
    # End node
    end = EmptyOperator(task_id='end')
    
    # ========== Dependencies ==========
    start >> ods_layer >> data_quality >> dwd_layer >> dws_layer >> ads_layer >> data_export >> end
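
Before deploying, it helps to smoke-test the DAG locally. Assuming Airflow 2.5 or newer, one option is to append the snippet below to the bottom of the DAG file and run it with plain Python; it executes a single DagRun in-process, without the scheduler.

# Optional local smoke test (Airflow 2.5+):
# run `python dags/data_warehouse_pipeline.py` to execute one DagRun in-process.
if __name__ == '__main__':
    dag.test()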

3.4 DAG Visualization

graph LR
    subgraph Pipeline["📊 Data warehouse pipeline"]
        Start[Start] --> ODS
        subgraph ODS["ODS layer"]
            O1[sync_user]
            O2[sync_order]
            O3[sync_product]
        end
        ODS --> QC
        subgraph QC["Quality checks"]
            Q1[check_count]
            Q2[check_freshness]
        end
        QC --> DWD
        subgraph DWD["DWD layer"]
            D1[dwd_user]
            D2[dwd_order]
        end
        DWD --> DWS
        subgraph DWS["DWS layer"]
            S1[dws_user_daily]
        end
        DWS --> ADS
        subgraph ADS["ADS layer"]
            A1[ads_daily_report]
        end
        ADS --> Export
        subgraph Export["Export"]
            E1[export_to_mysql]
        end
        Export --> End[End]
    end
    style ODS fill:#4ecdc4
    style DWD fill:#ffe66d
    style DWS fill:#ff6b6b
    style ADS fill:#9b59b6

3.5 Commonly Used Operators

# 1. BashOperator - run a shell command
from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id='run_shell',
    bash_command='echo "Hello {{ ds }}"',
)

# 2. PythonOperator - run a Python callable
from airflow.operators.python import PythonOperator

def my_function(**context):
    print(f"执行日期: {context['ds']}")
    return "success"

python_task = PythonOperator(
    task_id='run_python',
    python_callable=my_function,
)

# 3. SSHOperator - run a command on a remote host
from airflow.providers.ssh.operators.ssh import SSHOperator

ssh_task = SSHOperator(
    task_id='remote_command',
    ssh_conn_id='ssh_hadoop',
    command='hadoop fs -ls /user/hive/warehouse/',
)

# 4. HiveOperator - run Hive SQL
from airflow.providers.apache.hive.operators.hive import HiveOperator

hive_task = HiveOperator(
    task_id='run_hive',
    hql='SELECT * FROM my_table LIMIT 10',
    hive_cli_conn_id='hive_default',
)

# 5. SparkSubmitOperator - submit a Spark job
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id='spark_job',
    application='/opt/spark/jobs/my_job.py',
    conn_id='spark_default',
    executor_memory='4g',
    num_executors=4,
)

# 6. BranchPythonOperator - conditional branching
from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    if context['ds_nodash'] == '20240101':
        return 'full_load'
    else:
        return 'incremental_load'

branch_task = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)

# 7. TriggerDagRunOperator - trigger another DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_task = TriggerDagRunOperator(
    task_id='trigger_downstream',
    trigger_dag_id='downstream_dag',
    wait_for_completion=True,
)
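
The operators above all do active work; ingestion pipelines just as often need to wait for something, such as an upstream DAG finishing or a file landing. A hedged sketch of two sensors that ship with Airflow (the DAG id, connection id, and file path are placeholders):

# 8. Sensors - wait for an external condition instead of doing work
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.filesystem import FileSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_sync_dag',   # placeholder DAG id
    external_task_id='end',
    mode='reschedule',                     # release the worker slot while waiting
    timeout=60 * 60,
)

wait_for_file = FileSensor(
    task_id='wait_for_file',
    fs_conn_id='fs_default',
    filepath='/data/landing/{{ ds }}/_SUCCESS',   # placeholder flag file
    poke_interval=300,
)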

4. DolphinScheduler in Practice

4.1 DolphinScheduler Architecture

graph TB
    subgraph DSArch["🐬 DolphinScheduler architecture"]
        subgraph Master["MasterServer"]
            M1[Scheduling]
            M2[Task dispatch]
            M3[Monitoring]
        end
        subgraph Worker["WorkerServer"]
            W1[Worker 1]
            W2[Worker 2]
            W3[Worker N]
        end
        subgraph Alert["AlertServer"]
            A1[Alerting service]
        end
        subgraph API["ApiServer"]
            API1[REST API]
            API2[Web UI]
        end
        subgraph Storage["Storage"]
            ZK[(ZooKeeper)]
            DB[(MySQL/PostgreSQL)]
        end
        API --> Master
        Master --> ZK
        Master --> Worker
        Master --> DB
        Worker --> DB
        Alert --> DB
    end
    style Master fill:#ff6b6b
    style Worker fill:#4ecdc4

4.2 Installation

# Download
wget https://dlcdn.apache.org/dolphinscheduler/3.2.0/apache-dolphinscheduler-3.2.0-bin.tar.gz
tar -zxvf apache-dolphinscheduler-3.2.0-bin.tar.gz
cd apache-dolphinscheduler-3.2.0-bin

# Configure the database (MySQL in this example)
vim bin/env/dolphinscheduler_env.sh
# Add:
export DATABASE=mysql
export SPRING_DATASOURCE_URL="jdbc:mysql://localhost:3306/dolphinscheduler"
export SPRING_DATASOURCE_USERNAME="ds_user"
export SPRING_DATASOURCE_PASSWORD="ds_password"

# Initialize the database schema
bash tools/bin/upgrade-schema.sh

# Configure install_env.sh
vim bin/env/install_env.sh
# Set the IPs of each node

# One-command deployment
bash bin/install.sh

# Or start each component individually
bash bin/dolphinscheduler-daemon.sh start master-server
bash bin/dolphinscheduler-daemon.sh start worker-server
bash bin/dolphinscheduler-daemon.sh start api-server
bash bin/dolphinscheduler-daemon.sh start alert-server

# Open the Web UI
# http://localhost:12345/dolphinscheduler
# Default login: admin / dolphinscheduler123

4.3 Workflow Definition (Visual and API)

DolphinScheduler lets you build workflows by dragging and dropping in the Web UI; below is an example of the same workflow defined through its API:

{
  "name": "data_warehouse_daily",
  "description": "每日数据仓库入库流程",
  "globalParams": [
    {"prop": "dt", "direct": "IN", "type": "VARCHAR", "value": "${system.biz.date}"}
  ],
  "tasks": [
    {
      "name": "sync_user_info",
      "type": "DATAX",
      "params": {
        "customConfig": 1,
        "json": "{\"job\": {\"content\": [{\"reader\": {...}, \"writer\": {...}}]}}"
      },
      "preTasks": []
    },
    {
      "name": "sync_order_info",
      "type": "DATAX",
      "params": {
        "customConfig": 1,
        "json": "..."
      },
      "preTasks": []
    },
    {
      "name": "dwd_process",
      "type": "SQL",
      "params": {
        "type": "HIVE",
        "sql": "INSERT OVERWRITE TABLE dwd.dwd_user_info PARTITION(dt='${dt}') SELECT ..."
      },
      "preTasks": ["sync_user_info", "sync_order_info"]
    },
    {
      "name": "dws_process",
      "type": "SQL",
      "params": {
        "type": "HIVE",
        "sql": "INSERT OVERWRITE TABLE dws.dws_user_stats PARTITION(dt='${dt}') SELECT ..."
      },
      "preTasks": ["dwd_process"]
    }
  ]
}
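
Besides raw JSON and the drag-and-drop UI, DolphinScheduler also offers a Python SDK (pydolphinscheduler) for workflow-as-code. The sketch below is illustrative only: it assumes the 2.x/3.x-era SDK, where ProcessDefinition groups tasks and >> declares dependencies (later SDK versions rename it to Workflow), and the tenant and commands are placeholders.

# Hypothetical sketch using the pydolphinscheduler SDK; check the SDK docs
# for your version, as class names and parameters have changed over time.
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell

with ProcessDefinition(name="data_warehouse_daily", tenant="etl_tenant") as pd:
    sync_user = Shell(name="sync_user_info",
                      command="python /opt/datax/bin/datax.py /opt/datax/job/user_info.json")
    sync_order = Shell(name="sync_order_info",
                       command="python /opt/datax/bin/datax.py /opt/datax/job/order_info.json")
    dwd_process = Shell(name="dwd_process", command="hive -f /opt/sql/dwd_process.sql")

    sync_user >> dwd_process
    sync_order >> dwd_process
    pd.submit()   # register the workflow with the API server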

4.4 DolphinScheduler Task Types

mindmap
  root((DS task types))
    Data integration
      DataX
      SeaTunnel
      Sqoop
    SQL
      Hive
      Spark SQL
      MySQL
      PostgreSQL
      ClickHouse
    Big data
      Spark
      Flink
      MapReduce
    Scripting
      Shell
      Python
      SQL
    Flow control
      Sub-process
      Dependent
      Conditions
      Switch
    Other
      HTTP
      DataQuality
      MLflow

5. Data Quality Assurance

5.1 Data Quality Dimensions

graph TB
    subgraph Dimensions["📊 Six dimensions of data quality"]
        A[Completeness] --> A1["Is data missing?<br/>NULL-value ratio"]
        B[Accuracy] --> B1["Is data correct?<br/>business-rule validation"]
        C[Consistency] --> C1["Does data agree across systems?<br/>primary/foreign key relationships"]
        D[Timeliness] --> D1["Does data arrive on time?<br/>latency monitoring"]
        E[Uniqueness] --> E1["Are there duplicates?<br/>primary-key uniqueness"]
        F[Validity] --> F1["Is the format correct?<br/>enum value ranges"]
    end
    style A fill:#ff6b6b
    style B fill:#4ecdc4
    style C fill:#ffe66d
    style D fill:#9b59b6
    style E fill:#3498db
    style F fill:#e74c3c

5.2 A Data Quality Check Framework

# data_quality/quality_checker.py

from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime
import logging

class QualityRule(ABC):
    """Base class for data quality rules"""
    
    def __init__(self, table: str, column: str = None, threshold: float = 0):
        self.table = table
        self.column = column
        self.threshold = threshold
    
    @abstractmethod
    def check(self, context: Dict) -> Dict[str, Any]:
        """Run the check and return a result dict"""
        pass


class NullCheckRule(QualityRule):
    """NULL-value check"""
    
    def check(self, context: Dict) -> Dict[str, Any]:
        dt = context['dt']
        sql = f"""
            SELECT 
                COUNT(*) as total,
                SUM(CASE WHEN {self.column} IS NULL THEN 1 ELSE 0 END) as null_count
            FROM {self.table}
            WHERE dt = '{dt}'
        """
        # Run the SQL and fetch the result. execute_hive_sql is assumed to be
        # a small helper defined elsewhere (see the sketch after this module).
        result = execute_hive_sql(sql)
        total, null_count = result[0]
        null_rate = null_count / total if total > 0 else 0
        
        return {
            'rule': 'null_check',
            'table': self.table,
            'column': self.column,
            'total': total,
            'null_count': null_count,
            'null_rate': null_rate,
            'threshold': self.threshold,
            'passed': null_rate <= self.threshold,
        }


class UniqueCheckRule(QualityRule):
    """Uniqueness check"""
    
    def check(self, context: Dict) -> Dict[str, Any]:
        dt = context['dt']
        sql = f"""
            SELECT 
                COUNT(*) as total,
                COUNT(DISTINCT {self.column}) as distinct_count
            FROM {self.table}
            WHERE dt = '{dt}'
        """
        result = execute_hive_sql(sql)
        total, distinct_count = result[0]
        duplicate_count = total - distinct_count
        
        return {
            'rule': 'unique_check',
            'table': self.table,
            'column': self.column,
            'total': total,
            'distinct_count': distinct_count,
            'duplicate_count': duplicate_count,
            'passed': duplicate_count == 0,
        }


class RangeCheckRule(QualityRule):
    """Value-range check"""
    
    def __init__(self, table: str, column: str, min_val: float, max_val: float):
        super().__init__(table, column)
        self.min_val = min_val
        self.max_val = max_val
    
    def check(self, context: Dict) -> Dict[str, Any]:
        dt = context['dt']
        sql = f"""
            SELECT 
                COUNT(*) as total,
                SUM(CASE WHEN {self.column} < {self.min_val} OR {self.column} > {self.max_val} 
                    THEN 1 ELSE 0 END) as out_of_range
            FROM {self.table}
            WHERE dt = '{dt}'
        """
        result = execute_hive_sql(sql)
        total, out_of_range = result[0]
        
        return {
            'rule': 'range_check',
            'table': self.table,
            'column': self.column,
            'total': total,
            'out_of_range': out_of_range,
            'passed': out_of_range == 0,
        }


class RecordCountCheckRule(QualityRule):
    """Record-count check"""
    
    def __init__(self, table: str, min_count: int = 1):
        super().__init__(table)
        self.min_count = min_count
    
    def check(self, context: Dict) -> Dict[str, Any]:
        dt = context['dt']
        sql = f"SELECT COUNT(*) FROM {self.table} WHERE dt = '{dt}'"
        result = execute_hive_sql(sql)
        count = result[0][0]
        
        return {
            'rule': 'record_count',
            'table': self.table,
            'count': count,
            'min_count': self.min_count,
            'passed': count >= self.min_count,
        }


class CrossTableCheckRule(QualityRule):
    """Cross-table consistency check"""
    
    def __init__(self, source_table: str, target_table: str, 
                 source_column: str, target_column: str):
        super().__init__(source_table)
        self.target_table = target_table
        self.source_column = source_column
        self.target_column = target_column
    
    def check(self, context: Dict) -> Dict[str, Any]:
        dt = context['dt']
        sql = f"""
            SELECT COUNT(*) 
            FROM {self.table} a
            LEFT JOIN {self.target_table} b 
                ON a.{self.source_column} = b.{self.target_column} 
                AND b.dt = '{dt}'
            WHERE a.dt = '{dt}' AND b.{self.target_column} IS NULL
        """
        result = execute_hive_sql(sql)
        orphan_count = result[0][0]
        
        return {
            'rule': 'cross_table_check',
            'source_table': self.table,
            'target_table': self.target_table,
            'orphan_count': orphan_count,
            'passed': orphan_count == 0,
        }


class QualityChecker:
    """Runs a set of quality rules and collects the results"""
    
    def __init__(self):
        self.rules: List[QualityRule] = []
        self.results: List[Dict] = []
    
    def add_rule(self, rule: QualityRule):
        self.rules.append(rule)
        return self
    
    def run(self, context: Dict) -> bool:
        """Run all registered checks"""
        self.results = []
        all_passed = True
        
        for rule in self.rules:
            try:
                result = rule.check(context)
                result['timestamp'] = datetime.now().isoformat()
                self.results.append(result)
                
                if not result['passed']:
                    all_passed = False
                    logging.error(f"❌ Quality check failed: {result}")
                else:
                    logging.info(f"✅ Quality check passed: {result['rule']} - {result.get('table')}")
            except Exception as e:
                logging.error(f"Quality check error: {rule.__class__.__name__} - {e}")
                all_passed = False
        
        return all_passed
    
    def get_report(self) -> Dict:
        """Build a summary report"""
        passed = sum(1 for r in self.results if r['passed'])
        failed = len(self.results) - passed
        
        return {
            'total_rules': len(self.results),
            'passed': passed,
            'failed': failed,
            'pass_rate': passed / len(self.results) if self.results else 0,
            'details': self.results,
        }


# Usage example
if __name__ == '__main__':
    checker = QualityChecker()
    
    # Register the check rules
    checker.add_rule(RecordCountCheckRule('ods.ods_user_info', min_count=1000))
    checker.add_rule(NullCheckRule('ods.ods_user_info', 'user_name', threshold=0.01))
    checker.add_rule(UniqueCheckRule('ods.ods_user_info', 'id'))
    checker.add_rule(RangeCheckRule('ods.ods_user_info', 'age', 0, 150))
    checker.add_rule(CrossTableCheckRule(
        'ods.ods_order_info', 'ods.ods_user_info',
        'user_id', 'id'
    ))
    
    # Run the checks
    context = {'dt': '2024-01-15'}
    all_passed = checker.run(context)
    
    # Get the report
    report = checker.get_report()
    print(f"Result: {'all passed' if all_passed else 'failures found'}")
    print(f"Pass rate: {report['pass_rate']:.2%}")
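
The framework above calls execute_hive_sql, which it assumes is provided elsewhere. A minimal sketch of such a helper using PyHive against HiveServer2 (host, port, and username below are placeholders):

# Minimal sketch of the execute_hive_sql helper assumed by the rules above.
# Connection details are placeholders for your HiveServer2 endpoint.
from pyhive import hive

def execute_hive_sql(sql: str):
    """Run a query on HiveServer2 and return all rows as a list of tuples."""
    conn = hive.connect(host='hiveserver2.internal', port=10000, username='etl')
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        conn.close()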

5.3 Integrating with Airflow

# dags/data_quality_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from data_quality.quality_checker import (
    QualityChecker, NullCheckRule, UniqueCheckRule, 
    RecordCountCheckRule, RangeCheckRule
)

def run_quality_check(**context):
    """Run the data quality checks"""
    dt = context['ds']
    
    checker = QualityChecker()
    checker.add_rule(RecordCountCheckRule('ods.ods_user_info', min_count=1000))
    checker.add_rule(NullCheckRule('ods.ods_user_info', 'user_name', threshold=0.01))
    checker.add_rule(UniqueCheckRule('ods.ods_user_info', 'id'))
    
    all_passed = checker.run({'dt': dt})
    report = checker.get_report()
    
    # Push the report to XCom
    context['ti'].xcom_push(key='quality_report', value=report)
    
    if not all_passed:
        raise ValueError(f"Data quality check failed! Pass rate: {report['pass_rate']:.2%}")
    
    return report

with DAG('data_quality_check', ...) as dag:
    quality_task = PythonOperator(
        task_id='run_quality_check',
        python_callable=run_quality_check,
    )

6. Alerting

6.1 Alerting Architecture

graph TB
    subgraph AlertSystem["🚨 Alerting system"]
        subgraph Sources["Alert sources"]
            S1[Task failure]
            S2[Task timeout]
            S3[Data quality anomaly]
            S4[Data delay]
            S5[Resource issues]
        end
        subgraph Processing["Alert processing"]
            P1[Aggregation]
            P2[Severity grading]
            P3[Suppression]
            P4[Escalation]
        end
        subgraph Channels["Notification channels"]
            N1[WeCom]
            N2[DingTalk]
            N3[Email]
            N4[SMS]
            N5[Phone call]
        end
        subgraph Levels["Alert levels"]
            L1["P0 - Critical<br/>phone + SMS + IM"]
            L2["P1 - Major<br/>SMS + IM"]
            L3["P2 - Normal<br/>IM"]
            L4["P3 - Info<br/>email"]
        end
        Sources --> Processing --> Channels
    end
    style L1 fill:#ff0000,color:#fff
    style L2 fill:#ff6b6b
    style L3 fill:#ffe66d
    style L4 fill:#4ecdc4
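
The diagram above lists aggregation and suppression, which the notifier code in the next section does not implement. A minimal, illustrative sketch of duplicate-alert suppression: an in-memory cooldown keyed by task and alert type (the 30-minute window is an arbitrary choice; production setups usually back this with Redis or a database).

# Illustrative alert suppression: drop repeated alerts for the same
# (task, type) pair within a cooldown window.
import time
from typing import Dict, Tuple

class AlertSuppressor:
    def __init__(self, cooldown_seconds: int = 1800):
        self.cooldown = cooldown_seconds
        self._last_sent: Dict[Tuple[str, str], float] = {}

    def should_send(self, alert: Dict) -> bool:
        key = (alert.get('task_name', ''), alert.get('type', ''))
        now = time.time()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False   # still inside the cooldown window: suppress
        self._last_sent[key] = now
        return True

# Hypothetical usage inside AlertManager.send_alert:
#     if not self.suppressor.should_send(alert):
#         return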

6.2 Notifier Implementation

# alert/notifier.py

import requests
import json
from abc import ABC, abstractmethod
from typing import Dict, List
import logging

class AlertNotifier(ABC):
    """Base class for alert notifiers"""
    
    @abstractmethod
    def send(self, alert: Dict) -> bool:
        pass


class WeComNotifier(AlertNotifier):
    """WeCom (WeChat Work) webhook notifier"""
    
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    def send(self, alert: Dict) -> bool:
        level_emoji = {
            'P0': '🔴',
            'P1': '🟠',
            'P2': '🟡',
            'P3': '🟢',
        }
        
        emoji = level_emoji.get(alert.get('level', 'P2'), '⚪')
        
        content = f"""
{emoji} **Data Alert**

> **Level**: {alert.get('level', 'P2')}
> **Type**: {alert.get('type', 'unknown')}
> **Time**: {alert.get('time', '')}
> **Task**: {alert.get('task_name', '')}
> **Message**: {alert.get('message', '')}

Please handle this promptly!
        """
        
        data = {
            "msgtype": "markdown",
            "markdown": {"content": content}
        }
        
        try:
            response = requests.post(
                self.webhook_url,
                json=data,
                timeout=10
            )
            return response.status_code == 200
        except Exception as e:
            logging.error(f"企业微信通知失败: {e}")
            return False


class DingTalkNotifier(AlertNotifier):
    """DingTalk webhook notifier"""
    
    def __init__(self, webhook_url: str, secret: str = None):
        self.webhook_url = webhook_url
        self.secret = secret
    
    def _get_sign_url(self) -> str:
        if not self.secret:
            return self.webhook_url
        
        import time
        import hmac
        import hashlib
        import base64
        import urllib.parse
        
        timestamp = str(round(time.time() * 1000))
        secret_enc = self.secret.encode('utf-8')
        string_to_sign = f'{timestamp}\n{self.secret}'
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        
        return f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
    
    def send(self, alert: Dict) -> bool:
        content = f"""
[Data Alert]
Level: {alert.get('level', 'P2')}
Type: {alert.get('type', 'unknown')}
Time: {alert.get('time', '')}
Task: {alert.get('task_name', '')}
Message: {alert.get('message', '')}
        """
        
        data = {
            "msgtype": "text",
            "text": {"content": content},
            "at": {"isAtAll": alert.get('level') == 'P0'}
        }
        
        try:
            response = requests.post(
                self._get_sign_url(),
                json=data,
                timeout=10
            )
            return response.status_code == 200
        except Exception as e:
            logging.error(f"钉钉通知失败: {e}")
            return False


class EmailNotifier(AlertNotifier):
    """Email (SMTP) notifier"""
    
    def __init__(self, smtp_host: str, smtp_port: int, 
                 username: str, password: str, from_addr: str):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.from_addr = from_addr
    
    def send(self, alert: Dict) -> bool:
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart
        
        to_addrs = alert.get('to_addrs', [])
        if not to_addrs:
            return False
        
        msg = MIMEMultipart()
        msg['From'] = self.from_addr
        msg['To'] = ', '.join(to_addrs)
        msg['Subject'] = f"[{alert.get('level', 'P2')}] Data alert: {alert.get('task_name', '')}"
        
        body = f"""
        <h2>Data Alert</h2>
        <table border="1" cellpadding="10">
            <tr><td><b>Level</b></td><td>{alert.get('level', 'P2')}</td></tr>
            <tr><td><b>Type</b></td><td>{alert.get('type', 'unknown')}</td></tr>
            <tr><td><b>Time</b></td><td>{alert.get('time', '')}</td></tr>
            <tr><td><b>Task</b></td><td>{alert.get('task_name', '')}</td></tr>
            <tr><td><b>Message</b></td><td>{alert.get('message', '')}</td></tr>
        </table>
        <p>Please handle this promptly!</p>
        """
        
        msg.attach(MIMEText(body, 'html'))
        
        try:
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.from_addr, to_addrs, msg.as_string())
            return True
        except Exception as e:
            logging.error(f"邮件发送失败: {e}")
            return False


class AlertManager:
    """Routes alerts to the notifiers registered for their severity level"""
    
    def __init__(self):
        self.notifiers: Dict[str, List[AlertNotifier]] = {
            'P0': [],
            'P1': [],
            'P2': [],
            'P3': [],
        }
    
    def register_notifier(self, level: str, notifier: AlertNotifier):
        """Register a notifier for the given level"""
        if level in self.notifiers:
            self.notifiers[level].append(notifier)
        return self
    
    def send_alert(self, alert: Dict):
        """Send an alert through all notifiers registered for its level"""
        level = alert.get('level', 'P2')
        notifiers = self.notifiers.get(level, [])
        
        for notifier in notifiers:
            try:
                notifier.send(alert)
            except Exception as e:
                logging.error(f"告警发送失败: {e}")


# Usage example
if __name__ == '__main__':
    from datetime import datetime
    
    # Initialize the alert manager
    manager = AlertManager()
    
    # Register notifiers
    wecom = WeComNotifier("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx")
    dingtalk = DingTalkNotifier("https://oapi.dingtalk.com/robot/send?access_token=xxx")
    
    # P0: WeCom + DingTalk
    manager.register_notifier('P0', wecom)
    manager.register_notifier('P0', dingtalk)
    
    # P1/P2: WeCom
    manager.register_notifier('P1', wecom)
    manager.register_notifier('P2', wecom)
    
    # Send an alert
    manager.send_alert({
        'level': 'P1',
        'type': 'Task failure',
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'task_name': 'sync_order_info',
        'message': 'DataX sync job failed, error: Connection refused',
    })

6.3 Airflow Alert Callbacks

# dags/utils/alert_callback.py

from datetime import datetime, timedelta
from alert.notifier import AlertManager, WeComNotifier

def get_alert_manager():
    manager = AlertManager()
    wecom = WeComNotifier("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx")
    manager.register_notifier('P1', wecom)
    manager.register_notifier('P2', wecom)
    return manager

def task_failure_callback(context):
    """On-failure callback: fires when a task fails"""
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id
    task_id = task_instance.task_id
    execution_date = context['execution_date']
    exception = context.get('exception', 'Unknown error')
    
    manager = get_alert_manager()
    manager.send_alert({
        'level': 'P1',
        'type': 'Task failure',
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'task_name': f"{dag_id}.{task_id}",
        'message': f"Execution date: {execution_date}\nError: {str(exception)[:500]}",
    })

def task_retry_callback(context):
    """On-retry callback: fires when a task is retried"""
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id
    task_id = task_instance.task_id
    
    manager = get_alert_manager()
    manager.send_alert({
        'level': 'P2',
        'type': 'Task retry',
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'task_name': f"{dag_id}.{task_id}",
        'message': f"Task is on retry attempt {task_instance.try_number}",
    })

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """SLA-miss callback: fires when tasks exceed their SLA"""
    manager = get_alert_manager()
    manager.send_alert({
        'level': 'P1',
        'type': 'SLA miss',
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'task_name': dag.dag_id,
        'message': f"The following tasks missed their SLA: {[t.task_id for t in task_list]}",
    })


# Using the callbacks in a DAG
default_args = {
    'owner': 'data_team',
    'on_failure_callback': task_failure_callback,
    'on_retry_callback': task_retry_callback,
    'sla': timedelta(hours=2),
}

with DAG(
    dag_id='my_dag',
    default_args=default_args,
    sla_miss_callback=sla_miss_callback,
    ...
) as dag:
    pass

7. Metadata Management

7.1 Metadata Management Architecture

graph TB
    subgraph Metadata["📋 Metadata management architecture"]
        subgraph Sources["Source metadata"]
            DS1[Hive Metastore]
            DS2[MySQL schema]
            DS3[Kafka topics]
        end
        subgraph Collection["Metadata collection"]
            C1[Atlas collector]
            C2[DataHub ingestion]
            C3[Custom collectors]
        end
        subgraph StoreGrp["Metadata store"]
            Store[Graph DB / relational DB]
        end
        subgraph Apps["Metadata applications"]
            A1[Data catalog]
            A2[Lineage tracking]
            A3[Impact analysis]
            A4[Data quality]
        end
        Sources --> Collection --> StoreGrp --> Apps
    end
    style Store fill:#ff6b6b
    style A2 fill:#4ecdc4

7.2 Data Lineage Tracking

graph LR
    subgraph LineageExample["🔗 Lineage example"]
        subgraph ODS["ODS layer"]
            O1[(ods_user)]
            O2[(ods_order)]
            O3[(ods_product)]
        end
        subgraph DWD["DWD layer"]
            D1[(dwd_user)]
            D2[(dwd_order_detail)]
        end
        subgraph DWS["DWS layer"]
            S1[(dws_user_stats)]
        end
        subgraph ADS["ADS layer"]
            A1[(ads_daily_report)]
        end
        O1 --> D1
        O2 --> D2
        O3 --> D2
        D1 --> S1
        D2 --> S1
        S1 --> A1
    end
    style O1 fill:#4ecdc4
    style O2 fill:#4ecdc4
    style O3 fill:#4ecdc4
    style D1 fill:#ffe66d
    style D2 fill:#ffe66d
    style S1 fill:#ff6b6b
    style A1 fill:#9b59b6

7.3 A Simple Lineage Manager

# metadata/lineage.py

from typing import Dict, List, Set
from collections import defaultdict
import json

class LineageManager:
    """Tracks table-level data lineage"""
    
    def __init__(self):
        # Upstream dependencies: table -> {upstream tables}
        self.upstream: Dict[str, Set[str]] = defaultdict(set)
        # Downstream impact: table -> {downstream tables}
        self.downstream: Dict[str, Set[str]] = defaultdict(set)
        # Table metadata
        self.tables: Dict[str, Dict] = {}
    
    def register_table(self, table_name: str, info: Dict = None):
        """Register a table"""
        self.tables[table_name] = info or {}
    
    def add_lineage(self, source: str, target: str, transform: str = None):
        """Add a lineage edge from source table to target table"""
        self.upstream[target].add(source)
        self.downstream[source].add(target)
    
    def get_upstream(self, table: str, depth: int = -1) -> Set[str]:
        """Get upstream dependencies (recursively)"""
        result = set()
        visited = set()
        
        def _traverse(t: str, current_depth: int):
            if t in visited:
                return
            if depth != -1 and current_depth > depth:
                return
            
            visited.add(t)
            for upstream_table in self.upstream.get(t, []):
                result.add(upstream_table)
                _traverse(upstream_table, current_depth + 1)
        
        _traverse(table, 0)
        return result
    
    def get_downstream(self, table: str, depth: int = -1) -> Set[str]:
        """Get downstream impact (recursively)"""
        result = set()
        visited = set()
        
        def _traverse(t: str, current_depth: int):
            if t in visited:
                return
            if depth != -1 and current_depth > depth:
                return
            
            visited.add(t)
            for downstream_table in self.downstream.get(t, []):
                result.add(downstream_table)
                _traverse(downstream_table, current_depth + 1)
        
        _traverse(table, 0)
        return result
    
    def impact_analysis(self, table: str) -> Dict:
        """Impact analysis: which tables are affected if this one changes"""
        downstream_tables = self.get_downstream(table)
        
        return {
            'table': table,
            'direct_impact': list(self.downstream.get(table, [])),
            'total_impact': list(downstream_tables),
            'impact_count': len(downstream_tables),
        }
    
    def to_json(self) -> str:
        """Export the lineage graph as JSON"""
        return json.dumps({
            'tables': self.tables,
            'lineage': [
                {'source': s, 'target': t}
                for t, sources in self.upstream.items()
                for s in sources
            ]
        }, indent=2)


# Usage example
if __name__ == '__main__':
    lineage = LineageManager()
    
    # Register tables
    lineage.register_table('ods.ods_user', {'layer': 'ODS', 'owner': 'data_team'})
    lineage.register_table('ods.ods_order', {'layer': 'ODS', 'owner': 'data_team'})
    lineage.register_table('dwd.dwd_user', {'layer': 'DWD', 'owner': 'data_team'})
    lineage.register_table('dwd.dwd_order', {'layer': 'DWD', 'owner': 'data_team'})
    lineage.register_table('dws.dws_user_stats', {'layer': 'DWS', 'owner': 'data_team'})
    lineage.register_table('ads.ads_report', {'layer': 'ADS', 'owner': 'bi_team'})
    
    # Add lineage edges
    lineage.add_lineage('ods.ods_user', 'dwd.dwd_user')
    lineage.add_lineage('ods.ods_order', 'dwd.dwd_order')
    lineage.add_lineage('dwd.dwd_user', 'dws.dws_user_stats')
    lineage.add_lineage('dwd.dwd_order', 'dws.dws_user_stats')
    lineage.add_lineage('dws.dws_user_stats', 'ads.ads_report')
    
    # Query upstream dependencies
    print("Upstream dependencies of dws.dws_user_stats:")
    print(lineage.get_upstream('dws.dws_user_stats'))
    # {'ods.ods_user', 'ods.ods_order', 'dwd.dwd_user', 'dwd.dwd_order'}
    
    # Impact analysis
    print("\nImpact analysis for ods.ods_user:")
    print(lineage.impact_analysis('ods.ods_user'))
    # {'table': 'ods.ods_user', 'direct_impact': ['dwd.dwd_user'], 
    #  'total_impact': ['dwd.dwd_user', 'dws.dws_user_stats', 'ads.ads_report'], 
    #  'impact_count': 3}

8. End-to-End Case Study: E-commerce Data Ingestion

8.1 Overall Architecture

Overall architecture:

graph LR
    subgraph Source["🗄️ Data sources"]
        DB[(Business databases)]
    end
    subgraph Collect["📥 Ingestion"]
        DataX[DataX]
        CDC[Flink CDC]
    end
    subgraph Warehouse["🏠 Data warehouse"]
        ODS[ODS]
        DWD[DWD]
        DWS[DWS]
        ADS[ADS]
    end
    subgraph Service["🖥️ Applications"]
        BI[BI reports]
        API[Data APIs]
    end
    subgraph Ops["⚙️ Operations"]
        Scheduler[Airflow scheduling]
        Monitor[Monitoring & alerting]
    end
    DB --> DataX & CDC
    DataX & CDC --> ODS
    ODS --> DWD --> DWS --> ADS
    ADS --> BI & API
    Scheduler -.-> Collect
    Scheduler -.-> Warehouse
    Monitor -.-> Warehouse
    style ODS fill:#4ecdc4
    style DWD fill:#ffe66d
    style DWS fill:#ff6b6b
    style ADS fill:#9b59b6

Data source detail:

graph LR
    subgraph Databases["🗄️ Business databases"]
        M1[("User DB<br/>user_db")]
        M2[("Order DB<br/>order_db")]
        M3[("Product DB<br/>product_db")]
        M4[("Payment DB<br/>payment_db")]
    end
    subgraph Methods["📥 Ingestion methods"]
        DX["DataX<br/>batch, T+1"]
        FC["Flink CDC<br/>real-time sync"]
    end
    M1 & M2 & M3 & M4 --> DX
    M2 & M4 --> FC
    style DX fill:#ff6b6b
    style FC fill:#4ecdc4

Warehouse layers:

graph TB
    subgraph ODS["📦 ODS layer - raw data"]
        O1[ods_user_info]
        O2[ods_order_info]
        O3[ods_product_info]
        O4[ods_payment_info]
    end
    subgraph DWD["🔧 DWD layer - detail data"]
        D1[dwd_user_detail]
        D2[dwd_order_detail]
        D3[dwd_payment_detail]
    end
    subgraph DWS["📊 DWS layer - aggregated data"]
        S1[dws_user_daily_stats]
        S2[dws_trade_daily_stats]
        S3[dws_product_daily_stats]
    end
    subgraph ADS["🎯 ADS layer - application data"]
        A1[ads_dashboard_daily]
        A2[ads_user_portrait]
        A3[ads_product_ranking]
    end
    O1 --> D1
    O2 --> D2
    O3 --> D2
    O4 --> D3
    D1 & D2 --> S1
    D2 & D3 --> S2
    D2 --> S3
    S1 & S2 & S3 --> A1
    S1 --> A2
    S3 --> A3
    style ODS fill:#4ecdc4
    style DWD fill:#ffe66d
    style DWS fill:#ff6b6b
    style ADS fill:#9b59b6

Scheduling and monitoring:

graph LR
    subgraph Scheduler["⏰ Airflow scheduling"]
        DAG[Warehouse DAG]
    end
    subgraph Tasks["📋 Task flow"]
        T1[ODS sync] --> T2[Data quality checks]
        T2 --> T3[DWD processing]
        T3 --> T4[DWS aggregation]
        T4 --> T5[ADS generation]
        T5 --> T6[Data export]
    end
    subgraph Monitor["👁️ Monitoring & alerting"]
        Q[Data quality monitoring]
        A[Task alerts]
    end
    DAG --> T1
    T2 --> Q
    Tasks --> A
    style DAG fill:#ff6b6b
    style Q fill:#4ecdc4
    style A fill:#ffe66d

8.2 Full DAG Implementation

# dags/ecommerce_data_warehouse.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

# Custom helper modules
from utils.alert_callback import task_failure_callback
from utils.quality_checker import run_quality_check
from utils.notifier import send_success_notification

# Configuration
DATAX_HOME = '/opt/datax'
JOB_DIR = f'{DATAX_HOME}/job/ecommerce'
HIVE_DB = 'ecommerce'

# Tables to sync
SYNC_TABLES = [
    {'name': 'user_info', 'split_key': 'id', 'priority': 1},
    {'name': 'order_info', 'split_key': 'order_id', 'priority': 1},
    {'name': 'order_detail', 'split_key': 'id', 'priority': 2},
    {'name': 'product_info', 'split_key': 'id', 'priority': 1},
    {'name': 'payment_info', 'split_key': 'id', 'priority': 2},
]

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['data-alert@company.com'],
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': task_failure_callback,
}

with DAG(
    dag_id='ecommerce_data_warehouse_daily',
    default_args=default_args,
    description='Daily ingestion pipeline for the e-commerce data warehouse',
    schedule_interval='0 1 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ecommerce', 'data_warehouse', 'daily'],
    max_active_runs=1,
    dagrun_timeout=timedelta(hours=6),
) as dag:
    
    # ========== Start ==========
    start = EmptyOperator(task_id='start')
    
    # ========== Environment check ==========
    def check_environment(**context):
        """Verify that the runtime environment is available"""
        import subprocess
        
        # Check HDFS
        result = subprocess.run(['hdfs', 'dfs', '-ls', '/'], capture_output=True)
        if result.returncode != 0:
            raise Exception("HDFS is unavailable")
        
        # Check Hive
        result = subprocess.run(['hive', '-e', 'show databases'], capture_output=True)
        if result.returncode != 0:
            raise Exception("Hive is unavailable")
        
        return True
    
    env_check = PythonOperator(
        task_id='environment_check',
        python_callable=check_environment,
    )
    
    # ========== ODS layer: data sync ==========
    with TaskGroup(group_id='ods_sync') as ods_sync:
        
        sync_tasks = []
        for table in SYNC_TABLES:
            task = BashOperator(
                task_id=f"sync_{table['name']}",
                bash_command=f'''
                    python {DATAX_HOME}/bin/datax.py \
                        -p "-Ddt={{{{ ds }}}} -Dtable={table['name']}" \
                        {JOB_DIR}/{table['name']}.json \
                        >> /var/log/datax/{table['name']}_{{{{ ds_nodash }}}}.log 2>&1
                ''',
                pool='datax_pool',
                priority_weight=table['priority'],
            )
            sync_tasks.append(task)
    
    # ========== ODS layer: data quality checks ==========
    with TaskGroup(group_id='ods_quality') as ods_quality:
        
        def check_ods_quality(**context):
            from data_quality.quality_checker import QualityChecker, RecordCountCheckRule, NullCheckRule
            
            dt = context['ds']
            checker = QualityChecker()
            
            for table in SYNC_TABLES:
                table_name = f"ods.ods_{table['name']}"
                checker.add_rule(RecordCountCheckRule(table_name, min_count=1))
                checker.add_rule(NullCheckRule(table_name, table['split_key'], threshold=0))
            
            all_passed = checker.run({'dt': dt})
            
            if not all_passed:
                report = checker.get_report()
                raise ValueError(f"ODS quality checks failed: {report}")
            
            return True
        
        ods_quality_check = PythonOperator(
            task_id='ods_quality_check',
            python_callable=check_ods_quality,
        )
    
    # ========== DWD layer: data cleansing ==========
    with TaskGroup(group_id='dwd_process') as dwd_process:
        
        dwd_user = BashOperator(
            task_id='dwd_user_info',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dwd.dwd_user_info PARTITION(dt='{{ ds }}')
                    SELECT 
                        id,
                        user_name,
                        phone,
                        CASE 
                            WHEN gender IN ('M', 'F') THEN gender 
                            ELSE 'U' 
                        END as gender,
                        CASE 
                            WHEN age BETWEEN 0 AND 150 THEN age 
                            ELSE NULL 
                        END as age,
                        COALESCE(province, 'unknown') as province,
                        COALESCE(city, 'unknown') as city,
                        create_time,
                        update_time,
                        current_timestamp() as etl_time
                    FROM ods.ods_user_info
                    WHERE dt = '{{ ds }}';
                "
            ''',
        )
        
        dwd_order = BashOperator(
            task_id='dwd_order_detail',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dwd.dwd_order_detail PARTITION(dt='{{ ds }}')
                    SELECT 
                        o.order_id,
                        o.user_id,
                        d.product_id,
                        d.product_name,
                        d.quantity,
                        d.unit_price,
                        d.quantity * d.unit_price as total_amount,
                        o.order_status,
                        o.create_time,
                        o.pay_time,
                        o.deliver_time,
                        o.receive_time,
                        current_timestamp() as etl_time
                    FROM ods.ods_order_info o
                    JOIN ods.ods_order_detail d 
                        ON o.order_id = d.order_id AND d.dt = '{{ ds }}'
                    WHERE o.dt = '{{ ds }}';
                "
            ''',
        )
        
        dwd_payment = BashOperator(
            task_id='dwd_payment_detail',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dwd.dwd_payment_detail PARTITION(dt='{{ ds }}')
                    SELECT 
                        payment_id,
                        order_id,
                        user_id,
                        payment_method,
                        CASE WHEN amount < 0 THEN 0 ELSE amount END as amount,
                        payment_status,
                        payment_time,
                        current_timestamp() as etl_time
                    FROM ods.ods_payment_info
                    WHERE dt = '{{ ds }}';
                "
            ''',
        )
        
        [dwd_user, dwd_order, dwd_payment]
    
    # ========== DWS layer: aggregation ==========
    with TaskGroup(group_id='dws_process') as dws_process:
        
        dws_user_daily = BashOperator(
            task_id='dws_user_daily',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dws.dws_user_daily PARTITION(dt='{{ ds }}')
                    SELECT 
                        u.id as user_id,
                        u.user_name,
                        u.province,
                        u.city,
                        COUNT(DISTINCT o.order_id) as order_count,
                        COALESCE(SUM(o.total_amount), 0) as order_amount,
                        COALESCE(SUM(p.amount), 0) as pay_amount,
                        MAX(o.create_time) as last_order_time
                    FROM dwd.dwd_user_info u
                    LEFT JOIN dwd.dwd_order_detail o 
                        ON u.id = o.user_id AND o.dt = '{{ ds }}'
                    LEFT JOIN dwd.dwd_payment_detail p 
                        ON o.order_id = p.order_id AND p.dt = '{{ ds }}'
                    WHERE u.dt = '{{ ds }}'
                    GROUP BY u.id, u.user_name, u.province, u.city;
                "
            ''',
        )
        
        dws_trade_daily = BashOperator(
            task_id='dws_trade_daily',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE dws.dws_trade_daily PARTITION(dt='{{ ds }}')
                    SELECT 
                        '{{ ds }}' as stat_date,
                        COUNT(DISTINCT order_id) as order_count,
                        COUNT(DISTINCT user_id) as user_count,
                        SUM(total_amount) as gmv,
                        SUM(total_amount) / COUNT(DISTINCT order_id) as avg_order_amount,
                        SUM(quantity) as total_quantity
                    FROM dwd.dwd_order_detail
                    WHERE dt = '{{ ds }}';
                "
            ''',
        )
        
        [dws_user_daily, dws_trade_daily]
    
    # ========== ADS layer: application data ==========
    with TaskGroup(group_id='ads_process') as ads_process:
        
        ads_dashboard = BashOperator(
            task_id='ads_dashboard',
            bash_command='''
                hive -e "
                    INSERT OVERWRITE TABLE ads.ads_dashboard PARTITION(dt='{{ ds }}')
                    SELECT 
                        '{{ ds }}' as stat_date,
                        -- User metrics (Hive does not support scalar subqueries in the
                        -- SELECT list, so they are pre-aggregated and cross-joined)
                        u.total_users,
                        u.new_users,
                        -- Trade metrics
                        t.order_count,
                        t.user_count as active_users,
                        t.gmv,
                        t.avg_order_amount,
                        -- Payment metrics
                        p.pay_order_count,
                        p.pay_amount
                    FROM dws.dws_trade_daily t
                    CROSS JOIN (
                        SELECT 
                            COUNT(*) as total_users,
                            SUM(CASE WHEN to_date(create_time) = '{{ ds }}' THEN 1 ELSE 0 END) as new_users
                        FROM dwd.dwd_user_info
                        WHERE dt = '{{ ds }}'
                    ) u
                    CROSS JOIN (
                        SELECT 
                            COUNT(*) as pay_order_count,
                            SUM(amount) as pay_amount
                        FROM dwd.dwd_payment_detail
                        WHERE dt = '{{ ds }}' AND payment_status = 'SUCCESS'
                    ) p
                    WHERE t.dt = '{{ ds }}';
                "
            ''',
        )
        
        ads_dashboard
    
    # ========== Data export ==========
    with TaskGroup(group_id='data_export') as data_export:
        
        export_dashboard = BashOperator(
            task_id='export_dashboard_to_mysql',
            bash_command=f'''
                python {DATAX_HOME}/bin/datax.py \
                    -p "-Ddt={{{{ ds }}}}" \
                    {JOB_DIR}/export_dashboard.json
            ''',
        )
        
        export_dashboard
    
    # ========== Final quality check ==========
    def final_quality_check(**context):
        """Final data quality check"""
        # Sanity-check the key metrics, e.g. compare today's figures
        # against yesterday's (a sketch follows after this DAG)
        pass
    
    final_check = PythonOperator(
        task_id='final_quality_check',
        python_callable=final_quality_check,
    )
    
    # ========== Success notification ==========
    def send_success_report(**context):
        """Send a success report"""
        from utils.notifier import send_success_notification
        
        ds = context['ds']
        send_success_notification(
            dag_id='ecommerce_data_warehouse_daily',
            execution_date=ds,
            message='Daily e-commerce warehouse ingestion completed'
        )
    
    success_notify = PythonOperator(
        task_id='success_notification',
        python_callable=send_success_report,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    
    # ========== End ==========
    end = EmptyOperator(
        task_id='end',
        trigger_rule=TriggerRule.ALL_DONE,
    )
    
    # ========== Dependencies ==========
    (
        start 
        >> env_check 
        >> ods_sync 
        >> ods_quality 
        >> dwd_process 
        >> dws_process 
        >> ads_process 
        >> data_export 
        >> final_check 
        >> success_notify 
        >> end
    )
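
The final_quality_check task above is left as a stub. A minimal sketch of a day-over-day fluctuation check, reusing the execute_hive_sql helper from section 5; the ±30% threshold is an arbitrary example value.

# Illustrative implementation of final_quality_check: flag a suspicious
# day-over-day GMV swing. Assumes the execute_hive_sql helper from section 5.
from datetime import datetime, timedelta

def final_quality_check(**context):
    ds = context['ds']
    prev_ds = (datetime.strptime(ds, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d')

    sql = f"SELECT dt, gmv FROM dws.dws_trade_daily WHERE dt IN ('{ds}', '{prev_ds}')"
    gmv_by_day = dict(execute_hive_sql(sql))   # {dt: gmv}

    today, yesterday = gmv_by_day.get(ds), gmv_by_day.get(prev_ds)
    if not today:
        raise ValueError(f"No GMV found for {ds}")
    if yesterday:
        change = abs(today - yesterday) / yesterday
        if change > 0.3:   # more than a 30% swing looks suspicious
            raise ValueError(f"GMV changed {change:.1%} vs. {prev_ds}, please verify")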

8.3 Execution Timeline

gantt
    title Daily execution timeline of the e-commerce warehouse
    dateFormat HH:mm
    axisFormat %H:%M
    section Preparation
    Environment check     :a1, 01:00, 5m
    section ODS layer
    sync_user_info        :a2, after a1, 15m
    sync_order_info       :a3, after a1, 20m
    sync_product_info     :a4, after a1, 10m
    sync_payment_info     :a5, after a1, 15m
    Quality checks        :a6, after a3, 5m
    section DWD layer
    dwd_user_info         :b1, after a6, 10m
    dwd_order_detail      :b2, after a6, 15m
    dwd_payment_detail    :b3, after a6, 10m
    section DWS layer
    dws_user_daily        :c1, after b2, 20m
    dws_trade_daily       :c2, after b2, 10m
    section ADS layer
    ads_dashboard         :d1, after c1, 15m
    section Export
    export_to_mysql       :e1, after d1, 10m
    Final check           :e2, after e1, 5m
    Success notification  :e3, after e2, 1m

9. Best Practices

9.1 Design Principles

mindmap
  root((Automated ingestion best practices))
    Design principles
      Idempotency
        Re-runs produce the same result
        Safe to rerun after failures
      Observability
        Complete logs
        Key-metric monitoring
        Data lineage
      Fault tolerance
        Automatic retries
        Alerts on anomalies
        Fallback plans
      Extensibility
        Modular design
        Configuration driven
        Easy to add new tables
    Implementation points
      Layered processing
        ODS raw data
        DWD cleansed data
        DWS aggregated data
        ADS application data
      Quality assurance
        Checks before loading
        Validation after loading
        Handling of bad records
      Scheduling strategy
        Sensible dependencies
        Parallel execution
        Resource isolation

9.2 Checklist

## Go-live checklist for automated ingestion

### Preparation
- [ ] Data source connection details confirmed
- [ ] Target table schemas created
- [ ] Network connectivity verified
- [ ] Configuration files prepared

### Development
- [ ] Sync scripts completed
- [ ] Data processing scripts completed
- [ ] Data quality rules defined
- [ ] Alerting configured
- [ ] Unit tests passing

### Testing
- [ ] Verified in the test environment
- [ ] Full sync tested
- [ ] Incremental sync tested
- [ ] Failure retry tested
- [ ] Alerting tested
- [ ] Performance meets requirements

### Go-live
- [ ] Production configuration updated
- [ ] Scheduled jobs configured
- [ ] Monitoring dashboards set up
- [ ] On-call staff notified
- [ ] Rollback plan prepared

### Operations
- [ ] Check job status daily
- [ ] Review data quality regularly
- [ ] Review resource usage regularly
- [ ] Tune performance regularly

10. Final Thoughts

An automated ingestion pipeline is, at its core, about turning manual operations into machine operations. Along the way, a few things matter most:

  1. Get it running first, then optimize: don't chase perfection from day one; get the data flowing first
  2. Monitoring matters more than execution: automation without monitoring is a ticking time bomb
  3. Alerts must be worth reading: too many alerts is the same as no alerts
  4. Keep the docs up to date: six months from now even you won't remember why you wrote it that way

Finally, one sentence to take away:

"Automation is not the goal; stable, reliable data is."

Once your automated pipeline is in place and you watch the data load on schedule every day, the quality checks all pass, and the reports refresh on time, it feels like watching a child you raised finally learn to feed themselves.

And then you can finally relax. 😎


About the author: a developer who traded the manual gearbox for an automatic one.

Worst incident: the scheduler was down for a whole day and nobody noticed, because the alerting was down too.

What lies at the end of automation? Trust, and a little anxiety.


Appendix: Common Interview Questions

  1. How would you design a scheduling system for data ingestion?

    Layer the processing (ODS → DWD → DWS → ADS), define clear dependencies, support retries and alerting on failure, and keep jobs idempotent.
  2. Airflow or DolphinScheduler?

    Python-centric teams pick Airflow (DAGs as code); teams that want visual workflow editing or Chinese-language support pick DolphinScheduler.
  3. How do you guarantee data quality?

    Check multiple dimensions (completeness, accuracy, consistency, and so on), validate both before and after loading, and alert promptly on anomalies.
  4. What happens when a task fails?

    Automatic retries → alert when retries are exhausted → human intervention → fix and rerun. Keep tasks idempotent so reruns are safe.
  5. How do you handle data delays?

    Set SLA monitoring with timeout alerts, optimize job performance, parallelize where necessary, and have a contingency plan for late data.
  6. What is metadata management for?

    Data catalog (finding data), lineage (tracing origins), impact analysis (assessing changes), and quality management.