1.使用python3 pip安装Airflow
pip install apache-airflow ,安装结束提示如下:
airflow安装到目录:/usr/local/python3/lib/python3.7/site-packages/airflow/下
2.使用mysql作为airflow的元数据库
- 创建airflow数据库
create database airflow; grant all on airflow.* to 'root'@'%'; flush privileges;
3.配置环境变量vi /etc/profile
export AIRFLOW_HOME=/root/airflow
export SITE_AIRFLOW_HOME=/usr/local/python3/lib/python3.7/site-packages/airflow/
export PATH=$PATH:$SITE_AIRFLOW_HOME/bin
- 生效环境变量
source /etc/profile
4.执行airflow命令初始化操作
-
添加python依赖,避免初始化失败,
yum -y install sqlite sqlite-devel
若步骤一完成后,仍然报错: ModuleNotFoundError: No module named ‘_sqlite3’,则是由于python3不兼容_sqlite3,单独安装sqlite3:
wget https://www.sqlite.org/2019/sqlite-autoconf-3300100.tar.gz --no-check-certificate tar -zxvf sqlite-autoconf-3300100.tar.gz cd sqlite-autoconf-3300100 $ mkdir /usr/local/sqlite3 $ ./configure --prefix=/usr/local/sqlite3 --disable-static --enable-fts5 --enable-json1 CFLAGS="-g -O2 -DSQLITE_ENABLE_FTS3=1 -DSQLITE_ENABLE_FTS4=1 -DSQLITE_ENABLE_RTREE=1" $ make && make install $ cd /root/Python-3.7.6 $ LD_RUN_PATH=/usr/local/sqlite3/lib ./configure --prefix=/usr/local/python3 --with-ssl LDFLAGS="-L /usr/local/sqlite3/lib" CPPFLAGS=" -I /usr/local/sqlite3/include" $ LD_RUN_PATH=/usr/local/sqlite3/lib make $ LD_RUN_PATH=/usr/local/sqlite3/lib make install
-
验证是否成功,进入python 命令行,import sqlite3,若不报错说明安装成功!
-
执行airflow,执行结束如报如下错误,可略过
-
给airflow安装mysql模块,采用mysql作为airflow的元数据库
$ pip install 'apache-airflow[mysql]' $ yum -y install mysql-devel #完成后验证是否有mysql_config $ find / -name mysql_config $ pip install mysqlclient $ pip install pymysql $ pip install cryptography #避免之后产生错误,需要修改airflow.cfg (默认位于~/airflow/)里的fernet_key #修改方法 $ python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" #这个命令生成一个key,复制这个key然后替换airflow.cfg文件里的fernet_key的值, $ vi ~/airflow.cfg #修改fernet_key fernet_key = #修改airflow.cfg文件中的sql_alchemy_conn配置 sql_alchemy_conn = mysql+mysqldb://user:passwd@localhost:3306/airflow
-
初始化airflow db
$ airflow initdb #如有如下报错 #Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql #修改MySQL配置文件my.cnf #查找my.cnf位置 mysql --help | grep my.cnf #修改my.cnf vi /etc/my.cnf #在[mysqld]下面添加如下配置: explicit_defaults_for_timestamp=true
#重启mysql服务使配置生效
$service mysqld restart
#检查配置是否生效
mysql> select @@global.explicit_defaults_for_timestamp;
+------------------------------------------+
| @@global.explicit_defaults_for_timestamp |
+------------------------------------------+
| 1 |
+------------------------------------------+
#然后再次执行$ airflow initdb 即可
- 修改airflow.cfg配置,调整webserver端口,executor,load_examples,以及时区
#1.修改webserver端口
base_url = http://localhost:8081
web_server_port = 8081
#2.修改executor
#SequentialExecutor是单进程顺序执行任务,默认执行器,通常只用于测试
#LocalExecutor是多进程本地执行任务使用的
#CeleryExecutor是分布式调度使用(当然也可以单机),生产环境常用
#DaskExecutor则用于动态任务调度,常用于数据分析
executor = CeleryExecutor
#不显示示例DAG
load_examples = False
#3.修改时区
default_timezone = Asia/Shanghai
#调整时区还需要修改另外几个文件:
#3.1修改webserver页面上右上角展示的时间:
vi ${PYTHON_HOME}/lib/python3.7/site-packages/airflow/www/templates/admin/master.html
#注释://var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
var UTCseconds = x.getTime();##修改后
#3.2修改webserver lastRun时间,在指定位置添加如下内容,分别找到DagModel(Base),DAG(BaseDag, LoggingMixin)两个类,在get_last_dagrun方法前添加如下方法:
vi /usr/local/python3/lib/python3.7/site-packages/airflow/models/dag.py
def utc2local(self,utc):
import time
epoch = time.mktime(utc.timetuple())
offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
return utc + offset
#3.3修改dags页面显示时间,根据execution_date定位
vi ${PYTHON_HOME}/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html
#将{{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}修改为:
{{ dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M") }}
#将 {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}修改为:
{{ dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M") }}
#3.4修改timezone.py文件
vi /usr/local/python3/lib/python3.7/site-packages/airflow/utils/timezone.py
在utc = pendulum.timezone('UTC')下面添加如下内容:
from airflow import configuration as conf
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
utc = pendulum.local_timezone()
else:
utc = pendulum.timezone(tz)
except Exception:
pass
##修改utcnow()函数
将d = dt.datetime.utcnow() 修改为d = dt.datetime.now()
#3.5修改sqlalchemy.py文件
vi /usr/local/python3/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py
在utc = pendulum.timezone('UTC')下边添加如下代码:
from airflow import configuration as conf
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
utc = pendulum.local_timezone()
else:
utc = pendulum.timezone(tz)
except Exception:
pass
- 初始化元数据库
#初始化元数据库(其实也就是新建airflow依赖的表)
$ airflow resetdb
#或者使用airflow initdb
5.启动airflow相关组件
#安装如下依赖组件
$ pip install celery
#安装过程如遇如下Error:
#ERROR: No matching distribution found for kombu<4.7,>=4.6.7 (from celery)
#需要升级pip版本
$ python -m pip install --upgrade pip
$ pip install apache-airflow['kubernetes']
#启动组件:
#守护进程运行webserver
$ airflow webserver -D
#守护进程运行调度器,是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程
$ airflow scheduler -D
#守护进程运行worker,是实际执行任务逻辑的进程
$ airflow worker -D
#守护进程Flower Web,建立Celery之上的Web UI,用于监控您的worker,可以不启动
$ airflow flower -D
####重启webserver和scheduler服务
$ ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9
$ rm -rf /root/airflow/airflow-webserver.pid &&rm -rf /root/airflow/airflow-scheduler.pid
$ airflow webserver -D
$ airflow scheduler -D
#查看日志,无错误则启动成功
$ tail -f /root/airflow/airflow-scheduler.err
#重启worker服务
ps -ef|egrep 'serve_logs|celeryd'|grep -v grep|awk '{print $2}'|xargs kill -9
rm -rf /root/airflow/airflow-worker.pid
airflow worker -D
tail -f /root/airflow/airflow-worker.err
6.airflow基本命令
- 1.查看dags:
airflow list_dags
- 2.列出dag下tasks:
airflow list_tasks dag_id
- 3.执行dag下某个task:
airflow run dag_id task_id start_time
- 4.测试整个dag执行:
airflow backfill dag_id -s start_time -e end_time -l
backfill命令本身是用于回填的,-s表示开始时间,-e表示结束时间,使用该命令可以回填多个DAG Run,最后有个-l参数,表示些任务在本地机器上执行
- 5.运行整个dag文件:
airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE
- 6.暂停任务:
airflow pause dag_id
- 7.清楚执行记录:
airflow clear -s 开始日期 -e 结束日期 dag_id