作者:朱翥(长耕)
本文由 Apache Flink PMC 及 Committer 朱翥分享,主要介绍 Flink Runtime 的底层架构。本篇文章包含四部分:
- Runtime总览
- 作业的控制中心—Jobmaster
- 任务的运行容器—TaskExacutor
- 资源的管理中心—ResourceManager
一、Runtime 总览
众所周知 Flink 是分布式的数据处理框架,用户的业务逻辑会以Job的形式提交给 Flink 集群。Flink Runtime作为 Flink 引擎,负责让这些作业能够跑起来并正常完结。这些作业既可以是流计算作业,也可以是批处理作业,既可以跑在裸机上,也可以在Flink集群上跑,Flink Runtime必须支持所有类型的作业,以及不同条件下运行的作业。
1.作业的表达
要执行作业,首先要理解作业是如何在 Flink 中进行表达的。
用户通过API的方式写一个作业,例如上图左侧StreamWordInput的示例,它可以不断的输出一个个单词;下面的Map操作负责把单词映射成一个二元组;再接一个keyBy,使相同的word的二元组都被分配在一起,然后sum将它们计数,最后打印出来。
左侧的作业对应着右边的逻辑拓扑(StreamGraph)。这个拓扑中有4个节点,分别是source、map、sum和print。这些是数据处理逻辑,又称之为算子;节点之间的线条对应着数据的分发方式,影响着数据以什么样的方式分发给下游。举例来说,map到sum之间是keyBy,意味着map产出的数据,同一个key的数据都必须分发到同一个下游。
有了StreamGraph后, Flink Runtime会进一步的把它翻译成JobGraph。JobGraph和StreamGraph的区别是,JobGraph会把一些节点 chain 起来,形成Operator chain。Chain条件是需要两个算子的并发度是一样的,并且它们的数据交换方式是一对一的。 形成的Operator chain,又称为JobVertex。
Operator chain的意义是能够减少一些不必要的数据交换,这样chain的operator都是在同一个地方进行执行。在作业实际执行过程中,逻辑图会进一步被翻译成执行图 — ExecutionGraph。执行图是逻辑图并发层面的视图,如上图所示,下面的执行图就是上面逻辑图所有算子并发都为2的表达。
为什么上图中的map和sum不能嵌起来?因为它们的数据是涉及到多个下游算子的,并非一对一的数据交换方式。逻辑图JobVertex中的一个节点,会对应着并发数个执行节点ExecutionVertex,节点对应着一个个任务,这些任务最后会作为实体部署到Worker节点上,并执行实际的数据处理业务逻辑。
2.分布式架构
Flink 作为分布式数据处理框架,它有一套分布式的架构,主要分为三块:Client、Master和Worker节点。
Master是 Flink 集群的主控中心,它可以有一个到多个JobMaster,每个JobMaster对应一个作业,而这些JobMaster由一个叫Dispatcher的控件统一管理。Master节点中还有一个ResourceManager进行资源管理。ResourceManager管理着所有Worker节点,它同时服务于所有作业。此外Master节点中还有一个Rest Server,它会用于响应各种Client端来的Rest请求,Client端包括Web端以及命令行的客户端,它可以发起的请求包括提交作业、查询作业的状态和停止作业等等。作业会通过执行图被划分成一个个的任务,这些任务最后都会在Worker节点中进行执行。Worker就是TaskExecutor,它们是任务执行的容器。
作业执行的核心组件有三个,分别是JobMaster、TaskExecutor和ResourceManager:JobMaster用于管理作业;TaskExecutor用于执行各个任务;ResourceManager用于管理资源,并服务于JobMaster的资源请求。
二、JobMaster:作业的控制中心
JobMaster的主要职责包括作业生命周期的管理、任务的调度、出错恢复、状态查询和分布式状态快照。
分布式状态快照包括Checkpoint和Savepoint,其中Checkpoint主要是为出错恢复服务的,而Savepoint主要是用于作业的维护,包括升级和迁移等等。分布式快照是由CheckpointCoordinator组件来进行触发和管理的。
JobMaster中的核心组件是Scheduler,无论是作业的生命周期管理、作业的状态维护,还是任务的调度以及出错恢复,都是由Schedule来负责的。
1.作业的生命周期管理
作业的生命周期的状态,作业所有可能的状态迁移都在下图展示出来了。
正常流程下作业会有三种状态,分别是Created、Running和Finished。一个作业开始是处于Created的状态,当这个作业被开始调度就开始进入Running状态并开始调度任务,等到所有的任务都成功结束了,这个作业就走到Finished的状态,并汇报最终结果,然后退出。
然而,一个作业在执行过程中可能会遇到一些问题,因此作业也会有异常处理的状态。作业执行过程中如果出现作业级别错误,整个作业会进到Failing状态,然后Cancel所有任务。等到所有任务都进入最终状态后,包括 Failed、Canceled、Finished,再去check出错的异常。如果异常是不可恢复的,那么整个作业会走到Failed状态并退出。如果异常是可恢复的,那么会走到Restarting状态,来尝试重启。如果重启的次数没有超过上限,作业会从Created状态重新进行调度;如果达到上限,作业会走到Failed状态并退出。 (注:在 Flink 1.10 之后的版本中,当发生错误时,如果可以恢复,作业不会进入 Failing 状态而会直接进入 Restarting 状态,当所有任务都恢复正常后,作业会回到 Running 状态。如果作业无法恢复,则作业会经由 Failing 状态最终进入 Failed 状态并结束。)
Cancelling和Canceled两种状态只会在用户手动去Cancel作业的时候走到。当用户手动的在Web UI或通过 Flink command探索作业的时候, Flink 会首先把状态转到Cancel里,然后Cancel所有任务,等所有任务都进入最终状态后,整个作业就会进入Canceled状态并退出。
Suspended状态只会在配置了high availability,并且当JobMaster丢掉leadership才会走到。这个状态只意味着这个JobMaster出现问题终止了。一般来说等到JobMaster重新拿到leadership之后,或是另外有一个standby Master拿到leadership之后,会在拿到leadership的节点上重新启动起来。
2.任务调度
任务调度是JobMaster的核心职责之一。要调度任务,一个首要的问题就是决定什么时候去调度任务。任务调度时机是由调度策略(SchedulingStrategy)来控制的。这个策略是一个事件驱动的组件,它监听的事件包括:作业开始调度、任务的状态发生变化、任务产出的数据变成可消费以及失败的任务需要重启,通过监听这些事件,它能够比较灵活地来决定任务启动的时机。
目前我们有多种不同的调度策略,分别是Eager和Lazy from sources。EagerSchedulingStrategy主要是服务于流式作业,它的策略是在作业开始调度时,直接启动所有的任务,这样做的好处是可以降低调度时间。Lazy from sources主要服务于批处理作业。它的策略是作业一开始只调度Source节点,等到有任意节点的输入数据可以被消费后,它才会被调起来。如下图所示,source节点的数据开始产出后,agg节点才能被调起来,agg节点结束后,sink节点才能被调起来。
为什么Batch作业和Streaming作业会有不同的调度策略呢?是因为Batch作业里边存在blocking shuffle数据交换模式。在这种模式下,需要等上游完全产出所有数据后,下游才能去消费这部分数据集,如果预先把下游调起来的话,它只会在那空转浪费资源。相比Eager策略而言,对于批处理作业它能够节省一定量的资源。
目前还有一个正在开发中的叫Pipelined region based调度策略,这个策略比较类似于Lazy from source策略,差异在于前者是以Pipelined region为粒度调度任务的。
Pipelined region 是以 pipelined 相连的任务集合。Pipelined边意味着上下游节点会流式的进行数据交换,即上游边写,下游就边读边消费。Pipelined region调度的好处是可以一定程度上继承了Eager调度好处,能够节省调度花费的时间,且让上下游任务并行起来。同时也保留了Lazy from sources避免不必要资源的浪费。通过把一部分Task整体调度,就能知道这部分需要同时运行的作业所需的资源量是多少,能够以此进行一些更深度的优化。
(注:从 Flink 1.11 开始, Pipelined region strategy成为默认调度策略,同时服务于流和批作业。)
3.任务调度的过程
任务具有很多种不同状态,最初任务处在Created状态。当调度策略认为这个任务可以开始被调的时候,它会转到Scheduled状态,并开始申请资源,即Slot。申请到Slot之后,它就转到Deploying状态来生成Task的描述,并部署到worker节点上,再之后Task就会在worker节点上启动起来。成功启动后,它会在worker节点上转到running状态并通知JobMaster,然后在JobMaster端把任务的状态转到running。
对于无限流的作业来说,转到running状态就是最终状态了;对于有限流的作业,一旦所有数据处理完了,任务还会转到finished状态,标志任务完成。当有异常发生时,任务也会转到Failed的状态,同时其它受到影响的任务可能会被Cancel掉并走到Canceled状态。
4.出错恢复
当有任务出现错误时,JobMaster的策略或基本思路是,通过重启出错失败的任务以及可能受到影响的任务,来恢复作业的数据处理。这包含三个步骤:
- 第一步,停止相关任务,包括出错失败任务和可能受其影响任务,失败任务可能已经是FAILED的,然后其它受影响任务会被Cancel最终进到Canceled状态;
- 第二步,重置任务回Created状态;
- 第三步,通知调度策略重新调度这些任务。
上文提及了可能受到影响的任务,那么什么样的任务可能受影响呢?这是由出错恢复策略(FailoverStrategy)来决定的。
目前 Flink 默认的 FailoverStrategy是RestartPipelinedRegionFailoverStrategy。采用了这个策略后,如果一个Task失败了就会重启它所在的region。这其实跟上文提及的Pipelined数据交换有关系。在Pipelined数据交换的节点之间,如果任意一个节点失败了,其相关联的其它节点也会跟着失败。这是为了防止出现数据的不一致。因此为了避免单个Task导致多次Failover,一般的操作是在收到第一个Task failed时,就把其他的一起cancel掉,再一起重启。
RestartPipelinedRegion策略除了重启失败任务所在的Region外,还会重启它的下游Region。原因是任务的产出很多时候是非确定性的,比如说一个record,分发到下游的第一个并发,重跑一次;分发到下游的第二个并发时,一旦这两个下游在不同region中,就可能会导致 record丢失,甚至产生不一样的数据。为了避免这种情况,采用PipelinedRegionFailoverStrategy会重启失败任务所在的Region以及它的所有的下游Region。
另外,还有一个RestartAllFailoverStrategy策略,它会在任意Task fail的时候,重启作业中的所有任务。一般情况,这个策略并不被经常用到,但是在一些特殊情况下,比如当任务失败,用户不希望局部运行而是希望所有任务都结束并整体进行恢复,可以用这个策略。
三、TaskExecutor:任务的运行器
TaskExecutor是任务的运行器,为了运行任务它具有各种各样的资源。如下图所示,这里主要介绍memory的资源。
所有内存资源都是可以单独配置的。TaskManager也对它们的配置进行了分层的管理,最外层是Process Memory,对应的是整个TaskExecutor JVM 的总资源。这份内存又包含了JVM自身占有的内存以及 Flink 占有内存。而 Flink 占用内存又包含了框架占有的内存和任务的内存。
任务占用内存包括了Task Heap Memory,即任务的Java对象占有的内存;Task Off-Heap Memory一般用于 native的第三方库;Network Memory是用来创建Network Buffer用来服务于任务的输入和输出; Managed Memory则是受管控的 Off-Heap Memory,它会被一些组件用到,比如算子和StateBackend。这些Task资源会被它分成一个一个的Slot,Slot是任务运行的逻辑容器。当前,Slot大小是直接把 整个TasExecutor的资源,按照 Slot的数量进行均分得到的。
一个Slot里可以运行一个到多个任务,但是有一定约束,即同一个共享组中的不同类型的任务才可以同时在一个Slot中运行。一般来说,同一个PipelinedRegion中的任务都是在一个共享组中,流式作业的所有任务也都是在一个共享组中。不同类型指的是它们需要属于不同的JobVertex。
如上图右侧示例,这是一个source、map、sink的作业。可以看到部署后,有三个Slot中都有三个任务,分别是source、map、sum各一份。而有一个slot中只有两个任务,就是因为source只有三个并发,没有更多并发可以部署进来。
进行SlotSharing第一个好处是,可以降低数据交换的开销。像map、sink之间是一对一的数据交换,实际上有物理数据交换的这些节点都被共享在了一块,这样可以使得它们的数据交换在内存中进行,比在网络中进行的开销更低。
第二个好处是,方便用户配置资源。通过SlotSharing,用户只需要配置 n个Slot就可以保证一个sum作业总能跑起来。n是最大算子的并发度。
第三个好处是,在各个算子并发度差异不大的情况下,提高负载均衡。这是因为每个Slot里边会有各种不同类型的算子各一份,这就避免某些负载重的算子全挤在同一个TaskExecutor中。
1.任务的执行模型
上文提到每个任务对应着一个OperatorChain。一般来说每个OperatorChain都有自己的输入和输出,输入是InputGate,输出是ResultPartition。这些任务总体会在一个独占的线程中执行,任务从InputGate中读取数据,将它喂给OperatorChain,OperatorChain进行业务逻辑的处理,最后会将产出的数据输出到ResultPartition中。
这地方有一个例外是Source task,它不从InputGate中读取数据,而直接通过SourceFunction方式来产出数据。上游的ResultPartition和下游的InputGate 会通过 Flink 的ShuffleService进行数据交换。ShuffleService是一个插件,目前 Flink 默认是NettyShuffleService,下游的InputGate会通过 Netty来从上游的ResultPartition中获取数据。
ResultPartition是由一个个的SubPartition组成的,每个SubPartition都对应着一个下游消费者并发。InputGate也是由一个个的InputChannel组成的,每个不同的InputChannel都对应着一个上游并发。
四、ResouceManager:资源的管理中心
ResourceManager是 Flink 的资源管理中心。在前面我们有提到过TaskExecutor包含了各种各样的资源。而ResourceManager,就管理着这些TaskExecutor。新启动的TaskExecutor,需要向ResourceManager进行注册,之后它里边的资源才能服务于作业的请求。
ResourceManager里边有个关键组件叫做SlotManager,它管理着Slot的状态。这些Slot状态是通过TaskExecutor到ResourceManager之间的心跳跳来进行更新的,在心跳信息中包含了TaskExecutor中的所有Slot的状态。有了当前所有的Slot状态之后,ResourceManager就可以服务于作业的资源申请。当JobMaster调度一个任务的时候,会向ResourceManager发起Slot请求。收到请求的ResourceManager会转交给SlotManager,SlotManager会去检查它里边的可用的Slot有没有符合请求条件的。如果有的话,它就会向相应的TaskExecutor发起Slot申请。如果请求成功,TaskExecutor会主动的向JobMaster offer这个 Slot。
之所以要这么绕一圈,是为了避免分布式带来的不一致的问题。像刚才我们有提到,SlotManager中的Slot状态是通过心跳来进行更新的,所以存在一定的延迟。此外在整个Slot申请过程中,Slot状态也是可能发生变化的。所以最终我们需要以Slot offer,以及它的ACK来作为所有的申请的最终结果。
ResourceManager有多种不同的实现,Standalone模式下采用的ResourceManager,是StandaloneResourceManager,需要用户手动拉起Worker节点,这样就要求用户要先了解这个作业会需要多少总资源。
除此之外,还有一些会去自动申请资源的ResourceManager,包括YarnResourceManager,MesosResourceManager和KubernetesResourceManager。采用这些ResourceManager后,在不能满足的情况下,ResourceManager会在Slot的请求过程自动拉起Worker节点。
拿YarnResourceManager举个例子,JobMaster去为某个任务请求一个Slot。YarnResourceManager将这个请求交给SlotManager,SlotManager发觉没有Slot符合申请的话会告知YarnResourceManager,YarnResourceManager就会向真正的外部的YarnResourceManager去请求一个container,拿到container之后,它会启动一个TaskExecutor,当TaskExecutor起来之后,它会注册到 ResourceManager中,并去告知它可用的Slot信息。SlotManager拿到这个信息之后,就会尝试去满足当前pending的那些SlotRequest。如果能够满足,JobMaster就会去向TaskExecutor发起Slot请求,请求成功的话,TaskExecutor就会向JobMaster去offer这个Slot。这样用户就不需要在一开始去计算它作业的资源需求量是多少,而只需要保证单个Slot的大小,能够满足任务的执行了。
活动推荐:
仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接了解活动详情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506