初识 airflow

其实早在去年就已经接触到 airflow 了,当时的需求是按小时拼接两份线上日志,而拼接操作必须依赖于两份日志的完整性。从这个需求出发,了解到了 airflow ,但迫于官方文档的晦涩难懂,再加上这个任务对 airflow 的需求不够强,就弃坑了;最后使用 python 脚本中一个 while True 死循环 + time.sleep() 来解决问题(如果所依赖的日志没有完全写入,则 sleep 一段时间)。

正式入坑

前几天 boss 布置了一个广告平台的统计任务,依赖倒也不复杂,但是考虑到未来平台的扩展性,还是需要一个能够很好的处理统计任务的工具的。自己也调研比较了一下ETL(数据仓库)这方面的工具,确实数 airflow 功能最为强大,既然这样那就入坑吧。

airflow 安装配置过程

这里都是按照官网操作,具体如下:

1、设置 airflow 项目路径,默认是用户主目录下 airflow ,也可以显示用环境变量 AIRFLOW_HOME 指定;

1
export AIRFLOW_HOME=~/airflow

2、airflow 安装,因为 airflow 是基于 python 编写,因此可以直接利用 pip 安装,这里要注意的是 airflow 还提供了使用其他软件的插件,例如 mysql、hdfs、hive 等工具,当然这些也可以在后续需要时再安装。

1
2
3
4
pip3 install airflow
# 安装 [airflow](https://github.com/apache/incubator-airflow) hive 插件
pip3 install "airflow[hive]"

3、初始化 airflow 相关数据库

1
airflow initdb

4、启动 webserver 服务

1
airflow webserver

至此,一个最基础的 airflow 服务就启动了,下面说一说 airflow 的配置。

airflow 基础配置

airflow 的配置文件保存在项目目录下,命名为 airflow.cfg ,通过文件内的注释也可以知道每个配置项的大致含义,这里简单说下常用的配置项。

更改数据库

airflow 为了能够让用户快速使用选择了 sqllite 数据库,但在实际使用中,往往 mysql 更为广泛使用,若要使 airflow 使用 mysql ,首先执行

1
pip3 install "airflow[mysql]"

更改 airflow.cfg 中 sql_alchemy_conn 的值,通过变量名可知配置使用的是 sql_alchemy 的数据库连接字符串,具体格式为:mysql+pymysql://username:password@host:port/database,当然这里 pymysql 可以根据自己喜好更换。

修改完配置文件后,需要执行 airflow resetdb 并重新启动 web 服务(airflow webserver),数据库更换则生效。

为web服务开启用户身份验证功能

airflow 默认是无需登录即可访问使用的,而在实际生产环境中为了提高系统的安全性,需要开启用户身份验证功能,具体步骤如下:

1
2
3
4
5
6
[webserver]
# 开启用户身份验证
authenticate = True
# 根据用户显示dag列表(依赖于用户身份验证的开启)
filter_by_owner = False

这样就开启了用户身份验证的功能,但系统还没有默认用户,需要自行手动添加,有两种方式,一种是直接插入默认用户信息到数据库对应的 users 表中(不推荐);另一种则是通过 airflow 提供的功能来添加,具体操作如下:

打开ipython,执行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

重启 web 服务( airflow webserver ),刷新网址浏览器会自动跳转到用户登录页面。

配置 airflow + Celery

airflow 提供了三种 Executor ,分别是 SequentialExecutorLocalExecutor 以及 CeleryExecutorairflow 配置文件中,默认的执行方式是(SequentialExecutor),这三者的特点如下:

  • SequentialExecutor: 单进程顺序执行,通常只用于测试
  • LocalExecutor: 多进程本地执行,使用python的多进程库达到多进程执行目的
  • CeleryExecutor: 使用Celery 作为执行器,配置 Celery 后可以利用集群分布式执行任务

在这里我选用了Celery,同样是为了 airflow 未来的可扩展性。

官方文档中提到使用 Celery 时,后端服务可以使用 RabbitMQRedis ,相对来说自己对 redis 熟悉一些,但据我所知 Redis 相对来说比较吃内存,因此在这里选用了 RabbitMQ 。具体配置如下:

安装 RabbitMQ

1
2
# Ubuntu 17.04
sudo apt install erlang rabbitmq-server

RabbitMQ 添加用户并设置密码

1
2
3
4
5
6
# Ubuntu 17.04
sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl add_vhost airflow
sudo rabbitmqctl set_user_tags airflow airflow
sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
sudo rabbitmq-plugins enable rabbitmq_management

更改配置文件

1
2
3
# transport://userid:password@hostname:port/virtual_host
broker_url = amqp://ariflow:airflow@localhost:5672/airflow
celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow

至此 airflow + Celery 配置完成,接下来说说如何启动服务

启动 Airflow + Celery 服务

一共有四个服务,分别是:

  • airflow webserver 服务
  • airflow flower Celery 管理界面
  • airflow worker Celery Worker
  • airflow scheduler 调度器

启动 airflow webserver 服务

1
airflow webserver

启动airflow flower

airflow flower 是一个监控 Celery 分布式队列的 web 服务,通过它可以看到 airflow dag 中task 的执行状况。

1
airflow flower

启动 airflow worker

Celery 的 worker,用于执行 dag 中具体的 task

1
airflow worker

启动 airflow scheduler

airflow scheduler 用于启动调度器,完成对 dag 中 task 的调度。

1
airflow scheduler

airflow 简单示例

环境已经搭好,现在写个 dag 来测试一下 airflow 吧。在具体的代码展现之前,先介绍一下上文已经出现多次的 dag 这个概念,dag 对于熟悉图论的同学一定不陌生,也即 (Directed Acyclic Graph, DAG) ,为什么叫有向无环图呢,因为任务之间的依赖关系就是一个 DAG ,如下图:
DAG

airflow 会默认读取 airflow 目录下 dags 中的所有 python 文件作为 dag ,因此我们新建一个 dags 目录,并将下面的代码写入 dags/test.py 中,为了能够方便看到 dag 测试成功与否,在 test.py 里面设置了每次向 test 目录下的文件写入新内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from pprint import pprint
args = {
'owner': 'yinwoods',
'start_date': airflow.utils.dates.datetime(2017, 7, 27, 0, 0, 0)
}
dag = DAG(
dag_id='yinwoods', default_args=args,
schedule_interval='0 1 * * *')
def my_write_function(random_base):
"""This is a function that will run within the DAG execution"""
with open('~/airflow/test/' + random_base, 'a') as f:
f.write(random_base[-1] + '\n')
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
op_kwargs={'test': 'test'},
dag=dag)
# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
task = PythonOperator(
task_id='print_for_' + str(i),
python_callable=my_write_function,
op_kwargs={'random_base': 'file_' + str(i)},
dag=dag)
task.set_upstream(run_this)

以上代码保存后,执行 python test.py 即将该 dag 导入 airflow 中,之后可以选用 airflow testairflow backfill 测试,这里就不再讲述具体用法。

启动 dag 时既可以使用 airflow run dag_id task_id execution_date ,也可以使用 web 界面的执行按钮执行。但需要注意的是,在启动之前需要 airflow unpause dag_id 或在 web 界面开启 dag 对应的按钮。

接下来就可以看到执行状态以及结果了。另外下面提供了另一份示例代码,注意观察这份代码中定义任务间依赖关系的语句:),个人认为这样的语句能够更直观表明任务之间的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from datetime import datetime
from [airflow](https://github.com/apache/incubator-airflow) import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def print_hello():
return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2017, 3, 20),
catchup=False)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task',
python_callable=print_hello, dag=dag)
dummy_operator >> hello_operator

如果执行 dag 后,发现 dag 中部分 task 一直处于 queue 的状态,可以查看 airflow scheduler 的执行状态,如果是挂掉了,且重启后又挂掉可以参考下面的解决方法。

改源码解决 airflow scheduler 中断 bug

我在使用 airflow 的过程中,产生了 airflow scheduler 启动后执行片刻即被断开连接的情况,通过谷歌搜索,找到了以下修改 airflow 源码的解决方法,至于这个中断bug的具体原因尚不清楚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py 2017-02-16 11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
last_stat_print_time = datetime(2000, 1, 1)
# Last time that self.heartbeat() was called.
last_self_heartbeat_time = datetime.now()
+ # Last time that self.executor.heartbeat() was called.
+ last_executor_heartbeat_time = datetime.now()
# Last time that the DAG dir was traversed to look for files
last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED,))
- # Call hearbeats
- self.logger.info("Heartbeating the executor")
- self.executor.heartbeat()
+ # Heartbeat the executor periodically
+ time_since_last_heartbeat = (datetime.now() -
+ last_executor_heartbeat_time).total_seconds()
+ if time_since_last_heartbeat > self.heartrate:
+ self.logger.info("Heartbeating the executor")
+ try: self.executor.heartbeat()
+ except ConnectionResetError: pass # RabbitMQ sometimes resets the socket connection
+ last_executor_heartbeat_time = datetime.now()
# Process events from the executor
self._process_executor_events()

airflow 的介绍与使用就先写到这里,如果读者有与我类似或相同的问题,欢迎评论区探讨。

参考文章