Airflow的MySqlOperator和PostgresOperator

1. 依赖

MySqlOperator 的数据库交互通过 MySQLdb 模块来实现, 使用前需要安装相关依赖:

pip install apache-airflow[mysql]

2. 使用

使用 MySqlOperator 执行sql任务的一个简单例子:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.mysql_operator import MySqlOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['j_hao104@163.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    'MySqlOperatorExample',
    default_args=default_args,
    description='MySqlOperatorExample',
    schedule_interval="30 18 * * *")

insert_sql = "insert into log SELECT * FROM temp_log"


task = MySqlOperator(
    task_id='select_sql',
    sql=insert_sql,
    mysql_conn_id='mysql_conn',
    autocommit=True,
    dag=dag)

3. 参数

MySqlOperator 接收几个参数:

  • sql: 待执行的sql语句;
  • mysql_conn_id: mysql数据库配置ID, Airflow的conn配置有两种配置方式,一是通过os.environ来配置环境变量实现,二是通过web界面配置到代码中,具体的配置方法会在下文描述;
  • parameters: 相当于MySQLdb库的execute 方法的第二参数,比如: cur.execute('insert into UserInfo values(%s,%s)',('alex',18));
  • autocommit: 自动执行 commit;
  • database: 用于覆盖conn配置中的数据库名称, 这样方便于连接统一个mysql的不同数据库;

4. conn配置

Airflow的MySqlOperator和PostgresOperator

建议conn配置通过web界面来配置,这样不用硬编码到代码中,关于配置中的各个参数:

  • Conn Id: 对应 MySqlOperator 中的 mysql_conn_id
  • Host: 数据库IP地址;
  • Schema: 库名, 可以被MySqlOperator中的database重写;
  • Login: 登录用户名;
  • Password: 登录密码;
  • Port: 数据库端口;
  • ExtraMySQLdb.connect的额外参数,包含charsetcursorssllocal_infile

其中cursor的值的对应关系为: sscursor —> MySQLdb.cursors.SSCursordictcursor —> MySQLdb.cursors.DictCursorssdictcursor —> MySQLdb.cursors.SSDictCursor

 

######################################################################

PostgresOperator(sqlpostgres_conn_id='postgres_default'autocommit=Falseparameters=Nonedatabase=None*args**kwargs)

通过dag代码设置环境变量连接postgres数据库进行操作  dag文件:

import os
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import ExternalTaskSensor
from airflow.operators import EmailOperator
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 11, 28,13,48),
    'retries': 3,
    'retryDelay': timedelta(seconds=5),
    'end_date': datetime(9999, 12, 31)
}

os.environ['AIRFLOW_CONN_POSTGRES_MASTER']='postgres://postgres:postgres@192.168.239.128:54321' 

dag = DAG('testpg',
    default_args=default_args,
    schedule_interval=timedelta(seconds=10))
 
testpostgres = PostgresOperator(
    postgres_conn_id='postgres_master',
    task_id='testpostgres',
    database='postgres',
    sql='truncate table testpg',
    dag=dag
)

os.environ['AIRFLOW_CONN_POSTGRES_MASTER']='postgres://postgres:postgres@192.168.239.128:54321'   ->为设置环境变量

模板:postgres://user:password@localhost:54321

上一篇:尝试查询mssql数据库时出现气流Fernet_Key问题


下一篇:airflow使用mysql数据库,LocalExecutor并发调度(1)