python – Airflow:将动态值传递给Sub DAG运算符

我是Airflow的新手.
我遇到过一个场景,其中Parent DAG需要将一些动态数字(比方说n)传递给Sub DAG.
SubDAG将使用此数字动态创建n个并行任务.

气流文档未涵盖实现此目的的方法.所以我探索了几种方法:

选项-1(使用xcom Pull)

我试图传递为xcom值,但由于某种原因,SubDAG没有解析为传递的值.

父Dag文件

def load_dag(**kwargs):
    number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
    dag_data = json.dumps({
        "number_of_runs": number_of_runs
    })
    return dag_data

# ------------------ Tasks ------------------------------
load_config = PythonOperator(
    task_id='load_config',
    provide_context=True,
    python_callable=load_dag,
    dag=dag)


t1 = SubDagOperator(
    task_id=CHILD_DAG_NAME,
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
    default_args=default_args,
    dag=dag,
)

Sub Dag文件

def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None)

    variabe_names = {}

    for i in range(num_of_runs):
        variabe_names['task' + str(i + 1)] =  DummyOperator(
        task_id='dummy_task',
        dag=dag_subdag,
    )

    return dag_subdag

选项 – 2

我也尝试将number_of_runs作为全局变量传递,但是没有用.

选项 – 3

我们还尝试将此值写入数据文件.但是子DAG正在抛出File不存在的错误.这可能是因为我们正在动态生成此文件.

有人可以帮我弄这个吗.

解决方法:

我已经使用选项3完成了它.关键是如果文件不存在则返回没有任务的有效dag.因此,如果需要,load_config将生成包含您的任务数量或更多信息的文件.您的子工厂看起来像:

def subdag(...):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    file_path = "/path/to/generated/file"
    if os.path.exists(file_path):
        data_file = open(file_path)
        list_tasks = data_file.readlines()
        for task in list_tasks:
            DummyOperator(
                  task_id='task_'+task,
                  default_args=args,
                  dag=sdag,
            )
    return sdag

在dag生成中,您将看到一个没有任务的子标记.在执行dag时,在load_config完成后,您可以看到动态生成的子标记

上一篇:如何在无需重启气流Web服务器的情况下更新气流中的python函数


下一篇:centos7离线安装airflow1.10.10(真正离线)