airflow调度框架
1.认识大数据
1.1、什么是大数据
volume(数据量大):GB-TB-PB-ZB—差别1024倍,数据呈现指数级别增长。
velocity(速度快):数据处理速度快,数据增长速度快—第一代MR、第二代Hive、第三代lmpala和Spark、第四代Flink。
variety(数据种类多):结构化、半结构化、非结构化—非结构化的图像、文本,半结构化的Json及XML。
Value(价值密度低):对数据进行价值化提取—在大数据基础上对数据进行价值化提取。
1.2、大数据分析应用场景
2.任务调度相关概念
2.1、什么是任务调度
2.1.1、任务调度:实现执行程序的、规范化、自动化、可视化、集中化、统一调度和监控,让所有任务有序、高效运行,降低开发和运维成本。
2.1.2、分布式任务调度:任务的分布式处理,多台服务器同时处理任务的调度和监控,体现分布式思想特点:主从节点、容错、负载均衡、高可用。
2.2、任务调度应用场景
2.2.1、流行且开源的分布式任务调度框架。
2.2.2、国内开源的分布式任务调度框架。
2.2.3、国内国外非开源的分布式任务调度框架产品。
2.2.4、任务调度关键词
WorkFlow:工作流
工作流:业务过程的部分或整体在计算机应用环境下的自动化。
目的:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息活着任务。
Coordinator:协调器
协调器:组织多个job。
Bundle:批处理
批处理:管理多个Coordinator。
3.初始airflow
3.1、airflow任务调度相关概念
3.1.1、初始airflow
Airflow是一款用编程方式,编写、安排、监控工作流的任务调度工具
原生语言为python
3.1.2、Airflow任务动态图
3.1.3、Airflow基础架构
Airflow基础体系结构应用与开发程序,有多种方式执行程序。
元数据数据库:Airflow使用SQL数据库存储有关正在运行的数据管道的元数据。再上图中,它表示为Postgres,它在airflow中非常流行。airflow支持的备用数据库包括mysql。
Web服务器和调度程序:airflow web服务器和调度程序是在本地计算机上运行并与原数据库进行交互的独立进程。
执行器:单独显示,实际上executor不是一个单独的过程,而是scheduler中运行。
工作节点:单独流程,与airflow体系结构的其他组结和元数据存储库进行交互。
airflow.cfg:是airflow配置文件,可通过web server,scheduler和workers访问。
DAG:是指包含python代码的DAG文件,代表要由airflow运行的数据管道。这些文件的位置在airflow配置文件中指定,但是web服务器,调度程序和工作程序需要访问他们。
4.airflow初体验
4.1、如何进行airflow任务调度
4.1.1、airflow调度shell任务实现
from datetime import tiimedelta
from airflow import DAG
from airflow.operatros.bash import BashOperator
from airflow.utils.dates import days_ago
args = {
'owner':'airflow',
'email':['itcast@itcast.cn'],
'email_on_failure':True,
'email_on_retry':True,
'retries':1,
'retry_detay':timedelta(minutes=5)
}
dag = DAG(
DAG_id='first_bash_operator_new',
default_args=args,
start_date=days_ago(2),
dagrun_tiimeout=timedelta(minutes=5)
)
#[START howto-operator-bash]
run_this = BashOperator(
task_id='echo_first_bash',
bash_command='date +"%F %T" >>/root/first_bash/operator_new.log',
dag=dag.
)
run_this
4.2、airflow任务调度的类型
4.2.1、airflow任务的生命周期
4.2.2、airflow调度其他任务实现
shell任务,jinja模块任务,python的etl任务,oracle任务,sqoop任务,mysql任务,hive任务,spark任务(离线任务、实时任务)
注意:需要安装任务执行环境
5.任务调度与大数据关系
5.1、大数据任务为什么用airflow调度
5.1.1、调度框架对比
特性 | Oozie | Azkaban | Dolphincheduler | Airflow |
---|---|---|---|---|
所有者 | Apache(Cloudera) | Apache(易观) | Apache(Airbnb) | |
社区 | 活跃 | 有些活跃 | 活跃 | 非常活跃 |
工作流描述语言 | XML | Key/value text | Java | Python |
任务监控支持程度 | 强 | 一般 | 强 | 强 |
是否支持HA | Y | Y | Y | Y |
任务支持类型 | 中等 | 中等 | 很丰富 | 超级丰富 |
是否支持任务暂停 | Y | N | Y | N |
是否支持任务恢复 | Y | N | Y | N |
是否支持任务重跑 | Y | N | Y | N |
是否支持自定义任务类型 | Y | Y | Y | Y |
任务监控支持程度 | 高 | 一般 | 高 | 高 |
集群可扩展性 | 强 | 强 | 强 | 强 |
任务实现复杂程度 | 高 | 一般 | 一般 | 低 |
框架自身扩展难以程度 | 高 | 高 | 一般 | 低 |
5.1.2、airflow支持的任务调度类型
扩展性极强:除官方提供的调度类型外,还支持自定义类型
6.大数据生态知识体系
6.1、框架功能与作用
6.2、框架应用
安装部署airflow
安装python36版本:
[root@master ~]# yum install python36u-pip python36 python36-libs -y
升级pip版本:
[root@master ~]# pip3.6 install --upgrade pip
查询版本:
[root@master ~]# pip --version
pip 20.3.3 from /usr/local/lib/python3.6/site-packages/pip (python 3.6)
[root@master ~]# pip list
Package Version
---------- -------
pip 20.3.3
setuptools 39.2.0
安装数据库:
[root@master ~]# yum -y install mariadb mariadb-server
修改数据库密码:
[root@master ~]# systemctl start mariadb
[root@master ~]# mysqladmin -uroot -p password '1213456';
安装依赖的软件包:
[root@master ~]# yum install gcc gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib -y
登录数据库:
[root@master ~]# mysql -uroot -p1213456
安装完成之后创建库和用户密码:
mysql> create database airflow;
Query OK, 1 row affected (0.00 sec)
mysql> create user 'airflow'@'%' identified by 'airflow';
Query OK, 0 rows affected (0.00 sec)
mysql> create user 'airflow'@'localhost' identified by 'airflow';
Query OK, 0 rows affected (0.00 sec)
mysql> grant all on airflow.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> grant all privileges on *.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)
查询airflow的文件:
[root@master ~]# pip show --files apache-airflow
airflow安装的路径:
[root@master ~]# find / -name airflow
/var/lib/mysql/airflow
创建airflow的配置文件:
[root@master ~]# mkdir -p /etc/airflow
[root@master ~]# vim /etc/airflow/airflow.cfg
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
设置airflow的文件路径:
[root@master ~]# vim /etc/profile
export AIRFLOW_HOME=/etc/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes
安装airflow软件:
[root@master ~]# pip install apache-airflow
[root@master ~]# pip install apache-airflow[devel]
[root@master ~]# pip install apache-airflow[celery]
[root@master ~]# pip install apache-airflow[jdbc]
[root@master ~]# pip install apache-airflow[mysql]
[root@master ~]# pip install apache-airflow[password]
[root@master ~]# pip install apache-airflow[rabbitmq]
[root@master ~]# pip install apache-airflow[redis]
查询版本号:
[root@master ~]# pip list | grep -i airflow
apache-airflow 2.0.0
初始化报错:
[root@master ~]# airflow db init
DB: sqlite:root/airflow/airflow.db
[2020-12-04 12:45:16,925] {db.py:678} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [unusual_prefix_619bc6aee900a08efa12755006b4aa599654a401_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
WARNI [unusual_prefix_619bc6aee900a08efa12755006b4aa599654a401_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
解决问题:
[root@master ~]# pip install apache-airflow['cncf.kubernetes']
再次初始化:
[root@master ~]# airflow db init
DB: sqlite:root/airflow/airflow.db
[2020-12-04 12:46:43,211] {db.py:678} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Initialization done
启动sirflow服务:
[root@master ~]# airflow webserver -p 8080 &
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2020-12-04 12:50:19,220] {dagbag.py:440} INFO - Filling up the DagBag from /dev/null
[2020-12-04 12:50:19,294] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat:
=================================================================
[2020-12-04 12:50:37 +0800] [23608] [INFO] Starting gunicorn 19.10.0
[2020-12-04 12:50:37 +0800] [23608] [INFO] Listening at: http://0.0.0.0:8080 (23608)
[2020-12-04 12:50:37 +0800] [23608] [INFO] Using worker: sync
[2020-12-04 12:50:37 +0800] [23618] [INFO] Booting worker with pid: 23618
[2020-12-04 12:50:37 +0800] [23619] [INFO] Booting worker with pid: 23619
[2020-12-04 12:50:37 +0800] [23620] [INFO] Booting worker with pid: 23620
[2020-12-04 12:50:37 +0800] [23621] [INFO] Booting worker with pid: 23621
[2020-12-04 12:50:44,587] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,604] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,659] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,713] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:51:09 +0800] [23608] [INFO] Handling signal: ttin
[2020-12-04 12:51:09 +0800] [23820] [INFO] Booting worker with pid: 23820
[2020-12-04 12:51:11,327] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
在web页面上展示出来: