任务调度原理
-
客户端不是运行时和程序执行的一部分,但它用于准备并发送dataflow(JobGraph)给Master(JobManager),然后,客户端断开连接或者维持连接以等待接收计算结果。而Job Manager会产生一个执行图(Dataflow Graph)
-
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。
-
Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。
-
JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。
-
TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。
注:如果一个Slot中启动多个线程,那么这几个线程类似CPU调度一样共用同一个slot
4.3.1 TaskManger与Slots
要点:
-
考虑到Slot分组,所以实际运行Job时所需的Slot总数 = 每个Slot组中的最大并行度。
eg(1,1,2,1),其中第一个归为组“red”、第二个归组“blue”、第三个和第四归组“green”,那么运行所需的slot即max(1)+max(1)+max(2,1) = 1+1+2 = 4
-
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。
-
为了控制一个worker能接收多少个task,worker通过task slot来进行控制(一个worker至少有一个task slot)。
上图这个每个子任务各自占用一个slot,可以在代码中通过算子的.slotSharingGroup("组名")
指定算子所在的Slot组名,默认每一个算子的SlotGroup和上一个算子相同,而默认的SlotGroup就是"default"。
同一个SlotGroup的算子能共享同一个slot,不同组则必须另外分配独立的Slot。
-
默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务(前提需要来自同一个Job)。这样结果是,一个slot可以保存作业的整个管道pipeline。
-
不同任务共享同一个Slot的前提:这几个任务前后顺序不同,如上图中Source和keyBy是两个不同步骤顺序的任务,所以可以在同一个Slot执行。
-
一个slot可以保存作业的整个管道的好处:
-
如果有某个slot执行完了整个任务流程,那么其他任务就可以不用继续了,这样也省去了跨slot、跨TaskManager的通信损耗(降低了并行度)
-
同时slot能够保存整个管道,使得整个任务执行健壮性更高,因为某些slot执行出异常也能有其他slot补上。
-
有些slot分配到的子任务非CPU密集型,有些则CPU密集型,如果每个slot只完成自己的子任务,将出现某些slot太闲,某些slot过忙的现象。
-
-
假设拆分的多个Source子任务放到同一个Slot,那么任务不能并行执行了=>因为多个相同步骤的子任务需要抢占的具体资源相同,比如抢占某个锁,这样就不能并行。
-
-
Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数
taskmanager.numberOfTaskSlots
进行配置。而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数
parallelism.default
进行配置。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。
需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
通过调整task slot的数量,允许用户定义subtask之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。
4.3.2 Slot和并行度
-
一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism),我们可以对单独的每个算子进行设置并行度,也可以直接用env设置全局的并行度,更可以在页面中去指定并行度。
-
最后,由于并行度是实际Task Manager处理task 的能力,而一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度,则可以得出在设置Slot时,在所有设置中的最大设置的并行度大小则就是所需要设置的Slot的数量。(如果Slot分组,则需要为每组Slot并行度最大值的和)
假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1
,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。
ps:上图最后一个因为是输出到文件,避免多个Slot(多线程)里的算子都输出到同一个文件互相覆盖等混乱问题,直接设置sink的并行度为1。