airflow调度框架

airflow调度框架

1.认识大数据

1.1、什么是大数据

volume(数据量大):GB-TB-PB-ZB—差别1024倍,数据呈现指数级别增长。
velocity(速度快):数据处理速度快,数据增长速度快—第一代MR、第二代Hive、第三代lmpala和Spark、第四代Flink。
variety(数据种类多):结构化、半结构化、非结构化—非结构化的图像、文本,半结构化的Json及XML。
Value(价值密度低):对数据进行价值化提取—在大数据基础上对数据进行价值化提取。

1.2、大数据分析应用场景

airflow调度框架

2.任务调度相关概念

2.1、什么是任务调度

2.1.1、任务调度:实现执行程序的、规范化、自动化、可视化、集中化、统一调度和监控,让所有任务有序、高效运行,降低开发和运维成本。

airflow调度框架

2.1.2、分布式任务调度:任务的分布式处理,多台服务器同时处理任务的调度和监控,体现分布式思想特点:主从节点、容错、负载均衡、高可用。

airflow调度框架

2.2、任务调度应用场景

2.2.1、流行且开源的分布式任务调度框架。

airflow调度框架

2.2.2、国内开源的分布式任务调度框架。

airflow调度框架

2.2.3、国内国外非开源的分布式任务调度框架产品。

airflow调度框架

2.2.4、任务调度关键词

WorkFlow:工作流
airflow调度框架
工作流:业务过程的部分或整体在计算机应用环境下的自动化。
目的:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息活着任务。
Coordinator:协调器
airflow调度框架
协调器:组织多个job。
Bundle:批处理
airflow调度框架

批处理:管理多个Coordinator。

3.初始airflow

3.1、airflow任务调度相关概念

3.1.1、初始airflow

Airflow是一款用编程方式,编写、安排、监控工作流的任务调度工具
原生语言为python

3.1.2、Airflow任务动态图

airflow调度框架
airflow调度框架

3.1.3、Airflow基础架构

Airflow基础体系结构应用与开发程序,有多种方式执行程序。
airflow调度框架
元数据数据库:Airflow使用SQL数据库存储有关正在运行的数据管道的元数据。再上图中,它表示为Postgres,它在airflow中非常流行。airflow支持的备用数据库包括mysql。
Web服务器和调度程序:airflow web服务器和调度程序是在本地计算机上运行并与原数据库进行交互的独立进程。
执行器:单独显示,实际上executor不是一个单独的过程,而是scheduler中运行。
工作节点:单独流程,与airflow体系结构的其他组结和元数据存储库进行交互。
airflow.cfg:是airflow配置文件,可通过web server,scheduler和workers访问。
DAG:是指包含python代码的DAG文件,代表要由airflow运行的数据管道。这些文件的位置在airflow配置文件中指定,但是web服务器,调度程序和工作程序需要访问他们。

4.airflow初体验

4.1、如何进行airflow任务调度

4.1.1、airflow调度shell任务实现
from datetime import tiimedelta
from airflow import DAG
from airflow.operatros.bash import BashOperator
from airflow.utils.dates import days_ago

args = {
	'owner':'airflow',
	'email':['itcast@itcast.cn'],
	'email_on_failure':True,
	'email_on_retry':True,
	'retries':1,
	'retry_detay':timedelta(minutes=5)
}

dag = DAG(
	DAG_id='first_bash_operator_new',
	default_args=args,
	start_date=days_ago(2),
	dagrun_tiimeout=timedelta(minutes=5)
)

#[START howto-operator-bash]
run_this = BashOperator(
	task_id='echo_first_bash',
	bash_command='date +"%F %T" >>/root/first_bash/operator_new.log',
	dag=dag.
)
run_this

airflow调度框架

4.2、airflow任务调度的类型

4.2.1、airflow任务的生命周期

airflow调度框架

4.2.2、airflow调度其他任务实现

airflow调度框架
shell任务,jinja模块任务,python的etl任务,oracle任务,sqoop任务,mysql任务,hive任务,spark任务(离线任务、实时任务)
注意:需要安装任务执行环境

5.任务调度与大数据关系

5.1、大数据任务为什么用airflow调度

5.1.1、调度框架对比
特性 Oozie Azkaban Dolphincheduler Airflow
所有者 Apache(Cloudera) Linkedin Apache(易观) Apache(Airbnb)
社区 活跃 有些活跃 活跃 非常活跃
工作流描述语言 XML Key/value text Java Python
任务监控支持程度 一般
是否支持HA Y Y Y Y
任务支持类型 中等 中等 很丰富 超级丰富
是否支持任务暂停 Y N Y N
是否支持任务恢复 Y N Y N
是否支持任务重跑 Y N Y N
是否支持自定义任务类型 Y Y Y Y
任务监控支持程度 一般
集群可扩展性
任务实现复杂程度 一般 一般
框架自身扩展难以程度 一般
5.1.2、airflow支持的任务调度类型

airflow调度框架
扩展性极强:除官方提供的调度类型外,还支持自定义类型

6.大数据生态知识体系

6.1、框架功能与作用

airflow调度框架

6.2、框架应用

airflow调度框架

安装部署airflow

安装python36版本:

[root@master ~]# yum install python36u-pip python36 python36-libs -y

升级pip版本:

[root@master ~]# pip3.6 install --upgrade pip

查询版本:

[root@master ~]# pip --version
pip 20.3.3 from /usr/local/lib/python3.6/site-packages/pip (python 3.6)
[root@master ~]# pip list
Package   Version
---------- -------
pip     20.3.3
setuptools 39.2.0

安装数据库:

[root@master ~]# yum -y install mariadb mariadb-server

修改数据库密码:

[root@master ~]# systemctl start mariadb
[root@master ~]# mysqladmin -uroot -p password '1213456';

安装依赖的软件包:

[root@master ~]# yum install gcc gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib -y

登录数据库:

[root@master ~]# mysql -uroot -p1213456

安装完成之后创建库和用户密码:

mysql> create database airflow;
Query OK, 1 row affected (0.00 sec)

mysql> create user 'airflow'@'%' identified by 'airflow';
Query OK, 0 rows affected (0.00 sec)

mysql> create user 'airflow'@'localhost' identified by 'airflow';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all on airflow.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all privileges on *.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)

查询airflow的文件:

[root@master ~]# pip show --files apache-airflow

airflow安装的路径:

[root@master ~]# find / -name airflow
/var/lib/mysql/airflow

创建airflow的配置文件:

[root@master ~]# mkdir -p /etc/airflow
[root@master ~]# vim /etc/airflow/airflow.cfg
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

设置airflow的文件路径:

[root@master ~]# vim /etc/profile
export AIRFLOW_HOME=/etc/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes

安装airflow软件:

[root@master ~]# pip install apache-airflow
[root@master ~]# pip install apache-airflow[devel]
[root@master ~]# pip install apache-airflow[celery]
[root@master ~]# pip install apache-airflow[jdbc]
[root@master ~]# pip install apache-airflow[mysql]
[root@master ~]# pip install apache-airflow[password]
[root@master ~]# pip install apache-airflow[rabbitmq]
[root@master ~]# pip install apache-airflow[redis]

查询版本号:

[root@master ~]# pip list | grep -i airflow
apache-airflow          2.0.0

初始化报错:

[root@master ~]# airflow db init

DB: sqlite:root/airflow/airflow.db
[2020-12-04 12:45:16,925] {db.py:678} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [unusual_prefix_619bc6aee900a08efa12755006b4aa599654a401_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
WARNI [unusual_prefix_619bc6aee900a08efa12755006b4aa599654a401_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']

解决问题:

[root@master ~]# pip install apache-airflow['cncf.kubernetes']

再次初始化:

[root@master ~]# airflow db init
DB: sqlite:root/airflow/airflow.db
[2020-12-04 12:46:43,211] {db.py:678} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Initialization done

启动sirflow服务:

[root@master ~]# airflow webserver -p 8080 &

  ____________    _____________
 ____   |__( )_________  __/__  /________    __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /  _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/   /_/   /_/  \____/____/|__/

[2020-12-04 12:50:19,220] {dagbag.py:440} INFO - Filling up the DagBag from /dev/null
[2020-12-04 12:50:19,294] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat:
=================================================================
[2020-12-04 12:50:37 +0800] [23608] [INFO] Starting gunicorn 19.10.0
[2020-12-04 12:50:37 +0800] [23608] [INFO] Listening at: http://0.0.0.0:8080 (23608)
[2020-12-04 12:50:37 +0800] [23608] [INFO] Using worker: sync
[2020-12-04 12:50:37 +0800] [23618] [INFO] Booting worker with pid: 23618
[2020-12-04 12:50:37 +0800] [23619] [INFO] Booting worker with pid: 23619
[2020-12-04 12:50:37 +0800] [23620] [INFO] Booting worker with pid: 23620
[2020-12-04 12:50:37 +0800] [23621] [INFO] Booting worker with pid: 23621
[2020-12-04 12:50:44,587] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,604] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,659] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,713] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:51:09 +0800] [23608] [INFO] Handling signal: ttin
[2020-12-04 12:51:09 +0800] [23820] [INFO] Booting worker with pid: 23820
[2020-12-04 12:51:11,327] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.

在web页面上展示出来:
airflow调度框架

上一篇:Docker安装airflow(超详细)单机,集群部署教程


下一篇:airflow 2.0.2 python依赖清单