python – Apache Airflow调度程序不会在计划时触发DAG

当我计划每天在特定时间运行DAG时,DAG执行根本不会发生.
但是,当我重新启动Airflow网络服务器和调度程序时,DAG在该特定日期的预定时间执行一次,并且从第二天开始不执行.
我使用的是带有python 2.7.6的Airflow版本v1.7.1.3.
这里是DAG代码:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

import time
n=time.strftime("%Y,%m,%d")
v=datetime.strptime(n,"%Y,%m,%d")
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': v,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),

}

dag = DAG('dag_user_answer_attempts', default_args=default_args, schedule_interval='03 02 * * *')

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='user_answer_attempts',
    bash_command='python /home/ubuntu/bigcrons/appengine-flask-skeleton-master/useranswerattemptsgen.py',
    dag=dag)

难道我做错了什么?

解决方法:

您的问题是为当前时间设置的start_date.气流在间隔结束时运行作业,而不是开始.这意味着您的工作的第一次运行将在第一个间隔之后.

例:

你做了一个dag,并在午夜将它放在Airflow中.今天(20XX-01-01 00:00:00)也是start_date,但它是硬编码的(“start_date”:datetime(20XX,1,1)).计划间隔是每天,与您的一样(3 2 * * *).

这个dag第一次排队执行是20XX-01-02 02:03:00,因为这是间隔期结束的时间.如果您查看当时正在运行的dag,它应该具有schedule_date之后大约一天的开始日期时间.

你可以通过将start_date硬编码到某个日期或确保动态日期比过去的间隔时间更长(在你的情况下,2天就足够了)来解决这个问题.如果您需要重新运行作业或回填(或结束dag),Airflow建议您使用静态start_dates.

有关回填的更多信息(这个常见的*问题的反面),请查看文档或此问题:
Airflow not scheduling Correctly Python

上一篇:如何在Airflow上重新启动失败的任务


下一篇:如何在Airflow中设置DAG之间的依赖关系?