初识 airflow
其实早在去年就已经接触到 airflow 了,当时的需求是按小时拼接两份线上日志,而拼接操作必须依赖于两份日志的完整性。从这个需求出发,了解到了 airflow ,但迫于官方文档的晦涩难懂,再加上这个任务对 airflow 的需求不够强,就弃坑了;最后使用 python 脚本中一个 while True 死循环 + time.sleep() 来解决问题(如果所依赖的日志没有完全写入,则 sleep 一段时间)。
正式入坑
前几天 boss 布置了一个广告平台的统计任务,依赖倒也不复杂,但是考虑到未来平台的扩展性,还是需要一个能够很好的处理统计任务的工具的。自己也调研比较了一下ETL(数据仓库)
这方面的工具,确实数 airflow 功能最为强大,既然这样那就入坑吧。
airflow 安装配置过程
这里都是按照官网操作,具体如下:
1、设置 airflow 项目路径,默认是用户主目录下 airflow ,也可以显示用环境变量 AIRFLOW_HOME
指定;
|
|
2、airflow 安装,因为 airflow 是基于 python 编写,因此可以直接利用 pip 安装,这里要注意的是 airflow 还提供了使用其他软件的插件,例如 mysql、hdfs、hive 等工具,当然这些也可以在后续需要时再安装。
|
|
3、初始化 airflow 相关数据库
|
|
4、启动 webserver 服务
|
|
至此,一个最基础的 airflow 服务就启动了,下面说一说 airflow 的配置。
airflow 基础配置
airflow 的配置文件保存在项目目录下,命名为 airflow.cfg ,通过文件内的注释也可以知道每个配置项的大致含义,这里简单说下常用的配置项。
更改数据库
airflow 为了能够让用户快速使用选择了 sqllite 数据库,但在实际使用中,往往 mysql 更为广泛使用,若要使 airflow 使用 mysql ,首先执行
|
|
更改 airflow.cfg 中 sql_alchemy_conn
的值,通过变量名可知配置使用的是 sql_alchemy 的数据库连接字符串,具体格式为:mysql+pymysql://username:password@host:port/database
,当然这里 pymysql 可以根据自己喜好更换。
修改完配置文件后,需要执行 airflow resetdb
并重新启动 web 服务(airflow webserver
),数据库更换则生效。
为web服务开启用户身份验证功能
airflow 默认是无需登录即可访问使用的,而在实际生产环境中为了提高系统的安全性,需要开启用户身份验证功能,具体步骤如下:
|
|
这样就开启了用户身份验证的功能,但系统还没有默认用户,需要自行手动添加,有两种方式,一种是直接插入默认用户信息到数据库对应的 users 表中(不推荐);另一种则是通过 airflow 提供的功能来添加,具体操作如下:
打开ipython,执行以下代码:
|
|
重启 web 服务( airflow webserver
),刷新网址浏览器会自动跳转到用户登录页面。
配置 airflow + Celery
airflow 提供了三种 Executor ,分别是 SequentialExecutor
、LocalExecutor
以及 CeleryExecutor
;airflow 配置文件中,默认的执行方式是(SequentialExecutor
),这三者的特点如下:
SequentialExecutor
: 单进程顺序执行,通常只用于测试LocalExecutor
: 多进程本地执行,使用python的多进程库达到多进程执行目的CeleryExecutor
: 使用Celery 作为执行器,配置 Celery 后可以利用集群分布式执行任务
在这里我选用了Celery,同样是为了 airflow 未来的可扩展性。
官方文档中提到使用 Celery 时,后端服务可以使用 RabbitMQ 或 Redis ,相对来说自己对 redis 熟悉一些,但据我所知 Redis 相对来说比较吃内存,因此在这里选用了 RabbitMQ 。具体配置如下:
安装 RabbitMQ
|
|
RabbitMQ 添加用户并设置密码
|
|
更改配置文件
|
|
至此 airflow + Celery 配置完成,接下来说说如何启动服务
启动 Airflow + Celery 服务
一共有四个服务,分别是:
启动 airflow webserver
服务
|
|
启动airflow flower
airflow flower
是一个监控 Celery 分布式队列的 web 服务,通过它可以看到 airflow dag 中task 的执行状况。
|
|
启动 airflow worker
Celery 的 worker,用于执行 dag 中具体的 task
|
|
启动 airflow scheduler
airflow scheduler
用于启动调度器,完成对 dag 中 task 的调度。
|
|
airflow 简单示例
环境已经搭好,现在写个 dag 来测试一下 airflow 吧。在具体的代码展现之前,先介绍一下上文已经出现多次的 dag 这个概念,dag 对于熟悉图论的同学一定不陌生,也即 (Directed Acyclic Graph, DAG)
,为什么叫有向无环图呢,因为任务之间的依赖关系就是一个 DAG ,如下图:
。
airflow 会默认读取 airflow 目录下 dags 中的所有 python 文件作为 dag ,因此我们新建一个 dags 目录,并将下面的代码写入 dags/test.py
中,为了能够方便看到 dag 测试成功与否,在 test.py 里面设置了每次向 test 目录下的文件写入新内容。
|
|
以上代码保存后,执行 python test.py
即将该 dag 导入 airflow 中,之后可以选用 airflow test
或 airflow backfill
测试,这里就不再讲述具体用法。
启动 dag 时既可以使用 airflow run dag_id task_id execution_date
,也可以使用 web 界面的执行按钮执行。但需要注意的是,在启动之前需要 airflow unpause dag_id
或在 web 界面开启 dag 对应的按钮。
接下来就可以看到执行状态以及结果了。另外下面提供了另一份示例代码,注意观察这份代码中定义任务间依赖关系的语句:),个人认为这样的语句能够更直观表明任务之间的关系。
|
|
如果执行 dag 后,发现 dag 中部分 task 一直处于 queue 的状态,可以查看 airflow scheduler
的执行状态,如果是挂掉了,且重启后又挂掉可以参考下面的解决方法。
改源码解决 airflow scheduler
中断 bug
我在使用 airflow 的过程中,产生了 airflow scheduler
启动后执行片刻即被断开连接的情况,通过谷歌搜索,找到了以下修改 airflow 源码的解决方法,至于这个中断bug的具体原因尚不清楚。
|
|
airflow 的介绍与使用就先写到这里,如果读者有与我类似或相同的问题,欢迎评论区探讨。
参考文章
- Airflow 官方文档
- Airflow 简明指南
- Airflow Useage
- Exception in airflow scheduler: Connection reset by peer