Airflow: trigger a DAG run with the REST API

REST API

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview

To make management easier, Airflow provides a REST API.

To facilitate management, Apache Airflow supports a range of REST API endpoints across its objects. This section provides an overview of the API design, methods, and supported use cases.

Most of the endpoints accept JSON as input and return JSON responses. This means that you must usually add the following headers to your request:

Content-type: application/json
Accept: application/json
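
As a rough illustration of these two headers, the sketch below builds (but does not send) a JSON request using only the Python standard library. The helper name build_json_request and the localhost address are assumptions made for this example, not part of the Airflow documentation.

```python
import json
import urllib.request


def build_json_request(url, payload=None, method="GET"):
    """Build a request carrying the JSON headers; nothing is sent here."""
    body = None if payload is None else json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method=method,
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )


# Hypothetical local webserver address, used only for illustration.
request = build_json_request("http://localhost:8080/api/v1/dags")
print(request.get_method(), request.header_items())
```

Passing the request to urllib.request.urlopen would actually send it; that step is left out so the sketch runs without a server.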

 

Enable authentication

https://airflow.apache.org/docs/apache-airflow/stable/security/api.html#basic-authentication

The API is closed by default; it has to be switched to an authenticating backend.

Basic authentication

Basic username password authentication is currently supported for the API. This works for users created through LDAP login or within Airflow Metadata DB using password.

To enable basic authentication, set the following in the configuration:

[api]
auth_backend = airflow.api.auth.backend.basic_auth
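
For orientation, here is a minimal sketch of what a client sends once basic authentication is enabled: an Authorization header holding the Base64 encoding of "username:password". The function name basic_auth_header is made up for this example, and admin/admin is only a placeholder credential.

```python
import base64


def basic_auth_header(username, password):
    """Return the HTTP Basic Authorization header for the given credentials."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials; replace with a real Airflow user.
print(basic_auth_header("admin", "admin"))
```

HTTP libraries such as requests build this header themselves when given auth=(username, password), so writing it by hand is rarely necessary. Base64 is an encoding rather than encryption, which is why basic authentication is normally combined with HTTPS.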

 

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Authentication

By default, all API requests are denied.

To be able to meet the requirements of many organizations, Airflow supports many authentication methods, and it is even possible to add your own method.

If you want to check which auth backend is currently set, you can use airflow config get-value api auth_backend command as in the example below.

$ airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth

The default is to deny all requests.

For details on configuring the authentication, see API Authorization.

 

Trigger a new DAG run

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/post_dag_run

Trigger a new DAG run

path Parameters
dag_id required string

The DAG ID.

Request Body schema: application/json
dag_run_id string Nullable

Run ID.

The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error.

If not provided, a value will be generated based on execution_date.

If the specified dag_run_id is in use, the creation request fails with an ALREADY_EXISTS error.

This together with DAG_ID are a unique key.

execution_date string <date-time>

The execution date. This is the time when the DAG run should be started according to the DAG definition. The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error. This together with DAG_ID are a unique key.

state string (DagState) Enum: "success" "running" "failed"

DAG State.

conf object

JSON object describing additional configuration parameters.

The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error.
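
To make the schema concrete, the sketch below assembles a request body from the fields listed above. The helper build_dag_run_body is hypothetical; it leaves out dag_run_id and execution_date when they are not supplied, so that the server can generate them as described.

```python
def build_dag_run_body(conf=None, execution_date=None, dag_run_id=None):
    """Assemble the JSON body for the 'trigger a new DAG run' request."""
    body = {"conf": dict(conf or {})}
    if execution_date is not None:
        # Expected to be a date-time string, e.g. "2021-05-01T12:00:00Z"
        body["execution_date"] = execution_date
    if dag_run_id is not None:
        body["dag_run_id"] = dag_run_id
    return body


print(build_dag_run_body(conf={"task_payload": "demo"},
                         execution_date="2021-05-01T12:00:00Z"))
```

The state field is deliberately not included, since the point of this request is to create a new run.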

 

Why not use the Python client?

https://github.com/apache/airflow-client-python

The project is still under development and some of its APIs are broken.

Apache Airflow Python Client

NOTE: The Apache Airflow Client is still under active development and some methods or APIs might be broken. Please raise an issue in github if you encounter any such issues.

The trigger endpoint is indeed broken, and most of the test cases are empty.

https://github.com/apache/airflow-client-python/issues/21

 

Requirement on DAG

https://*.com/questions/56480312/how-to-trigger-a-dag-to-run-immediately

For a DAG that should run only when triggered externally, set its schedule (schedule_interval) to None.

If you want to trigger this dag manually then you need to set schedule_interval=None and use airflow trigger_dag dag_id (in Airflow 2.x the command is airflow dags trigger dag_id) (Documentation : airflow trigger dag)

 

For example:

https://www.waitingforcode.com/apache-airflow/externally-triggered-dags-apache-airflow/read

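import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id='hello_world_a',
    default_args={
        "owner": "airflow",
        'start_date': airflow.utils.dates.days_ago(1),
    },
    # None: never scheduled, runs only when triggered externally
    schedule_interval=None
)


def print_hello(**kwargs):
    # dag_run.conf holds the "conf" object sent in the trigger request
    task_params = kwargs['dag_run'].conf['task_payload']
    print('Hello world a with {}'.format(task_params))

# Airflow 2 passes the context to the callable automatically,
# so the Airflow 1.x flag provide_context=True is no longer needed.
PythonOperator(
    task_id='hello_world_printer',
    python_callable=print_hello,
    dag=dag)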
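
The callable above raises a KeyError when the run is triggered without a task_payload entry in conf. A more defensive variant might look like the sketch below; print_hello_safe and the fallback text are made up for this example, and a SimpleNamespace stands in for the real dag_run object so the snippet runs without Airflow.

```python
from types import SimpleNamespace


def print_hello_safe(**kwargs):
    """Read task_payload from dag_run.conf, tolerating a missing conf or key."""
    conf = kwargs["dag_run"].conf or {}
    task_params = conf.get("task_payload", "<no payload>")
    message = "Hello world a with {}".format(task_params)
    print(message)
    return message


# Stand-in for the dag_run object that Airflow would pass in the context.
fake_run = SimpleNamespace(conf={"task_payload": "payload from REST"})
print_hello_safe(dag_run=fake_run)
```

With a request body such as {"conf": {"task_payload": "payload from REST"}}, the task would print the payload; without it, the fallback text is printed instead of failing.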

 

Then deploy this DAG file

https://*.com/questions/49033163/airflow-publish-a-dynamically-created-dag

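That question discusses creating DAGs dynamically. The normal way to publish a DAG is to copy the DAG file into the $AIRFLOW_HOME/dags directory; Airflow scans that directory and loads the DAG automatically.

I tried it: once the file is placed in this directory, the DAG shows up within seconds, so there seems to be no need to look into other ways of creating DAGs dynamically.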
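
The copy step can be scripted. The sketch below is one possible way to do it, assuming the DAG folder is the default $AIRFLOW_HOME/dags (it is configurable, so check your own setup) and that AIRFLOW_HOME falls back to ~/airflow when the variable is unset. The function name deploy_dag_file is hypothetical.

```python
import os
import shutil
from pathlib import Path


def deploy_dag_file(dag_file, airflow_home=None):
    """Copy a DAG file into <airflow_home>/dags and return the new path."""
    home = Path(
        airflow_home or os.environ.get("AIRFLOW_HOME", "~/airflow")
    ).expanduser()
    dags_dir = home / "dags"
    # Create the folder if it does not exist yet.
    dags_dir.mkdir(parents=True, exist_ok=True)
    return Path(shutil.copy(dag_file, dags_dir))


# Example call (not executed here):
# deploy_dag_file("hello_world_a.py")
```

After the copy, the scheduler still needs a moment to parse the file before the DAG can be triggered through the API.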

 

I want to be able to publish and trigger a DAG object from my code which is not in control of scheduler (viz. $AIRFLOW_HOME/dags folder)

My last resort would be to programmatically create a py file containing the DAG definition that I want to publish and save this file to the $AIRFLOW_HOME/dags folder. I'm sure it should be easier than that.

 

https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html

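Airflow 2.0 improved the architecture: the scheduler parses the DAG files and saves the serialized result to the database, and both the scheduler and the webserver then work from that serialized result, so the parsing work is reused.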

In order to make Airflow Webserver stateless, Airflow >=1.10.7 supports DAG Serialization and DB Persistence. From Airflow 2.0.0, the Scheduler also uses Serialized DAGs for consistency and makes scheduling decisions.

When using this feature, the DagFileProcessorProcess in the Scheduler parses the DAG files, serializes them in JSON format and saves them in the Metadata DB as SerializedDagModel model.


 

Code Sample

https://github.com/fanqingsong/machine_learning_workflow_on_airflow/blob/master/rest_api_call/trigger_dag.py

Trigger a DAG run, then poll its state until the run finishes.

import sched
import time
from datetime import datetime, timezone
from pprint import pprint

import requests

# Local webserver and admin credentials used in this example; adjust as needed.
BASE_URL = "http://localhost:8080/api/v1"
AUTH = ("admin", "admin")

dag_id = "kmeans_with_workflow"


def get_execution_time():
    # Current date and time in UTC, formatted as a date-time string
    now = datetime.now(timezone.utc)
    dt_string = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    print("date and time =", dt_string)
    return dt_string


def trigger_dag():
    data = {
        # "dag_run_id" is omitted, so Airflow generates one from execution_date
        "execution_date": get_execution_time(),
        "conf": {},
    }

    # json= serializes the body and sets the Content-Type header
    response = requests.post(
        f"{BASE_URL}/dags/{dag_id}/dagRuns",
        json=data,
        auth=AUTH)
    response.raise_for_status()

    result = response.json()
    pprint(result)
    return result


def get_dag_run(dag_run_id):
    response = requests.get(
        f"{BASE_URL}/dags/{dag_id}/dagRuns/{dag_run_id}",
        auth=AUTH)
    response.raise_for_status()

    result = response.json()
    pprint(result)
    return result


result = trigger_dag()
dag_run_id = result["dag_run_id"]

s = sched.scheduler(time.time, time.sleep)


def watch_dag_until_complete():
    state = get_dag_run(dag_run_id)["state"]

    if state == "success":
        print("dag completed!")
    elif state == "failed":
        # Stop polling: a failed run never reaches "success"
        print("dag failed!")
    else:
        # Still running: check again in one second
        s.enter(1, 1, watch_dag_until_complete)


s.enter(1, 1, watch_dag_until_complete)
s.run()
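
Polling without an upper bound can hang a script when a run gets stuck. The sketch below shows one way to add a timeout; wait_for_dag_run is a hypothetical helper, and it takes the state lookup as a function so that it can be tried here with canned states instead of a live server.

```python
import time


def wait_for_dag_run(fetch_state, timeout=600.0, interval=1.0, sleep=time.sleep):
    """Poll fetch_state() until a terminal state, or raise after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state in ("success", "failed"):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"DAG run still in state {state!r} after {timeout}s")
        sleep(interval)


# Stand-in for a real API call so the sketch runs without a server.
fake_states = iter(["running", "running", "success"])
final_state = wait_for_dag_run(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)
```

Against a real server, fetch_state could be a small function that requests the DAG run and returns its "state" field.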

 

Time zones

https://airflow.apache.org/docs/apache-airflow/stable/timezone.html

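In the example above, the execution time is given in UTC (prime meridian time). According to the documentation this is by design: Airflow works in UTC internally, which removes any dependency on the local time zone.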

Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC. Also templates used in Operators are not converted. Time zone information is exposed and it is up to the writer of DAG what to do with it.
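
When the triggering code runs in a local time zone, the timestamp can be converted to UTC before it is sent. A minimal sketch using only the standard library follows; to_utc_string is a made-up helper name and the UTC+8 offset is just an example.

```python
from datetime import datetime, timedelta, timezone


def to_utc_string(moment):
    """Format a timezone-aware datetime as a UTC date-time string."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# Example: 20:30 at UTC+8 is 12:30 in UTC.
utc_plus_8 = timezone(timedelta(hours=8))
local_time = datetime(2021, 5, 1, 20, 30, tzinfo=utc_plus_8)
print(to_utc_string(local_time))  # 2021-05-01T12:30:00Z
```

Naive datetimes are rejected on purpose, because guessing their time zone is a common source of off-by-hours errors.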

 

schedule_interval

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#running-dags

Besides manual and API triggering, DAGs also support scheduled triggering, using the same syntax as crontab.

DAGs will run in one of two ways:

  • When they are triggered either manually or via the API

  • On a defined schedule, which is defined as part of the DAG

DAGs do not require a schedule, but it's very common to define one. You define it via the schedule_interval argument, like this:

with DAG("my_daily_dag", schedule_interval="@daily"):
    ...

The schedule_interval argument takes any value that is a valid Crontab schedule value, so you could also do:

with DAG("my_daily_dag", schedule_interval="0 * * * *"):
    ...
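
For readers less familiar with crontab syntax, the sketch below labels the five fields of such an expression. It only splits the string and is not a cron parser; the function name split_cron is an assumption for this example.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression):
    """Label the five whitespace-separated fields of a crontab expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# "0 * * * *": minute 0 of every hour, every day
print(split_cron("0 * * * *"))
```

Read this way, "0 * * * *" fires at minute 0 of every hour, so it describes an hourly schedule.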

Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a DAG Run. DAG Runs can run in parallel for the same DAG, and each has a defined execution_date, which identifies the logical date and time it is running for - not the actual time when it was started.

 

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html

schedule_interval accepts three kinds of input:

#1 a crontab expression

#2 a datetime.timedelta, i.e. how often to run

#3 one of the presets

or None

A DAG Run is an object representing an instantiation of the DAG in time.

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument, which can be passed a cron expression as a str, a datetime.timedelta object, or one of the following cron "presets".

Cron Presets

preset      meaning                                                          cron
None        Don't schedule, use for exclusively "externally triggered" DAGs
@once       Schedule once and only once
@hourly     Run once an hour at the beginning of the hour                    0 * * * *
@daily      Run once a day at midnight                                       0 0 * * *
@weekly     Run once a week at midnight on Sunday morning                    0 0 * * 0
@monthly    Run once a month at midnight of the first day of the month       0 0 1 * *
@quarterly  Run once a quarter at midnight on the first day                  0 0 1 */3 *
@yearly     Run once a year at midnight of January 1                         0 0 1 1 *

Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend.
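
The presets and their cron equivalents can be written down as a small lookup, shown below as an illustration. This is not how Airflow resolves presets internally; the dictionary simply restates the values listed above, and preset_to_cron is a hypothetical helper.

```python
# Cron equivalents of the presets, as listed in the Airflow documentation.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}


def preset_to_cron(schedule):
    """Return the cron expression for a preset; pass other strings through."""
    return CRON_PRESETS.get(schedule, schedule)


print(preset_to_cron("@daily"))
print(preset_to_cron("30 6 * * 1"))
```

None and @once are left out because they have no cron equivalent.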

 

timedelta

https://github.com/apache/airflow/issues/14969

import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag_params = {
    'dag_id': 'schedule_interval_bug_example_dag',
    'default_args':{
        'owner': 'Administrator',
        'depends_on_past': False,
        'retries': 0,
        'email': ['example@example.com']
    },
    'schedule_interval': dt.timedelta(days=1),
    'start_date': dt.datetime(year=2021, month=1, day=1, hour=11, minute=10),
    'catchup': False
}

with DAG(**dag_params) as dag:
    DummyOperator(task_id='start') >> DummyOperator(task_id='end')
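
With a timedelta schedule, runs are spaced by that interval counted from start_date rather than aligned to midnight. The sketch below is plain datetime arithmetic under that assumption, not Airflow's scheduler logic (it ignores catchup, time zones and the time at which each run actually starts); first_logical_dates is a made-up name.

```python
import datetime as dt


def first_logical_dates(start_date, interval, count):
    """List the first `count` dates spaced by `interval`, starting at start_date."""
    return [start_date + i * interval for i in range(count)]


dates = first_logical_dates(
    dt.datetime(year=2021, month=1, day=1, hour=11, minute=10),
    dt.timedelta(days=1),
    3,
)
for date in dates:
    print(date.isoformat())
```

Every date keeps the 11:10 time of day taken from start_date, which is the practical difference from the @daily preset.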

 

 

OpenAPI?

This API follows the OpenAPI specification.

What is that?

https://spec.openapis.org/oas/v3.1.0

What is the OpenAPI Specification?

The OpenAPI Specification (OAS) defines a standard, programming language-agnostic interface description for HTTP APIs, which allows both humans and computers to discover and understand the capabilities of a service without requiring access to source code, additional documentation, or inspection of network traffic. When properly defined via OpenAPI, a consumer can understand and interact with the remote service with a minimal amount of implementation logic. Similar to what interface descriptions have done for lower-level programming, the OpenAPI Specification removes guesswork in calling a service.

 

https://oai.github.io/Documentation/start-here.html

Advantages of Using OpenAPI

Having your API formally described in a machine-readable format allows automated tools to process it, instantly opening the door to:

  • Description Validation and Linting: Check that your description file is syntactically correct and adheres to a specific version of the Specification and the rest of your team’s formatting guidelines.
  • Data Validation: Check that the data flowing through your API (in both directions) is correct, during development and once deployed.
  • Documentation Generation: Create traditional human-readable documentation based on the machine-readable description, which always stays up-to-date.
  • Code Generation: Create both server and client code in any programming language, freeing developers from having to perform data validation or write SDK glue code, for example.
  • Graphical Editors: Allow easy creation of description files using a GUI instead of typing them by hand.
  • Mock Servers: Create fake servers providing example responses which you and your customers can start testing with before you write a single line of code.
  • Security Analysis: Discover possible vulnerabilities at the API design stage instead of much, much later.
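
To give a feel for "machine-readable", the sketch below walks a tiny hand-written description fragment and lists its operations. The fragment is an illustration written for this note, not Airflow's actual specification file; only the path and the operation name post_dag_run are taken from the endpoint discussed earlier.

```python
HTTP_METHODS = {"get", "put", "post", "delete", "patch", "options", "head", "trace"}

# Hand-written fragment in the shape of an OpenAPI document (illustrative only).
spec_fragment = {
    "openapi": "3.0.3",
    "paths": {
        "/dags/{dag_id}/dagRuns": {
            "post": {
                "operationId": "post_dag_run",
                "summary": "Trigger a new DAG run",
            },
        },
    },
}


def list_operations(spec):
    """Return (METHOD, path, operationId) for every operation in the description."""
    operations = []
    for path, item in spec.get("paths", {}).items():
        for method, operation in item.items():
            if method in HTTP_METHODS:
                operations.append((method.upper(), path, operation.get("operationId")))
    return operations


print(list_operations(spec_fragment))
```

Documentation generators and client code generators start from the same kind of traversal, only with far more of the specification taken into account.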

 

swagger vs openapi

https://swagger.io/blog/api-strategy/difference-between-swagger-and-openapi/

Let's start with clarifying Swagger vs OpenAPI

The easiest way to understand the difference is:

  • OpenAPI = Specification
  • Swagger = Tools for implementing the specification

OpenAPI is the official name of the specification. The development of the specification is fostered by the OpenAPI Initiative, which involves more than 30 organizations from different areas of the tech world, including Microsoft, Google, IBM, and CapitalOne. Smartbear Software, which is the company that leads the development of the Swagger tools, is also a member of the OpenAPI Initiative, helping lead the evolution of the specification.

 
