本文共 2479 字,大约阅读时间需要 8 分钟。
Airflow的第一个DAG
考虑了很久,要不要记录Airflow相关的东西,应该怎么记录。官方文档已经有比较详细的介绍了,还有各种博客,我需要有一份自己的笔记吗?答案就从本文开始了。本文将从一个陌生视角开始认知Airflow,顺带勾勒出应该如何一步步搭建我们的数据调度系统。
现在是9102年9月上旬,Airflow最近的一个版本是1.10.5。
ps. 查资料发现自己好多文章被爬走,换了作者。所以,接下来的内容会随机添加一些防伪标识,忽略即可。
什么数据调度系统?
中台这个概念最近比较火,其中就有一个叫做数据中台,文章给出了一个概念。大概就是:收集各个零散的数据,标准化,然后服务化,提供统一数据服务。而要做到数据整理和处理,必然涉及数据调度,也就需要一个调度系统。本文出自Ryan Miao。
数据调度系统可以将不同的异构数据互相同步,可以按照规划去执行数据处理和任务调度。Airflow就是这样的一个任务调度平台。
前面已经安装好了我们的Airflow,可以直接使用了。这是第一个DAG任务链。
创建一个任务Hello World
目标:每天早上8点执行一个任务,打印“Hello World”。
在Linux上,我们可以在crontab插入一条记录:
使用Springboot,我们可以使用@Scheduled(cron="0 0 8 * * ?")来定时执行一个method。
使用quartz,我们可以创建一个CronTrigger,然后去执行对应的JobDetail。
使用Airflow,也差不多类似。
在docker-airflow中,我们将dag挂载成磁盘,现在只需要在dag目录下编写dag即可。
创建一个hello.py
这是一个Python脚本,主要定义了两个变量。
DAG
表示一个有向无环图,一个任务链,id全局唯一。DAG是Airflow的核心概念,任务装载到dag中,封装成任务依赖链条。DAG决定这些任务的执行规则,比如执行时间。这里设置为从9月1号开始,每天8点执行。
TASK
表示具体的一个任务,其id在dag内唯一。task有不同的种类,通过各种Operator插件来区分任务类型。这里是一个BashOperator,来自Airflow自带的插件。
ds
Airflow内置的时间变量模板,在渲染operator的时候,会注入一个当前执行日期的字符串。后面会专门讲解这个执行日期。
[本文出自Ryan Miao]
部署dag
将上述hello.py上传到dag目录,Airflow会自动检测文件变化,然后解析py文件,导入dag定义到数据库。
访问Airflow地址,刷新即可看到我们的dag。
开启dag,进入dag定义,可以看到已经执行了昨天的任务。
点击任务实例,点击view log可以查看日志。
我们的任务在这台机器上执行,并打印了hello,注意,这个打印的日期。
这样就是一个基本的Airflow任务单元了,这个任务每天8点会执行。
理解调度系统的概念
任务定义
定义一个任务的具体内容,比如这里就是打印“Hello World, today is {{ ds }}”。
任务实例
任务设定了运行时间,每次运行时会生成一个实例,任务实例和任务当前代表的执行时间绑定。本demo中,每天会生成一个任务实例。
执行日期
今天是2019-09-07,但我们日志里打印的任务执行日期是2019-09-06。
执行日期是任务实例运行所代表的任务时间,我们通常叫做execute-date或bizdate,类似hive表的的分区。
为什么今天执行的任务,任务的时间变量是昨天呢?
因为任务实例是一个时间段的任务,比如计算每天的访问量,我们只有6号这一天过去了才能计算6号这一天的总量。那么这个任务最早要7号0点之后才能计算,计算6号0点到7号0点之间的访问量。所以,任务真正执行时间不固定的,可以7号,也可以8号,只要任务执行计算的数据区间是6号就可以了。
因此,调度系统中的ds(execution date)通常是过去的一个周期,即本周期执行上周期的任务。
任务依赖
最典型的任务模型etl(Extract & Transformation & Loading,即数据抽取,转换,加载)最少也要分成3步。对于每天要统计访问量这个目标来说,我必须要抽取访问日志,找到访问量的字段,计算累加。这3个任务之间有先后顺序,必须前一个执行完毕之后,后一个才可以执行。这叫任务依赖。不同的任务之间的依赖。在Airflow里,通过在关联任务实现依赖。
还有同一个任务的时间依赖。比如,计算新增用户量,我必须知道前天的数据和昨天的数据,才能计算出增量。那么,这个任务就必须依赖于昨天的任务状态。在Airflow里,通过设置depends_on_past来决定。
任务补录backfill
Airflow里有个功能叫backfill,可以执行过去时间的任务。我们把这个操作叫做补录或者补数,为了计算以前没计算的数据。
我们的任务是按时间执行的,今天创建了一个任务,计算每天的用户量,那么明天会跑出今天的数据。这时候,我想知道过去1个月每天的用户增量怎么办?
自己写code,只要查询日期范围的数据,然后分别计算就好。但调度任务是固定的,根据日期去执行的。我们只能创建不同日期的任务实例去执行这些任务。backfill就是实现这种功能的。
任务重跑
让跑过的任务再跑一次。
有时候,我们的任务需要重跑。比如,etl任务,今天突然发现昨天抽取的数据任务有问题,少抽取一个app的数据,那后面的计算用户量就不准确,我们就需要重新抽取,重新计算。
在Airflow里,通过点击任务实例的clear按钮,删除这个任务实例,然后调度系统会再次创建并执行这个实例。
关于调度系统这个实现逻辑,我们后面有机会来查看源码了解。
后记
本文没太实质性的任务具体介绍,而是引出Hello World,先跑起来,我们接下来继续完善我们的dag。
转载地址:http://kgfbz.baihongyu.com/