简介
Airflow是一个以编程方式创作、调度和监控工作流的平台。
使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。
当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。
主要特点
- 动态的:Airflow工作流是使用代码(Python)的形式进行配置,允许动态工作流(DAG)生成。并允许编写动态实例化工作流的代码。
- 可扩展的:轻松定义您自己的operators、executors并扩展库,使其符合满足您的环境的抽象级别。
- 优雅的:设计简洁优雅。使用强大的 Jinja 模板引擎将参数化脚本内置到 Airflow 的核心中。
- 可伸缩的:Airflow 具有模块化架构,并使用消息队列来编排任意数量的workers。Airflow为无限扩展而生。
架构
Airflow 通常由以下组件组成:
- 一个调度器:它处理触发工作流调度,并将任务提交给执行器运行。
- 一个执行器:它处理正在运行的任务。在默认的 Airflow 安装中,它运行调度器内的所有内容,但大多数适合生产的执行器实际上将任务执行推送给workers。
- 一个WEB服务器:它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的运行情况。
- 一个包含DAG文件的文件夹:由调度器和执行器(以及执行程序拥有的任何workers)读取
- 一个元数据数据库:供调度器、执行器和WEB服务器用来存储状态。
Airflow安装及初始化
安装Airflow
# AirFlow 需要一个HOME目录,默认是~/airflow目录,你也可以设置到其他地方
export AIRFLOW_HOME=~/airflow
# 安装依赖库
AIRFLOW_VERSION=2.1.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.6.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
初始化数据库,并创建用户
# 初始化数据库
airflow db init
# 创建用户和密码
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
启动WEB服务及调度器
# 启动 web 服务,默认端口是 8080
airflow webserver --port 8080
# 启动调度器
airflow scheduler
# 在浏览器中浏览 10.247.128.69:8080,并在 home 页开启 example dag
运行官网Demo
# 运行第一个任务实例
# run your first task instance
airflow tasks run example_bash_operator runme_0 2015-01-01
# 运行两天的任务回填
# run a backfill over 2 days
airflow dags backfill example_bash_operator \
--start-date 2015-01-01 \
--end-date 2015-01-02
示例
定义工作流
~/airflow/dags/tutorial.py
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# 定义默认参数
# 这些参数会传递给每个operator
# 您可以在operator初始化期间基于每个任务重写它们
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
# 实例化一个DAG
# 我们需要一个 DAG 对象来嵌套我们的任务。
# 在这里,我们传递一个定义 dag_id 的字符串,该字符串用作 DAG 的唯一标识符。
# 我们还传递了刚刚定义的默认参数字典,
# 并为 DAG 定义了调度间隔时间(schedule_interval )为1天 。
with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
# 实例化Operator对象时会生成任务。
# 从Operator实例化的对象称为任务。 第一个参数task_id充当任务的唯一标识符。
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
# 添加工作流和任务的问题
t1.doc_md = dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
)
dag.doc_md = __doc__ # 前提是您在DAG开始时有一个文档字符串
dag.doc_md = """
This is a documentation placed anywhere
""" # 否则, type it like this
# Jinja模板
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)
# 设置任务依赖
t1 >> [t2, t3]
注意: 在执行您的脚本时,Airflow如果在您的DAG中发现循环或多次引用依赖项时,抛出异常。
测试
运行脚本
首先,让我们确保成功解析工作流。
python ~/airflow/dags/tutorial.py
如果脚本没有抛出异常,则意味着您没有做任何极严重的错误,并且您的 Airflow 环境看起来是完好的。
命令行元数据验证
# initialize the database tables
airflow db init
# print the list of active DAGs
airflow dags list
# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree
测试任务及Jinja模板任务
让我们通过运行特定日期的实际任务实例来进行测试。在上下文中,通过被称为execution_date的字段指定日期。这是逻辑日期,它模拟调度程序在特定日期和时间运行您的任务或 dag,使它现在(或在满足其依赖关系时)实际运行。
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
# testing templated
# 显示详细的事件日志,最终您的bash命令行运行并打印结果
airflow tasks test tutorial templated 2015-06-01
注意:airflow任务测试命令在本地运行任务实例,将它们的日志输出到标准输出(在屏幕上),不影响依赖关系,并且不会将状态(运行、成功、失败……)传递给数据库。 它只是进行测试单个任务实例。
这同样适用于在 DAG 级别上airflow dags test [dag_id] [execution_date]
。 它对给定的 DAG id 执行一次 DAG 运行。 虽然它确实考虑了任务依赖性,但没有在数据库中注册状态。 考虑到这一点,在本地测试 DAG 的完整运行很方便。 如果您的DAG中一项任务需要某个位置的数据,则该数据是可获得的。
回填
回填将按照您的依赖关系,将日志发送到文件中并与数据库交互以记录状态。
如果您有一个web服务,您将能够跟踪进度。如果您有兴趣在回填过程中直观地跟踪进度,airflow webserver将启动一个web服务。
# 可选的, 在后台以Debug模式启动一个WEB服务
# airflow webserver --debug &
# start your backfill on a date range
airflow dags backfill tutorial \
--start-date 2015-06-01 \
--end-date 2015-06-07
注意:
如果您使用depends_on_past=True,则单个任务实例将取决于其前一个任务实例(即根据 execution_date 的前一个)的成功。具有 execution_date==start_date 的任务实例将忽略此依赖性,因为不会为它们创建过去的任务实例。
在使用depends_on_past=True 时,您可能还需要考虑wait_for_downstream=True。虽然depends_on_past=True 导致任务实例依赖于其前一个任务实例的成功,但wait_for_downstream=True 将导致任务实例也等待前一个任务实例下游的所有任务实例成功。
总结
Apache Airflow 允许一个工作流的task在多台worker上同时执行;并以有向无环图的方式构建任务依赖关系;同时,工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务。
总而言之,Apache Airflow 既是最受欢迎的工作流工具,也是功能最广泛的工作流工具。