如何在Airflow中将参数传递给PythonOperator

我刚刚开始使用Airflow,有人可以启发我如何将参数传递给PythonOperator,如下所示:

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs=None,
    #op_kwargs=(key1='value1', key2='value2'),
    dag=dag,
)

def SendEmail(**kwargs):
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

我希望能够将一些参数传递到t5_send_notification的可调用对象(即SendEmail)中,理想情况下,我想将完整的日志和/或部分日志(本质上是来自垃圾邮件)附加到要发送的电子邮件中,猜测t5_send_notification是收集这些信息的地方.

非常感谢你.

解决方法:

>将字典对象传递给op_kwargs
>使用键从可调用的python中的kwargs dict访问其值

def SendEmail(**kwargs):
    print(kwargs['key1'])
    print(kwargs['key2'])
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()


t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs={'key1': 'value1', 'key2': 'value2'},
    dag=dag,
)
上一篇:airflow使用mysql数据库,LocalExecutor并发调度(1)


下一篇:python – 有没有办法通过Airflow API创建/修改连接