五分钟快速了解Airflow工作流

简介

Airflow是一个以编程方式创作、调度和监控工作流的平台。

使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。

当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。

五分钟快速了解Airflow工作流

主要特点

  • 动态的:Airflow工作流是使用代码(Python)的形式进行配置,允许动态工作流(DAG)生成。并允许编写动态实例化工作流的代码。
  • 可扩展的:轻松定义您自己的operators、executors并扩展库,使其符合满足您的环境的抽象级别。
  • 优雅的:设计简洁优雅。使用强大的 Jinja 模板引擎将参数化脚本内置到 Airflow 的核心中。
  • 可伸缩的:Airflow 具有模块化架构,并使用消息队列来编排任意数量的workers。Airflow为无限扩展而生。

架构

五分钟快速了解Airflow工作流

Airflow 通常由以下组件组成:

  • 一个调度器:它处理触发工作流调度,并将任务提交给执行器运行。
  • 一个执行器:它处理正在运行的任务。在默认的 Airflow 安装中,它运行调度器内的所有内容,但大多数适合生产的执行器实际上将任务执行推送给workers。
  • 一个WEB服务器:它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的运行情况。
  • 一个包含DAG文件的文件夹:由调度器和执行器(以及执行程序拥有的任何workers)读取
  • 一个元数据数据库:供调度器、执行器和WEB服务器用来存储状态。

Airflow安装及初始化

安装Airflow

# AirFlow 需要一个HOME目录,默认是~/airflow目录,你也可以设置到其他地方
export AIRFLOW_HOME=~/airflow

# 安装依赖库
AIRFLOW_VERSION=2.1.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.6.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

初始化数据库,并创建用户

# 初始化数据库
airflow db init

# 创建用户和密码
airflow users create \
 --username admin \
 --firstname Peter \
 --lastname Parker \
 --role Admin \
 --email spiderman@superhero.org

启动WEB服务及调度器

# 启动 web 服务,默认端口是 8080
airflow webserver --port 8080

# 启动调度器
airflow scheduler

# 在浏览器中浏览 10.247.128.69:8080,并在 home 页开启 example dag

运行官网Demo

# 运行第一个任务实例
# run your first task instance
airflow tasks run example_bash_operator runme_0 2015-01-01

# 运行两天的任务回填
# run a backfill over 2 days
airflow dags backfill example_bash_operator \
 --start-date 2015-01-01 \
 --end-date 2015-01-02

示例

定义工作流

~/airflow/dags/tutorial.py

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# 定义默认参数
# 这些参数会传递给每个operator
# 您可以在operator初始化期间基于每个任务重写它们
default_args = {
 'owner': 'airflow',
 'depends_on_past': False,
 'email': ['airflow@example.com'],
 'email_on_failure': False,
 'email_on_retry': False,
 'retries': 1,
 'retry_delay': timedelta(minutes=5),
 # 'queue': 'bash_queue',
 # 'pool': 'backfill',
 # 'priority_weight': 10,
 # 'end_date': datetime(2016, 1, 1),
 # 'wait_for_downstream': False,
 # 'dag': dag,
 # 'sla': timedelta(hours=2),
 # 'execution_timeout': timedelta(seconds=300),
 # 'on_failure_callback': some_function,
 # 'on_success_callback': some_other_function,
 # 'on_retry_callback': another_function,
 # 'sla_miss_callback': yet_another_function,
 # 'trigger_rule': 'all_success'
}


# 实例化一个DAG
# 我们需要一个 DAG 对象来嵌套我们的任务。 
# 在这里,我们传递一个定义 dag_id 的字符串,该字符串用作 DAG 的唯一标识符。 
# 我们还传递了刚刚定义的默认参数字典,
# 并为 DAG 定义了调度间隔时间(schedule_interval )为1天 。
with DAG(
 'tutorial',
 default_args=default_args,
 description='A simple tutorial DAG',
 schedule_interval=timedelta(days=1),
 start_date=days_ago(2),
 tags=['example'],
) as dag:
 # 实例化Operator对象时会生成任务。 
 # 从Operator实例化的对象称为任务。 第一个参数task_id充当任务的唯一标识符。
 # t1, t2 and t3 are examples of tasks created by instantiating operators
 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
 )

 t2 = BashOperator(
     task_id='sleep',
     depends_on_past=False,
     bash_command='sleep 5',
     retries=3,
 )

 # 添加工作流和任务的问题

 t1.doc_md = dedent(
     """\
 #### Task Documentation
 You can document your task using the attributes `doc_md` (markdown),
 `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
 rendered in the UI's Task Instance Details page.
 ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

 """
 )

 dag.doc_md = __doc__  # 前提是您在DAG开始时有一个文档字符串
 dag.doc_md = """
 This is a documentation placed anywhere
 """     # 否则, type it like this

 # Jinja模板
 templated_command = dedent(
     """
 {% for i in range(5) %}
     echo "{{ ds }}"
     echo "{{ macros.ds_add(ds, 7)}}"
     echo "{{ params.my_param }}"
 {% endfor %}
 """
 )
 t3 = BashOperator(
     task_id='templated',
     depends_on_past=False,
     bash_command=templated_command,
     params={'my_param': 'Parameter I passed in'},
 )
 
 # 设置任务依赖
 t1 >> [t2, t3]

注意: 在执行您的脚本时,Airflow如果在您的DAG中发现循环或多次引用依赖项时,抛出异常。

测试

运行脚本

首先,让我们确保成功解析工作流。

 python ~/airflow/dags/tutorial.py

如果脚本没有抛出异常,则意味着您没有做任何极严重的错误,并且您的 Airflow 环境看起来是完好的。

命令行元数据验证

# initialize the database tables
airflow db init

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree

测试任务及Jinja模板任务

让我们通过运行特定日期的实际任务实例来进行测试。在上下文中,通过被称为execution_date的字段指定日期。这是逻辑日期,它模拟调度程序在特定日期和时间运行您的任务或 dag,使它现在(或在满足其依赖关系时)实际运行。

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01
# testing templated
# 显示详细的事件日志,最终您的bash命令行运行并打印结果
airflow tasks test tutorial templated 2015-06-01

注意:airflow任务测试命令在本地运行任务实例,将它们的日志输出到标准输出(在屏幕上),不影响依赖关系,并且不会将状态(运行、成功、失败……)传递给数据库。 它只是进行测试单个任务实例。

这同样适用于在 DAG 级别上airflow dags test [dag_id] [execution_date]。 它对给定的 DAG id 执行一次 DAG 运行。 虽然它确实考虑了任务依赖性,但没有在数据库中注册状态。 考虑到这一点,在本地测试 DAG 的完整运行很方便。 如果您的DAG中一项任务需要某个位置的数据,则该数据是可获得的。

回填

回填将按照您的依赖关系,将日志发送到文件中并与数据库交互以记录状态。

如果您有一个web服务,您将能够跟踪进度。如果您有兴趣在回填过程中直观地跟踪进度,airflow webserver将启动一个web服务。

# 可选的, 在后台以Debug模式启动一个WEB服务
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
 --start-date 2015-06-01 \
 --end-date 2015-06-07

注意

如果您使用depends_on_past=True,则单个任务实例将取决于其前一个任务实例(即根据 execution_date 的前一个)的成功。具有 execution_date==start_date 的任务实例将忽略此依赖性,因为不会为它们创建过去的任务实例。

在使用depends_on_past=True 时,您可能还需要考虑wait_for_downstream=True。虽然depends_on_past=True 导致任务实例依赖于其前一个任务实例的成功,但wait_for_downstream=True 将导致任务实例也等待前一个任务实例下游的所有任务实例成功。

总结

Apache Airflow 允许一个工作流的task在多台worker上同时执行;并以有向无环图的方式构建任务依赖关系;同时,工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务。

总而言之,Apache Airflow 既是最受欢迎的工作流工具,也是功能最广泛的工作流工具。

上一篇:LOJ #3165. 「CEOI2019」游乐园


下一篇:Ethereum White Paper