本文共 3331 字,大约阅读时间需要 11 分钟。
考虑了很久,要不要记录airflow相关的东西, 应该怎么记录. 官方文档已经有比较详细的介绍了,还有各种博客,我需要有一份自己的笔记吗?
答案就从本文开始了.
本文将从一个陌生视角开始认知airflow,顺带勾勒出应该如何一步步搭建我们的数据调度系统.
现在是9102年9月上旬, Airflow最近的一个版本是1.10.5.
ps. 查资料发现自己好多文章被爬走,换了作者.所以,接下里的内容会随机添加一些防伪标识,忽略即可.
什么数据调度系统?
中台这个概念最近比较火, 其中就有一个叫做数据中台, 文章给出了一个概念.
我粗糙的理解, 大概就是: 收集各个零散的数据,标准化,然后服务化, 提供统一数据服务. 而要做到数据整理和处理,必然涉及数据调度,也就需要一个调度系统.[本文出自Ryan Miao]数据调度系统可以将不同的异构数据互相同步,可以按照规划去执行数据处理和任务调度. Airflow就是这样的一个任务调度平台.前面已经
安装好了我们的airflow, 可以直接使用了. 这是第一个DAG任务链.目标: 每天早上8点执行一个任务--打印
Hello World
在Linux上,我们可以在crontab插入一条记录:
使用Springboot, 我们可以使用@Scheduled(cron="0 0 8 * * ?")来定时执行一个method.
使用quartz, 我们可以创建一个CronTrigger, 然后去执行对应的JobDetail.
CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1") .withSchedule(CronScheduleBuilder.cronSchedule("0 0 8 * * ?")) .build(); 使用Airflow, 也差不多类似.
在docker-airflow中,我们将dag挂载成磁盘,现在只需要在dag目录下编写dag即可.
volumes: - ./dags:/usr/local/airflow/dags
创建一个hello.py
"""Airflow的第一个DAG"""from airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetimedefault_args = { "owner": "ryan.miao", "start_date": datetime(2019, 9, 1)}dag = DAG("Hello-World", description="第一个DAG", default_args=default_args, schedule_interval='0 8 * * *')t1 = BashOperator(task_id="hello", bash_command="echo 'Hello World, today is {{ ds }}'", dag=dag) 这是一个Python脚本, 主要定义了两个变量.
DAG
表示一个有向无环图,一个任务链, 其id全局唯一. DAG是airflow的核心概念, 任务装载到dag中, 封装成任务依赖链条. DAG决定这些任务的执行规则,比如执行时间.这里设置为从9月1号开始,每天8点执行.
TASK
task表示具体的一个任务,其id在dag内唯一. task有不同的种类,通过各种Operator插件来区分任务类型. 这里是一个BashOperator, 来自airflow自带的插件, airflow自带了很多拆箱即用的插件.
ds
airflow内置的时间变量模板, 在渲染operator的时候,会注入一个当前执行日期的字符串. 后面会专门讲解这个执行日期.
[本文出自Ryan Miao]
将上述hello.py上传到dag目录, airflow会自动检测文件变化, 然后解析py文件,导入dag定义到数据库.
访问airflow地址,刷新即可看到我们的dag.
开启dag, 进入dag定义, 可以看到已经执行了昨天的任务.
点击任务实例, 点击view log可以查看日志
我们的任务在这台机器上执行,并打印了hello, 注意, 这个打印的日期.
这样就是一个基本的airflow任务单元了, 这个任务每天8点会执行.
定义一个任务的具体内容,比如这里就是打印Hello World,today is {{ ds }}.
任务设定了运行时间,每次运行时会生成一个实例,即 dag-task-executiondate 标记一个任务实例.任务实例和任务当前代表的执行时间绑定. 本demo中,每天会生成一个任务实例.
今天是2019-09-07, 但我们日志里打印的任务执行日期是2019-09-06.
执行日期是任务实例运行所代表的任务时间, 我们通常叫做execute-date或bizdate, 类似hive表的的分区.
为什么今天执行的任务,任务的时间变量是昨天呢?
因为任务实例是一个时间段的任务, 比如计算每天的访问量, 我们只有6号这一天过去了才能计算6号这一天的的总量. 那这个任务最早要7号0点之后才能计算, 计算6号0点到7号0点之间的访问量.所以,这个任务时间就代表任务要处理的数据时间, 就是6号. 任务真正执行时间不固定的, 可以7号, 也可以8号, 只要任务执行计算的数据区间是6号就可以了.
因此, 调度系统中的ds(execution date)通常是过去的一个周期, 即本周期执行上周期的任务.
最典型的任务模型etl(Extract & Transformation & Loading,即数据抽取,转换,加载)最少也要分成3步. 对于每天要统计访问量这个目标来说, 我必须要抽取访问日志, 找到访问量的字段, 计算累加. 这3个任务之间有先后顺序,必须前一个执行完毕之后,后一个才可以执行. 这叫任务依赖. 不同的任务之间的依赖.在airflow里, 通过在关联任务实现依赖.
还有同一个任务的时间依赖. 比如,计算新增用户量, 我必须知道前天的数据和昨天的数据, 才能计算出增量. 那么, 这个任务就必须依赖于昨天的任务状态. 在airflow里,通过设置depends_on_past来决定.
airflow里有个功能叫backfill, 可以执行过去时间的任务. 我们把这个操作叫做补录或者补数,为了计算以前没计算的数据.
我们的任务是按时间执行的, 今天创建了一个任务, 计算每天的用户量, 那么明天会跑出今天的数据. 这时候,我想知道过去1个月每天的用户增量怎么办?
自己写code, 只要查询日期范围的数据,然后分别计算就好. 但调度任务是固定的, 根据日期去执行的. 我们只能创建不同日期的任务实例去执行这些任务. backfill就是实现这种功能的.
让跑过的任务再跑一次.
有时候, 我们的任务需要重跑. 比如, etl任务, 今天突然发现昨天抽取的数据任务有问题,少抽取一个app的数据, 那后面的计算用户量就不准确, 我们就需要重新抽取,重新计算.
在airflow里, 通过点击任务实例的clear按钮, 删除这个任务实例, 然后调度系统会再次创建并执行这个实例.
关于调度系统这个实现逻辑, 我们后面有机会来查看源码了解.
本文没太实质性的任务具体介绍, 而是引出Hello World, 先跑起来,我们接下来继续完善我们的dag.
转载地址:http://kgfbz.baihongyu.com/