前言:一个关于"手动挡"的噩梦
我曾经带过一个实习生,人很聪明,就是有个"坏习惯"——喜欢手动跑任务。
每天早上8点,他准时打开电脑,开始他的"日常仪式":
08:00 - 手动执行DataX同步用户表
08:15 - 手动执行DataX同步订单表
08:30 - 手动执行DataX同步商品表
08:45 - 手动执行Hive ODS层清洗脚本
09:00 - 手动执行Hive DWD层加工脚本
09:30 - 手动执行Hive DWS层聚合脚本
10:00 - 手动执行ADS层报表脚本
10:30 - 检查数据,发现订单表同步失败,从头再来...
有一天,他请假了。
那天早上,老板打开报表系统,发现数据还是昨天的。
然后,我的电话响了......
"为什么不做成自动化?!"
从那以后,我深刻理解了一个道理:
手动挡的尽头是翻车,自动化的尽头是摸鱼。
今天,让我们来聊聊如何搭建一套完整的自动化入库链路,让数据自己跑起来。
一、什么是自动化入库链路?
1.1 入库链路全景图
graph TB
subgraph 数据源["🗄️ 数据源"]
MySQL[(MySQL)]
Oracle[(Oracle)]
API[API接口]
Log[日志文件]
Kafka[Kafka]
end
subgraph 采集层["📥 采集层"]
DataX[DataX<br/>批量采集]
FlinkCDC[Flink CDC<br/>实时采集]
Flume[Flume<br/>日志采集]
end
subgraph 存储层["💾 存储层"]
subgraph 数据湖仓["数据湖/仓"]
ODS[ODS层<br/>原始数据]
DWD[DWD层<br/>明细数据]
DWS[DWS层<br/>汇总数据]
ADS[ADS层<br/>应用数据]
end
end
subgraph 调度层["⏰ 调度层"]
Scheduler[调度系统<br/>Airflow/DolphinScheduler]
end
subgraph 监控层["👁️ 监控层"]
Quality[数据质量监控]
Alert[告警系统]
Lineage[数据血缘]
end
subgraph 服务层["🖥️ 服务层"]
BI[BI报表]
API2[数据API]
ML[机器学习]
end
数据源 --> 采集层
采集层 --> ODS
ODS --> DWD --> DWS --> ADS
ADS --> 服务层
Scheduler --> |"调度"|采集层
Scheduler --> |"调度"|存储层
监控层 --> |"监控"|采集层
监控层 --> |"监控"|存储层
style Scheduler fill:#ff6b6b
style Quality fill:#4ecdc4
1.2 自动化的核心要素
mindmap
root((自动化入库链路))
调度编排
定时触发
依赖管理
失败重试
并行控制
数据同步
全量同步
增量同步
实时同步
数据处理
数据清洗
数据转换
数据聚合
质量保障
数据校验
异常检测
数据修复
监控告警
任务监控
数据监控
告警通知
元数据管理
表结构管理
血缘追踪
影响分析
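这些要素落到代码上是什么样?以 Airflow 为例,调度编排相关的几项(定时触发、依赖管理、失败重试、并行控制)大致对应下面这几行配置(示意写法,任务内容纯属举例,完整案例见第三节):
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='demo_pipeline',
    schedule_interval='0 2 * * *',        # 定时触发:每天凌晨2点
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,                    # 并行控制:同一时间只跑一个实例
    default_args={
        'retries': 3,                     # 失败重试
        'retry_delay': timedelta(minutes=5),
    },
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo extract')
    transform = BashOperator(task_id='transform', bash_command='echo transform')
    load = BashOperator(task_id='load', bash_command='echo load')
    extract >> transform >> load          # 依赖管理:上游成功才执行下游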
1.3 为什么要自动化?
| 手动操作 | 自动化 |
|---|---|
| 😫 每天重复执行 | 😎 配置一次,永久运行 |
| 😰 容易遗漏步骤 | ✅ 严格按流程执行 |
| 🤯 出错难以追溯 | 📊 完整的执行日志 |
| 😭 请假了没人顶 | 🤖 7×24小时运行 |
| 🐌 串行执行慢 | 🚀 并行执行快 |
| 📞 失败靠人发现 | 📱 自动告警通知 |
二、调度系统选型
2.1 主流调度系统对比
graph TB
subgraph 调度系统["⏰ 主流调度系统"]
subgraph 开源方案["开源方案"]
Airflow[Apache Airflow<br/>Python生态首选]
DS[DolphinScheduler<br/>国产之光]
Azkaban[Azkaban<br/>LinkedIn出品]
Oozie[Oozie<br/>Hadoop原生]
end
subgraph 商业方案["商业/云方案"]
DataWorks[DataWorks<br/>阿里云]
CDP[CDP<br/>Cloudera]
AWS[AWS Step Functions]
end
end
style Airflow fill:#4ecdc4
style DS fill:#4ecdc4
2.2 详细对比表
| 特性 | Airflow | DolphinScheduler | Azkaban | Oozie |
|---|---|---|---|---|
| 开发语言 | Python | Java | Java | Java |
| DAG定义 | Python代码 | 可视化拖拽 | Properties | XML |
| 学习曲线 | 中等 | 低 | 低 | 高 |
| 扩展性 | 强(Operator) | 强(插件) | 一般 | 弱 |
| 社区活跃 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐ |
| 可视化 | 好 | 很好 | 一般 | 差 |
| 资源隔离 | 需配置 | 原生支持 | 弱 | 弱 |
| 中文支持 | 差 | 好 | 差 | 差 |
2.3 选型建议
flowchart TB
Start[选择调度系统] --> Q1{团队技术栈?}
Q1 --> |"Python为主"|Airflow1[Airflow
Python原生DAG] Q1 --> |"Java为主"|Q2{需要可视化?} Q2 --> |"是"|DS1[DolphinScheduler
拖拽式编排] Q2 --> |"否"|Q3{已有Hadoop?} Q3 --> |"是"|Oozie1[Oozie
Hadoop原生] Q3 --> |"否"|Azkaban1[Azkaban
轻量级] Airflow1 --> Q4{公司规模?} DS1 --> Q4 Q4 --> |"大厂/复杂场景"|Final1[Airflow + Celery
分布式执行] Q4 --> |"中小团队"|Final2[DolphinScheduler
开箱即用] style Airflow1 fill:#4ecdc4 style DS1 fill:#4ecdc4 style Final1 fill:#ffe66d style Final2 fill:#ffe66d
Python原生DAG] Q1 --> |"Java为主"|Q2{需要可视化?} Q2 --> |"是"|DS1[DolphinScheduler
拖拽式编排] Q2 --> |"否"|Q3{已有Hadoop?} Q3 --> |"是"|Oozie1[Oozie
Hadoop原生] Q3 --> |"否"|Azkaban1[Azkaban
轻量级] Airflow1 --> Q4{公司规模?} DS1 --> Q4 Q4 --> |"大厂/复杂场景"|Final1[Airflow + Celery
分布式执行] Q4 --> |"中小团队"|Final2[DolphinScheduler
开箱即用] style Airflow1 fill:#4ecdc4 style DS1 fill:#4ecdc4 style Final1 fill:#ffe66d style Final2 fill:#ffe66d
三、Airflow实战
3.1 Airflow架构
graph TB
subgraph Airflow架构["🌊 Airflow架构"]
subgraph 核心组件["核心组件"]
Webserver[Web Server<br/>UI界面]
Scheduler2[Scheduler<br/>调度器]
Executor[Executor<br/>执行器]
MetaDB[(Metadata DB<br/>元数据库)]
end
subgraph 执行模式["执行模式"]
Local[LocalExecutor<br/>单机多进程]
Celery[CeleryExecutor<br/>分布式]
K8s[KubernetesExecutor<br/>容器化]
end
subgraph Worker["Worker节点"]
W1[Worker 1]
W2[Worker 2]
W3[Worker N]
end
Webserver --> MetaDB
Scheduler2 --> MetaDB
Scheduler2 --> Executor
Executor --> Worker
end
style Scheduler2 fill:#ff6b6b
style Executor fill:#4ecdc4
3.2 安装部署
# 创建虚拟环境
python -m venv airflow_venv
source airflow_venv/bin/activate
# 设置Airflow Home
export AIRFLOW_HOME=~/airflow
# 安装Airflow(指定版本和约束)
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# 安装额外依赖
pip install apache-airflow-providers-mysql
pip install apache-airflow-providers-ssh
pip install apache-airflow-providers-http
# 初始化数据库
airflow db init
# 创建管理员用户
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin123
# 启动服务
airflow webserver -p 8080 -D
airflow scheduler -D
3.3 DAG开发:数据入库Pipeline
# dags/data_warehouse_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
# 默认参数
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email': ['data-alert@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
# DAG定义
with DAG(
dag_id='data_warehouse_daily_pipeline',
default_args=default_args,
description='每日数据仓库入库Pipeline',
schedule_interval='0 2 * * *', # 每天凌晨2点执行
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['data_warehouse', 'daily'],
max_active_runs=1,
) as dag:
# 开始节点
start = EmptyOperator(task_id='start')
# ========== ODS层:数据采集 ==========
with TaskGroup(group_id='ods_layer') as ods_layer:
# 用户表同步
sync_user = BashOperator(
task_id='sync_user_info',
bash_command='''
python /opt/datax/bin/datax.py \
-p "-Ddt={{ ds }}" \
/opt/datax/job/user_info.json
''',
)
# 订单表同步
sync_order = BashOperator(
task_id='sync_order_info',
bash_command='''
python /opt/datax/bin/datax.py \
-p "-Ddt={{ ds }}" \
/opt/datax/job/order_info.json
''',
)
# 商品表同步
sync_product = BashOperator(
task_id='sync_product_info',
bash_command='''
python /opt/datax/bin/datax.py \
-p "-Ddt={{ ds }}" \
/opt/datax/job/product_info.json
''',
)
# 并行执行
[sync_user, sync_order, sync_product]
# ========== 数据质量检查 ==========
with TaskGroup(group_id='data_quality') as data_quality:
def check_record_count(**context):
"""检查记录数"""
from airflow.providers.mysql.hooks.mysql import MySqlHook
ds = context['ds']
hook = MySqlHook(mysql_conn_id='hive_metastore')
tables = ['ods_user_info', 'ods_order_info', 'ods_product_info']
for table in tables:
sql = f"SELECT COUNT(*) FROM {table} WHERE dt = '{ds}'"
result = hook.get_first(sql)
count = result[0]
if count == 0:
raise ValueError(f"表 {table} 在 {ds} 分区数据为空!")
print(f"✅ {table} 记录数: {count}")
check_count = PythonOperator(
task_id='check_record_count',
python_callable=check_record_count,
)
def check_data_freshness(**context):
"""检查数据新鲜度"""
# 检查源表最大更新时间是否在预期范围内
pass
check_freshness = PythonOperator(
task_id='check_data_freshness',
python_callable=check_data_freshness,
)
[check_count, check_freshness]
# ========== DWD层:数据清洗 ==========
with TaskGroup(group_id='dwd_layer') as dwd_layer:
dwd_user = BashOperator(
task_id='dwd_user_info',
bash_command='''
hive -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE dwd.dwd_user_info PARTITION(dt='{{ ds }}')
SELECT
id,
user_name,
CASE WHEN age < 0 THEN NULL ELSE age END as age,
COALESCE(city, '未知') as city,
create_time,
update_time
FROM ods.ods_user_info
WHERE dt = '{{ ds }}'
AND id IS NOT NULL;
"
''',
)
dwd_order = BashOperator(
task_id='dwd_order_info',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dwd.dwd_order_info PARTITION(dt='{{ ds }}')
SELECT
order_id,
user_id,
product_id,
CASE WHEN amount < 0 THEN 0 ELSE amount END as amount,
order_status,
create_time,
pay_time
FROM ods.ods_order_info
WHERE dt = '{{ ds }}'
AND order_id IS NOT NULL;
"
''',
)
[dwd_user, dwd_order]
# ========== DWS层:数据聚合 ==========
with TaskGroup(group_id='dws_layer') as dws_layer:
dws_user_daily = BashOperator(
task_id='dws_user_daily_stats',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dws.dws_user_daily_stats PARTITION(dt='{{ ds }}')
SELECT
u.id as user_id,
u.user_name,
u.city,
COUNT(o.order_id) as order_count,
COALESCE(SUM(o.amount), 0) as total_amount,
MAX(o.create_time) as last_order_time
FROM dwd.dwd_user_info u
LEFT JOIN dwd.dwd_order_info o
ON u.id = o.user_id AND o.dt = '{{ ds }}'
WHERE u.dt = '{{ ds }}'
GROUP BY u.id, u.user_name, u.city;
"
''',
)
dws_user_daily
# ========== ADS层:应用数据 ==========
with TaskGroup(group_id='ads_layer') as ads_layer:
ads_daily_report = BashOperator(
task_id='ads_daily_report',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE ads.ads_daily_report PARTITION(dt='{{ ds }}')
SELECT
'{{ ds }}' as report_date,
COUNT(DISTINCT user_id) as active_users,
SUM(order_count) as total_orders,
SUM(total_amount) as total_gmv,
SUM(total_amount) / COUNT(DISTINCT user_id) as arpu
FROM dws.dws_user_daily_stats
WHERE dt = '{{ ds }}';
"
''',
)
ads_daily_report
# ========== 数据导出 ==========
with TaskGroup(group_id='data_export') as data_export:
export_to_mysql = BashOperator(
task_id='export_to_mysql',
bash_command='''
python /opt/datax/bin/datax.py \
-p "-Ddt={{ ds }}" \
/opt/datax/job/export_daily_report.json
''',
)
export_to_mysql
# 结束节点
end = EmptyOperator(task_id='end')
# ========== 定义依赖关系 ==========
start >> ods_layer >> data_quality >> dwd_layer >> dws_layer >> ads_layer >> data_export >> end
3.4 DAG可视化
graph LR
subgraph Pipeline["📊 数据仓库Pipeline"]
Start[Start] --> ODS
subgraph ODS["ODS层"]
O1[sync_user]
O2[sync_order]
O3[sync_product]
end
ODS --> QC
subgraph QC["质量检查"]
Q1[check_count]
Q2[check_freshness]
end
QC --> DWD
subgraph DWD["DWD层"]
D1[dwd_user]
D2[dwd_order]
end
DWD --> DWS
subgraph DWS["DWS层"]
S1[dws_user_daily]
end
DWS --> ADS
subgraph ADS["ADS层"]
A1[ads_daily_report]
end
ADS --> Export
subgraph Export["导出"]
E1[export_to_mysql]
end
Export --> End[End]
end
style ODS fill:#4ecdc4
style DWD fill:#ffe66d
style DWS fill:#ff6b6b
style ADS fill:#9b59b6
3.5 常用Operator
# 1. BashOperator - 执行Shell命令
from airflow.operators.bash import BashOperator
bash_task = BashOperator(
task_id='run_shell',
bash_command='echo "Hello {{ ds }}"',
)
# 2. PythonOperator - 执行Python函数
from airflow.operators.python import PythonOperator
def my_function(**context):
print(f"执行日期: {context['ds']}")
return "success"
python_task = PythonOperator(
task_id='run_python',
python_callable=my_function,
)
# 3. SSHOperator - 远程执行命令
from airflow.providers.ssh.operators.ssh import SSHOperator
ssh_task = SSHOperator(
task_id='remote_command',
ssh_conn_id='ssh_hadoop',
command='hadoop fs -ls /user/hive/warehouse/',
)
# 4. HiveOperator - 执行Hive SQL
from airflow.providers.apache.hive.operators.hive import HiveOperator
hive_task = HiveOperator(
task_id='run_hive',
hql='SELECT * FROM my_table LIMIT 10',
hive_cli_conn_id='hive_default',
)
# 5. SparkSubmitOperator - 提交Spark任务
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
spark_task = SparkSubmitOperator(
task_id='spark_job',
application='/opt/spark/jobs/my_job.py',
conn_id='spark_default',
executor_memory='4g',
num_executors=4,
)
# 6. BranchPythonOperator - 条件分支
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
if context['ds_nodash'] == '20240101':
return 'full_load'
else:
return 'incremental_load'
branch_task = BranchPythonOperator(
task_id='branch',
python_callable=choose_branch,
)
# 7. TriggerDagRunOperator - 触发其他DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_task = TriggerDagRunOperator(
task_id='trigger_downstream',
trigger_dag_id='downstream_dag',
wait_for_completion=True,
)
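除了上面这几类 Operator,实战中还经常用 Sensor 等待外部条件满足后再启动链路。下面补一个等待上游 DAG 完成的示意写法(external_dag_id、external_task_id 均为假设值):
# 8. ExternalTaskSensor - 等待其他DAG的任务完成(示意,dag_id/task_id为假设值)
from airflow.sensors.external_task import ExternalTaskSensor
wait_upstream = ExternalTaskSensor(
    task_id='wait_upstream_dag',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    mode='reschedule',     # 等待期间释放worker槽位
    poke_interval=300,     # 每5分钟探测一次
    timeout=3600,          # 最多等待1小时
)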
四、DolphinScheduler实战
4.1 DolphinScheduler架构
graph TB
subgraph DS架构["🐬 DolphinScheduler架构"]
subgraph Master["MasterServer"]
M1[调度器]
M2[任务分发]
M3[监控]
end
subgraph Worker["WorkerServer"]
W1[Worker 1]
W2[Worker 2]
W3[Worker N]
end
subgraph Alert["AlertServer"]
A1[告警服务]
end
subgraph API["ApiServer"]
API1[REST API]
API2[Web UI]
end
subgraph Storage["存储"]
ZK[(ZooKeeper)]
DB[(MySQL/PostgreSQL)]
end
API --> Master
Master --> ZK
Master --> Worker
Master --> DB
Worker --> DB
Alert --> DB
end
style Master fill:#ff6b6b
style Worker fill:#4ecdc4
4.2 安装部署
# 下载
wget https://dlcdn.apache.org/dolphinscheduler/3.2.0/apache-dolphinscheduler-3.2.0-bin.tar.gz
tar -zxvf apache-dolphinscheduler-3.2.0-bin.tar.gz
cd apache-dolphinscheduler-3.2.0-bin
# 配置数据库(以MySQL为例)
vim bin/env/dolphinscheduler_env.sh
# 添加:
export DATABASE=mysql
export SPRING_DATASOURCE_URL="jdbc:mysql://localhost:3306/dolphinscheduler"
export SPRING_DATASOURCE_USERNAME="ds_user"
export SPRING_DATASOURCE_PASSWORD="ds_password"
# 初始化数据库
bash tools/bin/upgrade-schema.sh
# 配置install_env.sh
vim bin/env/install_env.sh
# 配置各节点IP
# 一键部署
bash bin/install.sh
# 或单独启动各组件
bash bin/dolphinscheduler-daemon.sh start master-server
bash bin/dolphinscheduler-daemon.sh start worker-server
bash bin/dolphinscheduler-daemon.sh start api-server
bash bin/dolphinscheduler-daemon.sh start alert-server
# 访问Web UI
# http://localhost:12345/dolphinscheduler
# 默认用户:admin / dolphinscheduler123
4.3 工作流配置(可视化)
DolphinScheduler支持通过Web UI拖拽创建工作流,以下是通过API创建的示例:
{
"name": "data_warehouse_daily",
"description": "每日数据仓库入库流程",
"globalParams": [
{"prop": "dt", "direct": "IN", "type": "VARCHAR", "value": "${system.biz.date}"}
],
"tasks": [
{
"name": "sync_user_info",
"type": "DATAX",
"params": {
"customConfig": 1,
"json": "{\"job\": {\"content\": [{\"reader\": {...}, \"writer\": {...}}]}}"
},
"preTasks": []
},
{
"name": "sync_order_info",
"type": "DATAX",
"params": {
"customConfig": 1,
"json": "..."
},
"preTasks": []
},
{
"name": "dwd_process",
"type": "SQL",
"params": {
"type": "HIVE",
"sql": "INSERT OVERWRITE TABLE dwd.dwd_user_info PARTITION(dt='${dt}') SELECT ..."
},
"preTasks": ["sync_user_info", "sync_order_info"]
},
{
"name": "dws_process",
"type": "SQL",
"params": {
"type": "HIVE",
"sql": "INSERT OVERWRITE TABLE dws.dws_user_stats PARTITION(dt='${dt}') SELECT ..."
},
"preTasks": ["dwd_process"]
}
]
}
4.4 DolphinScheduler任务类型
mindmap
root((DS任务类型))
数据集成
DataX
SeaTunnel
Sqoop
SQL类
Hive
Spark SQL
MySQL
PostgreSQL
ClickHouse
大数据
Spark
Flink
MapReduce
脚本类
Shell
Python
SQL
流程控制
子流程
依赖
条件分支
Switch
其他
HTTP
DataQuality
MLflow
五、数据质量保障
5.1 数据质量维度
graph TB
subgraph 数据质量维度["📊 数据质量六大维度"]
A[完整性<br/>Completeness] --> A1[数据是否缺失<br/>NULL值比例]
B[准确性<br/>Accuracy] --> B1[数据是否正确<br/>业务规则校验]
C[一致性<br/>Consistency] --> C1[跨系统数据是否一致<br/>主外键关系]
D[及时性<br/>Timeliness] --> D1[数据是否按时到达<br/>延迟监控]
E[唯一性<br/>Uniqueness] --> E1[是否有重复数据<br/>主键唯一性]
F[有效性<br/>Validity] --> F1[数据格式是否正确<br/>枚举值范围]
end
style A fill:#ff6b6b
style B fill:#4ecdc4
style C fill:#ffe66d
style D fill:#9b59b6
style E fill:#3498db
style F fill:#e74c3c
5.2 数据质量检查框架
# data_quality/quality_checker.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime
import logging
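# 说明:下文各条规则用到的 execute_hive_sql 原文未给出实现,这里补一个示意版本,
# 假设通过 PyHive 连接 HiveServer2(host/port/database 为占位值,需 pip install pyhive)
def execute_hive_sql(sql: str):
    """执行Hive SQL,返回所有结果行"""
    from pyhive import hive
    conn = hive.Connection(host='hive-server', port=10000, database='default')
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        conn.close()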
class QualityRule(ABC):
"""质量规则基类"""
def __init__(self, table: str, column: str = None, threshold: float = 0):
self.table = table
self.column = column
self.threshold = threshold
@abstractmethod
def check(self, context: Dict) -> Dict[str, Any]:
"""执行检查,返回结果"""
pass
class NullCheckRule(QualityRule):
"""空值检查"""
def check(self, context: Dict) -> Dict[str, Any]:
dt = context['dt']
sql = f"""
SELECT
COUNT(*) as total,
SUM(CASE WHEN {self.column} IS NULL THEN 1 ELSE 0 END) as null_count
FROM {self.table}
WHERE dt = '{dt}'
"""
# 执行SQL获取结果
result = execute_hive_sql(sql)
total, null_count = result[0]
null_rate = null_count / total if total > 0 else 0
return {
'rule': 'null_check',
'table': self.table,
'column': self.column,
'total': total,
'null_count': null_count,
'null_rate': null_rate,
'threshold': self.threshold,
'passed': null_rate <= self.threshold,
}
class UniqueCheckRule(QualityRule):
"""唯一性检查"""
def check(self, context: Dict) -> Dict[str, Any]:
dt = context['dt']
sql = f"""
SELECT
COUNT(*) as total,
COUNT(DISTINCT {self.column}) as distinct_count
FROM {self.table}
WHERE dt = '{dt}'
"""
result = execute_hive_sql(sql)
total, distinct_count = result[0]
duplicate_count = total - distinct_count
return {
'rule': 'unique_check',
'table': self.table,
'column': self.column,
'total': total,
'distinct_count': distinct_count,
'duplicate_count': duplicate_count,
'passed': duplicate_count == 0,
}
class RangeCheckRule(QualityRule):
"""范围检查"""
def __init__(self, table: str, column: str, min_val: float, max_val: float):
super().__init__(table, column)
self.min_val = min_val
self.max_val = max_val
def check(self, context: Dict) -> Dict[str, Any]:
dt = context['dt']
sql = f"""
SELECT
COUNT(*) as total,
SUM(CASE WHEN {self.column} < {self.min_val} OR {self.column} > {self.max_val}
THEN 1 ELSE 0 END) as out_of_range
FROM {self.table}
WHERE dt = '{dt}'
"""
result = execute_hive_sql(sql)
total, out_of_range = result[0]
return {
'rule': 'range_check',
'table': self.table,
'column': self.column,
'total': total,
'out_of_range': out_of_range,
'passed': out_of_range == 0,
}
class RecordCountCheckRule(QualityRule):
"""记录数检查"""
def __init__(self, table: str, min_count: int = 1):
super().__init__(table)
self.min_count = min_count
def check(self, context: Dict) -> Dict[str, Any]:
dt = context['dt']
sql = f"SELECT COUNT(*) FROM {self.table} WHERE dt = '{dt}'"
result = execute_hive_sql(sql)
count = result[0][0]
return {
'rule': 'record_count',
'table': self.table,
'count': count,
'min_count': self.min_count,
'passed': count >= self.min_count,
}
class CrossTableCheckRule(QualityRule):
"""跨表一致性检查"""
def __init__(self, source_table: str, target_table: str,
source_column: str, target_column: str):
super().__init__(source_table)
self.target_table = target_table
self.source_column = source_column
self.target_column = target_column
def check(self, context: Dict) -> Dict[str, Any]:
dt = context['dt']
sql = f"""
SELECT COUNT(*)
FROM {self.table} a
LEFT JOIN {self.target_table} b
ON a.{self.source_column} = b.{self.target_column}
AND b.dt = '{dt}'
WHERE a.dt = '{dt}' AND b.{self.target_column} IS NULL
"""
result = execute_hive_sql(sql)
orphan_count = result[0][0]
return {
'rule': 'cross_table_check',
'source_table': self.table,
'target_table': self.target_table,
'orphan_count': orphan_count,
'passed': orphan_count == 0,
}
class QualityChecker:
"""质量检查执行器"""
def __init__(self):
self.rules: List[QualityRule] = []
self.results: List[Dict] = []
def add_rule(self, rule: QualityRule):
self.rules.append(rule)
return self
def run(self, context: Dict) -> bool:
"""执行所有检查"""
self.results = []
all_passed = True
for rule in self.rules:
try:
result = rule.check(context)
result['timestamp'] = datetime.now().isoformat()
self.results.append(result)
if not result['passed']:
all_passed = False
logging.error(f"❌ 质量检查失败: {result}")
else:
logging.info(f"✅ 质量检查通过: {result['rule']} - {result.get('table')}")
except Exception as e:
logging.error(f"质量检查异常: {rule.__class__.__name__} - {e}")
all_passed = False
return all_passed
def get_report(self) -> Dict:
"""生成质量报告"""
passed = sum(1 for r in self.results if r['passed'])
failed = len(self.results) - passed
return {
'total_rules': len(self.results),
'passed': passed,
'failed': failed,
'pass_rate': passed / len(self.results) if self.results else 0,
'details': self.results,
}
# 使用示例
if __name__ == '__main__':
checker = QualityChecker()
# 添加检查规则
checker.add_rule(RecordCountCheckRule('ods.ods_user_info', min_count=1000))
checker.add_rule(NullCheckRule('ods.ods_user_info', 'user_name', threshold=0.01))
checker.add_rule(UniqueCheckRule('ods.ods_user_info', 'id'))
checker.add_rule(RangeCheckRule('ods.ods_user_info', 'age', 0, 150))
checker.add_rule(CrossTableCheckRule(
'ods.ods_order_info', 'ods.ods_user_info',
'user_id', 'id'
))
# 执行检查
context = {'dt': '2024-01-15'}
all_passed = checker.run(context)
# 获取报告
report = checker.get_report()
print(f"检查结果: {'全部通过' if all_passed else '存在失败'}")
print(f"通过率: {report['pass_rate']:.2%}")5.3 集成到Airflow
# dags/data_quality_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from data_quality.quality_checker import (
QualityChecker, NullCheckRule, UniqueCheckRule,
RecordCountCheckRule, RangeCheckRule
)
def run_quality_check(**context):
"""执行数据质量检查"""
dt = context['ds']
checker = QualityChecker()
checker.add_rule(RecordCountCheckRule('ods.ods_user_info', min_count=1000))
checker.add_rule(NullCheckRule('ods.ods_user_info', 'user_name', threshold=0.01))
checker.add_rule(UniqueCheckRule('ods.ods_user_info', 'id'))
all_passed = checker.run({'dt': dt})
report = checker.get_report()
# 保存报告到XCom
context['ti'].xcom_push(key='quality_report', value=report)
if not all_passed:
raise ValueError(f"数据质量检查失败!通过率: {report['pass_rate']:.2%}")
return report
with DAG('data_quality_check', ...) as dag:
quality_task = PythonOperator(
task_id='run_quality_check',
python_callable=run_quality_check,
)
六、告警系统
6.1 告警体系设计
graph TB
subgraph 告警体系["🚨 告警体系"]
subgraph 告警源["告警来源"]
S1[任务失败]
S2[任务超时]
S3[数据质量异常]
S4[数据延迟]
S5[资源异常]
end
subgraph 告警处理["告警处理"]
P1[告警聚合]
P2[告警分级]
P3[告警抑制]
P4[告警升级]
end
subgraph 通知渠道["通知渠道"]
N1[企业微信]
N2[钉钉]
N3[邮件]
N4[短信]
N5[电话]
end
subgraph 告警级别["告警级别"]
L1["P0 - 紧急
电话+短信+IM"] L2["P1 - 重要
短信+IM"] L3["P2 - 一般
IM"] L4["P3 - 提示
邮件"] end 告警源 --> 告警处理 --> 通知渠道 end style L1 fill:#ff0000,color:#fff style L2 fill:#ff6b6b style L3 fill:#ffe66d style L4 fill:#4ecdc4
电话+短信+IM"] L2["P1 - 重要
短信+IM"] L3["P2 - 一般
IM"] L4["P3 - 提示
邮件"] end 告警源 --> 告警处理 --> 通知渠道 end style L1 fill:#ff0000,color:#fff style L2 fill:#ff6b6b style L3 fill:#ffe66d style L4 fill:#4ecdc4
6.2 告警通知实现
# alert/notifier.py
import requests
import json
from abc import ABC, abstractmethod
from typing import Dict, List
import logging
class AlertNotifier(ABC):
"""告警通知器基类"""
@abstractmethod
def send(self, alert: Dict) -> bool:
pass
class WeComNotifier(AlertNotifier):
"""企业微信通知"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send(self, alert: Dict) -> bool:
level_emoji = {
'P0': '🔴',
'P1': '🟠',
'P2': '🟡',
'P3': '🟢',
}
emoji = level_emoji.get(alert.get('level', 'P2'), '⚪')
content = f"""
{emoji} **数据告警通知**
> **告警级别**: {alert.get('level', 'P2')}
> **告警类型**: {alert.get('type', '未知')}
> **告警时间**: {alert.get('time', '')}
> **任务名称**: {alert.get('task_name', '')}
> **告警内容**: {alert.get('message', '')}
请及时处理!
"""
data = {
"msgtype": "markdown",
"markdown": {"content": content}
}
try:
response = requests.post(
self.webhook_url,
json=data,
timeout=10
)
return response.status_code == 200
except Exception as e:
logging.error(f"企业微信通知失败: {e}")
return False
class DingTalkNotifier(AlertNotifier):
"""钉钉通知"""
def __init__(self, webhook_url: str, secret: str = None):
self.webhook_url = webhook_url
self.secret = secret
def _get_sign_url(self) -> str:
if not self.secret:
return self.webhook_url
import time
import hmac
import hashlib
import base64
import urllib.parse
timestamp = str(round(time.time() * 1000))
secret_enc = self.secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{self.secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
def send(self, alert: Dict) -> bool:
content = f"""
【数据告警】
告警级别: {alert.get('level', 'P2')}
告警类型: {alert.get('type', '未知')}
告警时间: {alert.get('time', '')}
任务名称: {alert.get('task_name', '')}
告警内容: {alert.get('message', '')}
"""
data = {
"msgtype": "text",
"text": {"content": content},
"at": {"isAtAll": alert.get('level') == 'P0'}
}
try:
response = requests.post(
self._get_sign_url(),
json=data,
timeout=10
)
return response.status_code == 200
except Exception as e:
logging.error(f"钉钉通知失败: {e}")
return False
class EmailNotifier(AlertNotifier):
"""邮件通知"""
def __init__(self, smtp_host: str, smtp_port: int,
username: str, password: str, from_addr: str):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
self.username = username
self.password = password
self.from_addr = from_addr
def send(self, alert: Dict) -> bool:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
to_addrs = alert.get('to_addrs', [])
if not to_addrs:
return False
msg = MIMEMultipart()
msg['From'] = self.from_addr
msg['To'] = ', '.join(to_addrs)
msg['Subject'] = f"[{alert.get('level', 'P2')}] 数据告警: {alert.get('task_name', '')}"
body = f"""
<h2>数据告警通知</h2>
<table border="1" cellpadding="10">
<tr><td><b>告警级别</b></td><td>{alert.get('level', 'P2')}</td></tr>
<tr><td><b>告警类型</b></td><td>{alert.get('type', '未知')}</td></tr>
<tr><td><b>告警时间</b></td><td>{alert.get('time', '')}</td></tr>
<tr><td><b>任务名称</b></td><td>{alert.get('task_name', '')}</td></tr>
<tr><td><b>告警内容</b></td><td>{alert.get('message', '')}</td></tr>
</table>
<p>请及时处理!</p>
"""
msg.attach(MIMEText(body, 'html'))
try:
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.starttls()
server.login(self.username, self.password)
server.sendmail(self.from_addr, to_addrs, msg.as_string())
return True
except Exception as e:
logging.error(f"邮件发送失败: {e}")
return False
class AlertManager:
"""告警管理器"""
def __init__(self):
self.notifiers: Dict[str, List[AlertNotifier]] = {
'P0': [],
'P1': [],
'P2': [],
'P3': [],
}
def register_notifier(self, level: str, notifier: AlertNotifier):
"""注册通知器到指定级别"""
if level in self.notifiers:
self.notifiers[level].append(notifier)
return self
def send_alert(self, alert: Dict):
"""发送告警"""
level = alert.get('level', 'P2')
notifiers = self.notifiers.get(level, [])
for notifier in notifiers:
try:
notifier.send(alert)
except Exception as e:
logging.error(f"告警发送失败: {e}")
# 使用示例
if __name__ == '__main__':
from datetime import datetime
# 初始化告警管理器
manager = AlertManager()
# 注册通知器
wecom = WeComNotifier("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx")
dingtalk = DingTalkNotifier("https://oapi.dingtalk.com/robot/send?access_token=xxx")
# P0级别:企业微信 + 钉钉
manager.register_notifier('P0', wecom)
manager.register_notifier('P0', dingtalk)
# P1/P2级别:企业微信
manager.register_notifier('P1', wecom)
manager.register_notifier('P2', wecom)
# 发送告警
manager.send_alert({
'level': 'P1',
'type': '任务失败',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'task_name': 'sync_order_info',
'message': 'DataX同步任务失败,错误信息:Connection refused',
})
6.3 Airflow告警回调
# dags/utils/alert_callback.py
from datetime import datetime, timedelta
from airflow import DAG
from alert.notifier import AlertManager, WeComNotifier
def get_alert_manager():
manager = AlertManager()
wecom = WeComNotifier("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx")
manager.register_notifier('P1', wecom)
manager.register_notifier('P2', wecom)
return manager
def task_failure_callback(context):
"""任务失败回调"""
task_instance = context['task_instance']
dag_id = context['dag'].dag_id
task_id = task_instance.task_id
execution_date = context['execution_date']
exception = context.get('exception', 'Unknown error')
manager = get_alert_manager()
manager.send_alert({
'level': 'P1',
'type': '任务失败',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'task_name': f"{dag_id}.{task_id}",
'message': f"执行日期: {execution_date}\n错误信息: {str(exception)[:500]}",
})
def task_retry_callback(context):
"""任务重试回调"""
task_instance = context['task_instance']
dag_id = context['dag'].dag_id
task_id = task_instance.task_id
manager = get_alert_manager()
manager.send_alert({
'level': 'P2',
'type': '任务重试',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'task_name': f"{dag_id}.{task_id}",
'message': f"任务正在进行第 {task_instance.try_number} 次重试",
})
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""SLA超时回调"""
manager = get_alert_manager()
manager.send_alert({
'level': 'P1',
'type': 'SLA超时',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'task_name': dag.dag_id,
'message': f"以下任务超过SLA时间: {[t.task_id for t in task_list]}",
})
# 在DAG中使用
default_args = {
'owner': 'data_team',
'on_failure_callback': task_failure_callback,
'on_retry_callback': task_retry_callback,
'sla': timedelta(hours=2),
}
with DAG(
dag_id='my_dag',
default_args=default_args,
sla_miss_callback=sla_miss_callback,
...
) as dag:
pass
七、元数据管理
7.1 元数据管理架构
graph TB
subgraph 元数据管理["📋 元数据管理架构"]
subgraph 数据源["数据源元数据"]
DS1[Hive Metastore]
DS2[MySQL Schema]
DS3[Kafka Topic]
end
subgraph 采集["元数据采集"]
C1[Atlas Collector]
C2[DataHub Ingestion]
C3[自定义采集器]
end
subgraph 存储["元数据存储"]
Store[图数据库/关系库]
end
subgraph 应用["元数据应用"]
A1[数据目录]
A2[血缘追踪]
A3[影响分析]
A4[数据质量]
end
数据源 --> 采集 --> 存储 --> 应用
end
style Store fill:#ff6b6b
style A2 fill:#4ecdc4
7.2 数据血缘追踪
graph LR
subgraph 血缘示例["🔗 数据血缘示例"]
subgraph ODS["ODS层"]
O1[(ods_user)]
O2[(ods_order)]
O3[(ods_product)]
end
subgraph DWD["DWD层"]
D1[(dwd_user)]
D2[(dwd_order_detail)]
end
subgraph DWS["DWS层"]
S1[(dws_user_stats)]
end
subgraph ADS["ADS层"]
A1[(ads_daily_report)]
end
O1 --> D1
O2 --> D2
O3 --> D2
D1 --> S1
D2 --> S1
S1 --> A1
end
style O1 fill:#4ecdc4
style O2 fill:#4ecdc4
style O3 fill:#4ecdc4
style D1 fill:#ffe66d
style D2 fill:#ffe66d
style S1 fill:#ff6b6b
style A1 fill:#9b59b6
7.3 简单的血缘管理实现
# metadata/lineage.py
from typing import Dict, List, Set
from collections import defaultdict
import json
class LineageManager:
"""数据血缘管理器"""
def __init__(self):
# 上游依赖:table -> [upstream_tables]
self.upstream: Dict[str, Set[str]] = defaultdict(set)
# 下游影响:table -> [downstream_tables]
self.downstream: Dict[str, Set[str]] = defaultdict(set)
# 表信息
self.tables: Dict[str, Dict] = {}
def register_table(self, table_name: str, info: Dict = None):
"""注册表"""
self.tables[table_name] = info or {}
def add_lineage(self, source: str, target: str, transform: str = None):
"""添加血缘关系"""
self.upstream[target].add(source)
self.downstream[source].add(target)
def get_upstream(self, table: str, depth: int = -1) -> Set[str]:
"""获取上游依赖(递归)"""
result = set()
visited = set()
def _traverse(t: str, current_depth: int):
if t in visited:
return
if depth != -1 and current_depth > depth:
return
visited.add(t)
for upstream_table in self.upstream.get(t, []):
result.add(upstream_table)
_traverse(upstream_table, current_depth + 1)
_traverse(table, 0)
return result
def get_downstream(self, table: str, depth: int = -1) -> Set[str]:
"""获取下游影响(递归)"""
result = set()
visited = set()
def _traverse(t: str, current_depth: int):
if t in visited:
return
if depth != -1 and current_depth > depth:
return
visited.add(t)
for downstream_table in self.downstream.get(t, []):
result.add(downstream_table)
_traverse(downstream_table, current_depth + 1)
_traverse(table, 0)
return result
def impact_analysis(self, table: str) -> Dict:
"""影响分析"""
downstream_tables = self.get_downstream(table)
return {
'table': table,
'direct_impact': list(self.downstream.get(table, [])),
'total_impact': list(downstream_tables),
'impact_count': len(downstream_tables),
}
def to_json(self) -> str:
"""导出为JSON"""
return json.dumps({
'tables': self.tables,
'lineage': [
{'source': s, 'target': t}
for t, sources in self.upstream.items()
for s in sources
]
}, indent=2)
# 使用示例
if __name__ == '__main__':
lineage = LineageManager()
# 注册表
lineage.register_table('ods.ods_user', {'layer': 'ODS', 'owner': 'data_team'})
lineage.register_table('ods.ods_order', {'layer': 'ODS', 'owner': 'data_team'})
lineage.register_table('dwd.dwd_user', {'layer': 'DWD', 'owner': 'data_team'})
lineage.register_table('dwd.dwd_order', {'layer': 'DWD', 'owner': 'data_team'})
lineage.register_table('dws.dws_user_stats', {'layer': 'DWS', 'owner': 'data_team'})
lineage.register_table('ads.ads_report', {'layer': 'ADS', 'owner': 'bi_team'})
# 添加血缘
lineage.add_lineage('ods.ods_user', 'dwd.dwd_user')
lineage.add_lineage('ods.ods_order', 'dwd.dwd_order')
lineage.add_lineage('dwd.dwd_user', 'dws.dws_user_stats')
lineage.add_lineage('dwd.dwd_order', 'dws.dws_user_stats')
lineage.add_lineage('dws.dws_user_stats', 'ads.ads_report')
# 查询上游
print("dws.dws_user_stats 的上游依赖:")
print(lineage.get_upstream('dws.dws_user_stats'))
# {'ods.ods_user', 'ods.ods_order', 'dwd.dwd_user', 'dwd.dwd_order'}
# 影响分析
print("\nods.ods_user 的影响分析:")
print(lineage.impact_analysis('ods.ods_user'))
# {'table': 'ods.ods_user', 'direct_impact': ['dwd.dwd_user'],
# 'total_impact': ['dwd.dwd_user', 'dws.dws_user_stats', 'ads.ads_report'],
# 'impact_count': 3}
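实际项目里,血缘关系不一定全靠人工登记,也可以从调度脚本里的 ETL SQL 中自动提取。下面是一个非常简化的示意(只处理常见的 INSERT ... SELECT 语句,生产环境建议用 sqllineage、sqlglot 之类的 SQL 解析库):
import re

def extract_lineage_from_sql(sql: str):
    """从一条 INSERT...SELECT 语句中粗略提取 (目标表, [来源表]),仅作示意"""
    target_match = re.search(r"INSERT\s+(?:OVERWRITE|INTO)\s+TABLE\s+([\w.]+)", sql, re.IGNORECASE)
    target = target_match.group(1) if target_match else None
    sources = re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE)
    return target, sorted({s for s in sources if s != target})

# 用法:解析结果直接喂给上面的 LineageManager 实例
sql = """
INSERT OVERWRITE TABLE dws.dws_user_stats PARTITION(dt='2024-01-15')
SELECT u.id, COUNT(o.order_id)
FROM dwd.dwd_user u LEFT JOIN dwd.dwd_order o ON u.id = o.user_id
GROUP BY u.id
"""
target, sources = extract_lineage_from_sql(sql)
# target = 'dws.dws_user_stats', sources = ['dwd.dwd_order', 'dwd.dwd_user']
for s in sources:
    lineage.add_lineage(s, target)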
八、完整案例:电商数据入库
8.1 整体架构
整体架构:
graph LR
subgraph Source["🗄️ 数据源"]
DB[(业务数据库)]
end
subgraph Collect["📥 采集"]
DataX[DataX]
CDC[Flink CDC]
end
subgraph Warehouse["🏠 数据仓库"]
ODS[ODS层]
DWD[DWD层]
DWS[DWS层]
ADS[ADS层]
end
subgraph Service["🖥️ 应用"]
BI[BI报表]
API[数据API]
end
subgraph Ops["⚙️ 运维"]
Scheduler[Airflow调度]
Monitor[监控告警]
end
DB --> DataX & CDC
DataX & CDC --> ODS
ODS --> DWD --> DWS --> ADS
ADS --> BI & API
Scheduler -.-> Collect
Scheduler -.-> Warehouse
Monitor -.-> Warehouse
style ODS fill:#4ecdc4
style DWD fill:#ffe66d
style DWS fill:#ff6b6b
style ADS fill:#9b59b6
数据源详情:
graph LR
subgraph 业务数据库["🗄️ 业务数据库"]
M1[(用户库<br/>user_db)]
M2[(订单库<br/>order_db)]
M3[(商品库<br/>product_db)]
M4[(支付库<br/>payment_db)]
end
subgraph 采集方式["📥 采集方式"]
DX[DataX<br/>批量T+1]
FC[Flink CDC<br/>实时同步]
end
M1 & M2 & M3 & M4 --> DX
M2 & M4 --> FC
style DX fill:#ff6b6b
style FC fill:#4ecdc4
数据仓库分层:
graph TB
subgraph ODS["📦 ODS层 - 原始数据"]
O1[ods_user_info]
O2[ods_order_info]
O3[ods_product_info]
O4[ods_payment_info]
end
subgraph DWD["🔧 DWD层 - 明细数据"]
D1[dwd_user_detail]
D2[dwd_order_detail]
D3[dwd_payment_detail]
end
subgraph DWS["📊 DWS层 - 汇总数据"]
S1[dws_user_daily_stats]
S2[dws_trade_daily_stats]
S3[dws_product_daily_stats]
end
subgraph ADS["🎯 ADS层 - 应用数据"]
A1[ads_dashboard_daily]
A2[ads_user_portrait]
A3[ads_product_ranking]
end
O1 --> D1
O2 --> D2
O3 --> D2
O4 --> D3
D1 & D2 --> S1
D2 & D3 --> S2
D2 --> S3
S1 & S2 & S3 --> A1
S1 --> A2
S3 --> A3
style ODS fill:#4ecdc4
style DWD fill:#ffe66d
style DWS fill:#ff6b6b
style ADS fill:#9b59b6
调度与监控:
graph LR
subgraph Scheduler["⏰ Airflow调度"]
DAG[数仓DAG]
end
subgraph Tasks["📋 任务流"]
T1[ODS同步] --> T2[数据质量检查]
T2 --> T3[DWD处理]
T3 --> T4[DWS聚合]
T4 --> T5[ADS生成]
T5 --> T6[数据导出]
end
subgraph Monitor["👁️ 监控告警"]
Q[数据质量监控]
A[任务告警]
end
DAG --> T1
T2 --> Q
Tasks --> A
style DAG fill:#ff6b6b
style Q fill:#4ecdc4
style A fill:#ffe66d
8.2 完整DAG实现
# dags/ecommerce_data_warehouse.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
# 导入自定义模块
from utils.alert_callback import task_failure_callback
from utils.quality_checker import run_quality_check
from utils.notifier import send_success_notification
# 配置
DATAX_HOME = '/opt/datax'
JOB_DIR = f'{DATAX_HOME}/job/ecommerce'
HIVE_DB = 'ecommerce'
# 需要同步的表
SYNC_TABLES = [
{'name': 'user_info', 'split_key': 'id', 'priority': 1},
{'name': 'order_info', 'split_key': 'order_id', 'priority': 1},
{'name': 'order_detail', 'split_key': 'id', 'priority': 2},
{'name': 'product_info', 'split_key': 'id', 'priority': 1},
{'name': 'payment_info', 'split_key': 'id', 'priority': 2},
]
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email': ['data-alert@company.com'],
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': task_failure_callback,
}
with DAG(
dag_id='ecommerce_data_warehouse_daily',
default_args=default_args,
description='电商数据仓库每日入库流程',
schedule_interval='0 1 * * *',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ecommerce', 'data_warehouse', 'daily'],
max_active_runs=1,
dagrun_timeout=timedelta(hours=6),
) as dag:
# ========== 开始 ==========
start = EmptyOperator(task_id='start')
# ========== 环境检查 ==========
def check_environment(**context):
"""检查运行环境"""
import subprocess
# 检查HDFS
result = subprocess.run(['hdfs', 'dfs', '-ls', '/'], capture_output=True)
if result.returncode != 0:
raise Exception("HDFS不可用")
# 检查Hive
result = subprocess.run(['hive', '-e', 'show databases'], capture_output=True)
if result.returncode != 0:
raise Exception("Hive不可用")
return True
env_check = PythonOperator(
task_id='environment_check',
python_callable=check_environment,
)
# ========== ODS层:数据同步 ==========
with TaskGroup(group_id='ods_sync') as ods_sync:
sync_tasks = []
for table in SYNC_TABLES:
task = BashOperator(
task_id=f"sync_{table['name']}",
bash_command=f'''
python {DATAX_HOME}/bin/datax.py \
-p "-Ddt={{{{ ds }}}} -Dtable={table['name']}" \
{JOB_DIR}/{table['name']}.json \
>> /var/log/datax/{table['name']}_{{{{ ds_nodash }}}}.log 2>&1
''',
pool='datax_pool',
priority_weight=table['priority'],
)
sync_tasks.append(task)
# ========== ODS层:数据质量检查 ==========
with TaskGroup(group_id='ods_quality') as ods_quality:
def check_ods_quality(**context):
from data_quality.quality_checker import QualityChecker, RecordCountCheckRule, NullCheckRule
dt = context['ds']
checker = QualityChecker()
for table in SYNC_TABLES:
table_name = f"ods.ods_{table['name']}"
checker.add_rule(RecordCountCheckRule(table_name, min_count=1))
checker.add_rule(NullCheckRule(table_name, table['split_key'], threshold=0))
all_passed = checker.run({'dt': dt})
if not all_passed:
report = checker.get_report()
raise ValueError(f"ODS层数据质量检查失败: {report}")
return True
ods_quality_check = PythonOperator(
task_id='ods_quality_check',
python_callable=check_ods_quality,
)
# ========== DWD层:数据清洗 ==========
with TaskGroup(group_id='dwd_process') as dwd_process:
dwd_user = BashOperator(
task_id='dwd_user_info',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dwd.dwd_user_info PARTITION(dt='{{ ds }}')
SELECT
id,
user_name,
phone,
CASE
WHEN gender IN ('M', 'F') THEN gender
ELSE 'U'
END as gender,
CASE
WHEN age BETWEEN 0 AND 150 THEN age
ELSE NULL
END as age,
COALESCE(province, '未知') as province,
COALESCE(city, '未知') as city,
create_time,
update_time,
current_timestamp() as etl_time
FROM ods.ods_user_info
WHERE dt = '{{ ds }}';
"
''',
)
dwd_order = BashOperator(
task_id='dwd_order_detail',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dwd.dwd_order_detail PARTITION(dt='{{ ds }}')
SELECT
o.order_id,
o.user_id,
d.product_id,
d.product_name,
d.quantity,
d.unit_price,
d.quantity * d.unit_price as total_amount,
o.order_status,
o.create_time,
o.pay_time,
o.deliver_time,
o.receive_time,
current_timestamp() as etl_time
FROM ods.ods_order_info o
JOIN ods.ods_order_detail d
ON o.order_id = d.order_id AND d.dt = '{{ ds }}'
WHERE o.dt = '{{ ds }}';
"
''',
)
dwd_payment = BashOperator(
task_id='dwd_payment_detail',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dwd.dwd_payment_detail PARTITION(dt='{{ ds }}')
SELECT
payment_id,
order_id,
user_id,
payment_method,
CASE WHEN amount < 0 THEN 0 ELSE amount END as amount,
payment_status,
payment_time,
current_timestamp() as etl_time
FROM ods.ods_payment_info
WHERE dt = '{{ ds }}';
"
''',
)
[dwd_user, dwd_order, dwd_payment]
# ========== DWS层:数据汇总 ==========
with TaskGroup(group_id='dws_process') as dws_process:
dws_user_daily = BashOperator(
task_id='dws_user_daily',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dws.dws_user_daily PARTITION(dt='{{ ds }}')
SELECT
u.id as user_id,
u.user_name,
u.province,
u.city,
COUNT(DISTINCT o.order_id) as order_count,
COALESCE(SUM(o.total_amount), 0) as order_amount,
COALESCE(SUM(p.amount), 0) as pay_amount,
MAX(o.create_time) as last_order_time
FROM dwd.dwd_user_info u
LEFT JOIN dwd.dwd_order_detail o
ON u.id = o.user_id AND o.dt = '{{ ds }}'
LEFT JOIN dwd.dwd_payment_detail p
ON o.order_id = p.order_id AND p.dt = '{{ ds }}'
WHERE u.dt = '{{ ds }}'
GROUP BY u.id, u.user_name, u.province, u.city;
"
''',
)
dws_trade_daily = BashOperator(
task_id='dws_trade_daily',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE dws.dws_trade_daily PARTITION(dt='{{ ds }}')
SELECT
'{{ ds }}' as stat_date,
COUNT(DISTINCT order_id) as order_count,
COUNT(DISTINCT user_id) as user_count,
SUM(total_amount) as gmv,
SUM(total_amount) / COUNT(DISTINCT order_id) as avg_order_amount,
SUM(quantity) as total_quantity
FROM dwd.dwd_order_detail
WHERE dt = '{{ ds }}';
"
''',
)
[dws_user_daily, dws_trade_daily]
# ========== ADS层:应用数据 ==========
with TaskGroup(group_id='ads_process') as ads_process:
ads_dashboard = BashOperator(
task_id='ads_dashboard',
bash_command='''
hive -e "
INSERT OVERWRITE TABLE ads.ads_dashboard PARTITION(dt='{{ ds }}')
-- 先分别聚合用户/支付指标再与交易宽表关联(Hive的SELECT列表不支持标量子查询)
SELECT
'{{ ds }}' as stat_date,
-- 用户指标
u.total_users,
u.new_users,
-- 交易指标
t.order_count,
t.user_count as active_users,
t.gmv,
t.avg_order_amount,
-- 支付指标
p.pay_order_count,
p.pay_amount
FROM dws.dws_trade_daily t
CROSS JOIN (
SELECT
COUNT(*) as total_users,
SUM(CASE WHEN to_date(create_time) = '{{ ds }}' THEN 1 ELSE 0 END) as new_users
FROM dwd.dwd_user_info
WHERE dt = '{{ ds }}'
) u
CROSS JOIN (
SELECT
COUNT(*) as pay_order_count,
SUM(amount) as pay_amount
FROM dwd.dwd_payment_detail
WHERE dt = '{{ ds }}' AND payment_status = 'SUCCESS'
) p
WHERE t.dt = '{{ ds }}';
"
''',
)
ads_dashboard
# ========== 数据导出 ==========
with TaskGroup(group_id='data_export') as data_export:
export_dashboard = BashOperator(
task_id='export_dashboard_to_mysql',
bash_command=f'''
python {DATAX_HOME}/bin/datax.py \
-p "-Ddt={{{{ ds }}}}" \
{JOB_DIR}/export_dashboard.json
''',
)
export_dashboard
# ========== 最终质量检查 ==========
def final_quality_check(**context):
"""最终数据质量检查"""
# 检查关键指标是否合理
# 比较今日数据与昨日数据的波动
pass
final_check = PythonOperator(
task_id='final_quality_check',
python_callable=final_quality_check,
)
# ========== 成功通知 ==========
def send_success_report(**context):
"""发送成功报告"""
from utils.notifier import send_success_notification
ds = context['ds']
send_success_notification(
dag_id='ecommerce_data_warehouse_daily',
execution_date=ds,
message='电商数据仓库每日入库任务完成'
)
success_notify = PythonOperator(
task_id='success_notification',
python_callable=send_success_report,
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ========== 结束 ==========
end = EmptyOperator(
task_id='end',
trigger_rule=TriggerRule.ALL_DONE,
)
# ========== 依赖关系 ==========
(
start
>> env_check
>> ods_sync
>> ods_quality
>> dwd_process
>> dws_process
>> ads_process
>> data_export
>> final_check
>> success_notify
>> end
)
8.3 运行效果
gantt
title 电商数据仓库每日执行时间线
dateFormat HH:mm
axisFormat %H:%M
section 准备阶段
环境检查 :a1, 01:00, 5m
section ODS层
sync_user_info :a2, after a1, 15m
sync_order_info :a3, after a1, 20m
sync_product_info :a4, after a1, 10m
sync_payment_info :a5, after a1, 15m
质量检查 :a6, after a3, 5m
section DWD层
dwd_user_info :b1, after a6, 10m
dwd_order_detail :b2, after a6, 15m
dwd_payment_detail :b3, after a6, 10m
section DWS层
dws_user_daily :c1, after b2, 20m
dws_trade_daily :c2, after b2, 10m
section ADS层
ads_dashboard :d1, after c1, 15m
section 导出
export_to_mysql :e1, after d1, 10m
最终检查 :e2, after e1, 5m
成功通知 :e3, after e2, 1m
九、最佳实践总结
9.1 设计原则
mindmap
root((自动化入库最佳实践))
设计原则
幂等性
重复执行结果一致
支持失败重跑
可观测性
完整的日志
关键指标监控
数据血缘
容错性
失败自动重试
异常告警
降级方案
可扩展性
模块化设计
配置驱动
易于新增表
实施要点
分层处理
ODS原始数据
DWD清洗数据
DWS汇总数据
ADS应用数据
质量保障
入库前检查
入库后验证
异常数据处理
调度策略
合理的依赖
并行执行
资源隔离
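其中"幂等性"最容易被忽视:同一个分区无论重跑多少次,结果都应该一致。落到实现上,通常就是按分区"覆盖写"而不是"追加写"。下面用一小段示意代码说明(表名、字段为举例,execute_hive_sql 沿用第五节执行 Hive SQL 的辅助函数):
def reload_partition(table: str, dt: str, select_sql: str):
    """幂等回刷某一天的分区:重跑任意次数,该分区的结果都一致"""
    # INSERT INTO 是追加写,重跑会产生重复数据,不幂等;
    # INSERT OVERWRITE ... PARTITION 先清空该分区再写入,天然支持失败重跑
    execute_hive_sql(
        f"INSERT OVERWRITE TABLE {table} PARTITION(dt='{dt}') {select_sql}"
    )

# 补数时按日期直接重跑即可,不用担心数据跑重
reload_partition(
    'dwd.dwd_order_info', '2024-01-15',
    "SELECT order_id, user_id, amount FROM ods.ods_order_info WHERE dt = '2024-01-15'"
)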
9.2 Checklist
## 自动化入库上线Checklist
### 准备阶段
- [ ] 确认数据源连接信息
- [ ] 确认目标表结构已创建
- [ ] 确认网络连通性
- [ ] 配置文件已准备
### 开发阶段
- [ ] 同步脚本开发完成
- [ ] 数据处理脚本开发完成
- [ ] 数据质量规则已定义
- [ ] 告警配置已完成
- [ ] 单元测试通过
### 测试阶段
- [ ] 测试环境验证通过
- [ ] 全量同步测试通过
- [ ] 增量同步测试通过
- [ ] 失败重试测试通过
- [ ] 告警功能测试通过
- [ ] 性能测试满足要求
### 上线阶段
- [ ] 生产环境配置已更新
- [ ] 调度任务已配置
- [ ] 监控大盘已配置
- [ ] 值班人员已通知
- [ ] 回滚方案已准备
### 运维阶段
- [ ] 每日检查任务状态
- [ ] 定期检查数据质量
- [ ] 定期检查资源使用
- [ ] 定期优化性能
十、写在最后
自动化入库链路,说白了就是把人工操作变成机器操作。但这个过程中,有几个关键点要把握:
- 先跑起来,再优化:不要一开始就追求完美,先让数据流动起来
- 监控比执行更重要:没有监控的自动化就是定时炸弹
- 告警要有价值:告警太多等于没有告警
- 文档要跟上:半年后你自己都不记得当初为什么这么写
最后送大家一句话:
"自动化不是目的,稳定可靠的数据才是目的。"
当你搭建好一套自动化入库链路,每天看着数据自己按时入库,质量检查全部通过,报表准时更新——那种感觉,就像看着自己养的孩子自己会吃饭了一样欣慰。
然后你就可以安心摸鱼了。😎
本文作者:一个把手动挡换成自动挡的程序员
最惨经历:调度系统挂了一天没人发现,因为告警也挂了
自动化的尽头是什么?是信任,也是焦虑。
附录:面试高频题
如何设计一个数据入库的调度系统?
分层设计(ODS→DWD→DWS→ADS),定义清晰的依赖关系,支持失败重试和告警,保证幂等性。
Airflow和DolphinScheduler怎么选?
Python团队选Airflow,代码即DAG;需要可视化或中文支持选DolphinScheduler。
如何保证数据质量?
多维度检查(完整性、准确性、一致性等),入库前后都要检查,异常及时告警。
任务失败了怎么处理?
自动重试 → 重试失败告警 → 人工介入 → 修复后重跑。保证任务幂等性,支持重跑。
如何处理数据延迟?
设置SLA监控,超时告警;优化任务性能;必要时并行执行;建立数据延迟的应急预案。
元数据管理有什么用?
数据目录(找数据)、血缘追踪(追溯源头)、影响分析(评估变更影响)、质量管理。