flink架构及原理

本文主要介绍内容

一、flink分层架构

flink架构及原理
高级API层:包含机器学习及Flink SQL API等库。对批处理和流处理进行了统一。
API层:主要包含 Flink 的流处理API 和批处理API
执行引擎:Flink 的执行处理,Flink 的执行引擎基于流处理实现。
物理资源层:Flink 任务执行的物理资源,主要有 本地(JVM) , 集群(yarn),云端(GCE/EC2)等,flink1.11以上版本也支持k8s部署。

二、flink系统架构

flink架构及原理
Client:Flink Client 用于与JobManger建立连接,进行Flink 任务的提交。Client会将Flink任务组装为一个 JobGraph并进行提交。一个JobGraph是一个flink dataflow,其中包含了一个Flink程序的:JobID、Job名称、配置信息、一组JobVertex等。
JobManger:Flink系统协调者,负责接收job任务并调度job的多个task执行。同时负责job信息的收集和管理TaskManger。
TaskManger:负责执行计算的Worker,同时进行所在节点的资源管理(包括内存,cup,网络),启动时向JobManger汇报资源信息

三、flink任务部署流程

flink目前支持本地,yarn,k8s等多种部署方案。

3.1、flink任务yarn部署流程

flink架构及原理
flink yarn client 负责与yarn RM 进行通信及资源申请
jobManger 和taskManger 分别申请Container 资源运行各自的进程。
jobManger 和yarn AM 属于同一个Container中,从而 yarn AM 可进行申请Container及调度TaskManger
HDFS 用于数据的存储,如checkpoints, savepoints 等数据

四、flink任务执行流程及原理

一个flink程序执行时,都会映射为一个Streaming Dataflow 进行处理,类似一个DAG图。从Source开始到Sink结束。

4.1、 flink程序与DataFlow映射

flink架构及原理
flink程序由一个或多个输入流Stream(Source)经过计算(Transformation), 最终输出到一个或多个输出流Stream中(Sink)。
支持的Source:kafka,hdfs,本地文件
支持的Sink:kafka,mysql,hdfs,本地文件

4.2、parallel Dataflow (并发原理)

flink程序天生就支持并行及分部署处理:
a、一个Stream支持分为多个Stream分区,一个Operate支持分成多个Operate Subtask,每个Subtask都执行在不同的线程中。
b、一个Operate 并行度等于Operate Subtask个数,Stream并行度总等于Operate并行度。

parallel Dataflow 示例:
Source并行度为2,Sink并行度为1。
flink架构及原理
上图展示了Operate与Stream之间存在的两种模式:
a、One-to-one模式:Source[1]–>Map[1]的数据流模式,该模式保持了Source的分区特性及数据处理的有序性。
b、Redistribution模式:map[1]–>apply()[2]的数据流模式,该模式会改变数据流的分区。其与选择的Operate操作有关。

4.3、Task & Operator Chain

 flink 在分布式环境中会将多个Operate Subtask 串在一起作为一个 Operate Chain 的执行链。每个执行链在TaskManger上独立的线程中执行。多个Operate 通过Stream进行连接。每个Operate对应一个task。

下图分别展示单个并发与多个并发的执行原理图。
flink架构及原理

4.4、时间窗口

flink支持基于时间和数据的时间窗口。Flink支持基于多种时间的窗口。
a、基于事件的创建时间
b、基于事件进入 Dataflow 的时间
c、基于某Operate对事件处理时的本地时间
各种时间所处的位置及含义:
flink架构及原理

五、checkpoint原理

flink是在Chandy–Lamport算法[的基础上实现的一种分布式快照。通过不断的生成分布式Streaming数据流Snapshot,实现利用snapshot进行数据流的恢复处理。

5.1、checkpoint主要步骤

a、Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint
b、source 节点向下游广播 barrier (附带在数据流中,随DAG流动)
c、task收到barrier后,异步的执行快照并进行持久化处理。
d、sink完成快照后,表示本次checkpoint完成。并将所有快照数据进行整合持久化处理。
flink架构及原理

5.2、Barrier

a、Stream Barrier是Flink分布式Snapshotting中的核心元素,它会对数据流进行记录并插入到数据流中,对数据流进行分组,并沿着数据流的方向向前推进。
b、每个Barrier会携带一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。Barrier非常轻量,不会中断数据流处理。
带有Barrier的数据流图:
flink架构及原理

5.3、Strean Aligning

当Operate具有多个数据输入流时,需在Snapshot Barrier中进行数据对齐处理。
具体处理过程:
a、Operator从一个incoming Stream接收到Snapshot Barrier n,然后暂停处理,直到其它的incoming Stream的Barrier n(否则属于2个Snapshot的记录就混在一起了)到达该Operator。
b、接收到Barrier n的Stream被临时搁置,来自这些Stream的记录不会被处理,而是被放在一个Buffer中
c、一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
d、继续处理来自多个Stream的记录
flink架构及原理
基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中。通常以最迟对齐Barrier的一个Stream做为处理Buffer中缓存记录的时刻点。可通过开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。

5.4、State Backend(数据持久化方案)

flink的State Backend 是实现快照持久化的重要功能,flink将State Backend抽象成一种插件,支持三种State Backend。
a、MemoryStateBackend:基于内存实现,将数据存储在堆中。数据过大会导致OOM问题,不建议生产环境使用,默认存储的大小为4M。
设置内存大小:env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE))
b、FsStateBackend:将数据持久化到文件系统包括(本地,hdfs,Amazon,阿里云),通过地址进行指定。

// 使用HDFS作为State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))
// 使用Amazon作为State Backend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"))
// 关闭Asynchronous Snapshot,默认开启
env.setStateBackend(new FsStateBackend(checkpointPath, false))

c、RocksDBStateBackend:RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。从RocksDB中读写数据都需要进行序列化和反序列化,读写成本更高。允许增量快照,每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。

// 开启Incremental Checkpoint (增量快照)
val enableIncrementalCheckpointing = true
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))

六、checkpoint相关配置

默认情况下,Checkpoint机制是关闭的,需要调用env.enableCheckpointing(n)来开启。参数需要配置统一封装在CheckpointConfig中,常用配置如下:

val cpConfig: CheckpointConfig = env.getCheckpointConfig
// 使用At-Least-Once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
// 超时时间100s
env.getCheckpointConfig.setCheckpointTimeout(100*1000)
// 两次Checkpoint的间隔为60秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)
// 最多同时进行3个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)
作业取消后仍然保存Checkpoint,否则自动删除Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 关闭checkpoint失败重启任务
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
上一篇:checkPoint机制


下一篇:PID控制