运行时的气流动态任务

有关“动态任务”的其他问题似乎解决了时间表或设计时动态构建DAG的问题.我对在执行期间将任务动态添加到DAG感兴趣.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)

这种幼稚的实现似乎不起作用-虚拟任务永远不会出现在UI中.

在执行期间向DAG添加新运算符的正确方法是什么?可能吗?

解决方法:

无法在DAG执行期间修改DAG(无需进行更多工作).

dag = DAG(…)由调度程序在循环中拾取.它将有任务实例“ python_operator”.该任务实例在dag运行中进行调度,并由工作程序或执行程序执行.在Airflow DB中,仅由调度程序更新这些添加的虚拟任务不会持久化到DAG或调度运行.当工作人员退出时,它们将被忘记.除非您从调度程序中复制有关持久化和更新程序的所有代码.模型…,但是在下次调度程序访问DAG文件进行解析时将撤消该操作,该操作可能每分钟一次,每秒一次或更快,这取决于要解析的其他DAG文件的数量.

气流实际上希望每个DAG在运行之间大致保持相同的布局.它还希望不断重新加载/解析DAG文件.因此,尽管您可以制作一个DAG文件,该文件在每次运行时根据某些外部数据动态地确定任务(最好是缓存在文件或pyc模块中,而不是像DB查找那样缓存在网络I / O中),但您会减慢整个调度循环的速度对于所有DAG),这不是一个好计划,因为您的图形和树形视图将使所有混乱,并且该查找将使调度程序解析更加繁重.

您可以使可调用对象运行每个任务…

def make_tasks(context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    provides_context=true,

但这是顺序的,您必须弄清楚如何使用python使它们并行(使用Future?),如果有异常,则整个任务将失败.而且它被绑定到一个执行者或工人,因此不使用气流的任务分配(kubernetes,mesos,芹菜).

使用此方法的另一种方法是添加固定数量的任务(最大数量),并使用可调用对象将不需要的任务短路或为每个任务使用xcom推送参数,从而在运行时更改其行为但不更改DAG.

上一篇:python-Apache Airflow-自定义日志记录格式


下一篇:那些离不开的 Chrome 扩展插件