BranchPythonOperator之后的气流任务没有失败并正确成功

在我的DAG中,我有一些任务只能在星期六运行.因此,我使用BranchPythonOperator在星期六和DummyTask的任务之间进行分支.之后,我加入了两个分支,并希望运行其他任务.

工作流程如下所示:
BranchPythonOperator之后的气流任务没有失败并正确成功
在这里,我将dummy3的触发规则设置为’one_success’,一切正常.

我遇到的问题是BranchPythonOperator上游的某些内容失败:
BranchPythonOperator之后的气流任务没有失败并正确成功
BranchPythonOperator和分支正确地具有state’upstream_failed’,但是加入分支的任务变为’跳过’,因此整个工作流显示’成功’.

我尝试使用’all_success’作为触发规则,然后如果某些操作失败则整个工作流失败,它会正常工作,但如果没有任何失败,则会跳过dummy3.

我也试过’all_done’作为触发规则,然后如果没有任何失败它可以正常工作,但是如果有什么事情失败,dummy3仍然会被执行.

我的测试代码如下所示:

from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          description='Test branches',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))


def python1():
    raise Exception('Test failure')
    # print 'Test success'


dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)


dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag
)


dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    trigger_rule='one_success'
)


def is_saturday():
    if date.today().weekday() == 6:
        return 'dummy2'
    else:
        return 'today_is_not_saturday'


branch_on_saturday = BranchPythonOperator(
    task_id='branch_on_saturday',
    python_callable=is_saturday,
    dag=dag)


not_saturday = DummyOperator(
    task_id='today_is_not_saturday',
    dag=dag
)

dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3

编辑

我刚想出一个丑陋的解决方法:
BranchPythonOperator之后的气流任务没有失败并正确成功
dummy4代表我实际需要运行的任务,dummy5只是一个虚拟.
dummy3仍然具有触发规则’one_success’.

现在,如果没有上游故障,dummy3和dummy4运行,如果日期不是星期六,则dummy5’运行’,如果星期六是星期六则跳过,这意味着DAG在两种情况下都被标记为成功.
如果上游出现故障,则跳过dummy3和dummy4,并将dummy5标记为“upstream_failed”,并将DAG标记为失败.

这种解决方法使我的DAG按照我的意愿运行,但我仍然更喜欢没有一些hacky解决方法的解决方案.

解决方法:

将dummy3的触发器规则设置为“none_failed”将使其在任何情况下都以预期状态结束.

https://airflow.apache.org/concepts.html#trigger-rules

编辑:当这个问题被提出并回答时,看起来这个’none_failed’触发规则还不存在:它是在2018年11月添加的

https://github.com/apache/airflow/pull/4182

上一篇:python – Airflow BashOperator OSError:[Errno 2]没有这样的文件或目录


下一篇:python – Airflow用户创建