在我的DAG中,我有一些任务只能在星期六运行.因此,我使用BranchPythonOperator在星期六和DummyTask的任务之间进行分支.之后,我加入了两个分支,并希望运行其他任务.
工作流程如下所示:
在这里,我将dummy3的触发规则设置为’one_success’,一切正常.
我遇到的问题是BranchPythonOperator上游的某些内容失败:
BranchPythonOperator和分支正确地具有state’upstream_failed’,但是加入分支的任务变为’跳过’,因此整个工作流显示’成功’.
我尝试使用’all_success’作为触发规则,然后如果某些操作失败则整个工作流失败,它会正常工作,但如果没有任何失败,则会跳过dummy3.
我也试过’all_done’作为触发规则,然后如果没有任何失败它可以正常工作,但是如果有什么事情失败,dummy3仍然会被执行.
我的测试代码如下所示:
from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
dag = DAG('test_branches',
description='Test branches',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 1))
def python1():
raise Exception('Test failure')
# print 'Test success'
dummy1 = PythonOperator(
task_id='python1',
python_callable=python1,
dag=dag
)
dummy2 = DummyOperator(
task_id='dummy2',
dag=dag
)
dummy3 = DummyOperator(
task_id='dummy3',
dag=dag,
trigger_rule='one_success'
)
def is_saturday():
if date.today().weekday() == 6:
return 'dummy2'
else:
return 'today_is_not_saturday'
branch_on_saturday = BranchPythonOperator(
task_id='branch_on_saturday',
python_callable=is_saturday,
dag=dag)
not_saturday = DummyOperator(
task_id='today_is_not_saturday',
dag=dag
)
dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3
编辑
我刚想出一个丑陋的解决方法:
dummy4代表我实际需要运行的任务,dummy5只是一个虚拟.
dummy3仍然具有触发规则’one_success’.
现在,如果没有上游故障,dummy3和dummy4运行,如果日期不是星期六,则dummy5’运行’,如果星期六是星期六则跳过,这意味着DAG在两种情况下都被标记为成功.
如果上游出现故障,则跳过dummy3和dummy4,并将dummy5标记为“upstream_failed”,并将DAG标记为失败.
这种解决方法使我的DAG按照我的意愿运行,但我仍然更喜欢没有一些hacky解决方法的解决方案.
解决方法:
将dummy3的触发器规则设置为“none_failed”将使其在任何情况下都以预期状态结束.
见https://airflow.apache.org/concepts.html#trigger-rules
编辑:当这个问题被提出并回答时,看起来这个’none_failed’触发规则还不存在:它是在2018年11月添加的
见https://github.com/apache/airflow/pull/4182