Flink中任务的并发:slot和parallelism
1 任务的执行流程
Flink有三大组件:client、JobManager、TaskManager。client将用户提交的任务解析成执行流程图,然后提交给JobManager,JobManager收到任务执行流程图后,进行并行化
,JobManager再申请TaskManager资源,将具体的任务交给TaskManager执行。
2 slot
TaskManager相当于是Worker,是具体执行任务的工作者,TaskManager在启动后会设置slot的数量,即当前TaskManager可以执行的任务数量。
那么一个Task是否只包含一个运算呢?如下代码,其中的数据源读取、keyBy、window、apply、sink是否都分别需要一个Task执行呢?
kafkaStream
.keyBy(0, 1, 2)
.window()
.apply();
如果每个操作都需要一个Task执行,在不考虑并行度的情况下,至少需要5个Task才能运行该任务。如果不同的任务在一台机器上执行,则会有线程切换的开销,如果在不同机器上执行,还会有网络开销。通过对执行流的观察,我们发现有些场景是可以进行优化的。
例如,读取数据源后,如果需要发送到机器机器上执行keyBy()操作,性能显然是不行的,最好的办法就是将这两个操作使用一个线程执行,减少了网络开销。同样的,后面的window和apply也可以放到一起,进行窗口划分后,可以直接对窗口中的数据进行处理。
因此,上面的代码(包含读取数据源)只需要两个Task即可:
- 读取数据,进行HASH(keyBy)
- 对数据划分窗口,并对窗口中的记录执行操作
这种方式称为链式优化
,将多个可以同时执行的操作放在一个Task中执行,用以减少开销(线程切换和网络),提供系统性能。
那么,哪些操作可以进行链式优化呢?也就是,哪些操作可以合并在一起呢?
总的原则就是:上下游的并行度相同,并且允许链式优化
。
简单来说,有以下策略:
- 上下游并行度一致,也就是不需要进行HASH
- 下游只有一个上游
- 上下游都在同一个slot group中
- 用户没有禁用链式优化
上述优化是通过将部分操作合并在一起执行,相当于进行了打包,减少了Task的数量。上面的代码就可以用两个Task执行,那是否需要2个slot呢?
如果使用2个slot执行,由于两个Task对资源的需求是不一样的,可能会造成两个slot的资源使用率差别很大,为了更好地使用资源,提供系统性能,可以将2个Task放到一个slot执行。因此,上面的程序,在不考虑并行度的情况下,只需要1个slot即可。
这里涉及到Flink中的另一个概念:Slot Group(共享slot组)。默认情况下,Flink允许多个Task在同一个slot中执行,即属于同一个slot组,该组名是default,因此,有可能一个slot中有整个job的所有步骤。用户也可以手动将Task放到不同的组:operator().slotSharingGroup(“group_name”) 强制将operator()的共享组设置为group_name,而不是默认的default。
3 parallelism
为了提供系统的吞吐量,Task在执行时会启动多个执行实例,Task的执行实例的个数,就是该任务的并行度。
这里引入SubTask,用以跟Task区分:
Task:Job执行流程中的任务的数量,例如,上面的代码,经过链式优化后,Task的数量为2
SubTask:Task的执行实例,例如,如果某个Task的并行度是10,那么该Task的SubTask的数量就是10
Flink中有3个地方设置并行度,优先级依次降低(下面会覆盖上面的并行度配置):
- 全局级别,设置所有Job的默认并行度:flink-config.yaml中的parallelism.default
- Job级别,设置某个Job的默认并行度:提交任务时设置命令行参数:flink -p 2
- stream级别,设置某个执行环境的默认并行度:StreamExecutionEnvironment.setParallelism(2)
- 算子级别,设置某个算子的并行度:operator().setParallelism(2)
4 小结
本文讲解的是flink在实现高吞吐的3个优化的部分:
- 链式优化:对可以放在一起执行的算子进行合并,减少Task的数量
- slot共享组:将资源密集型和非密集型的Task放在同一个slot中执行,提高slot资源的使用率
- 并行度:指定Task中包含SubTask的个数
因此,Flink中的Job执行模型如下:
Job提交给JobManager后,JobManager会根据并行度和链式关系将可以合并的算子进行合并,然后通过资源使用优化的整合考虑将哪些Task放到一个slot中执行。因此,每个Task会以线程的方式执行,多个Task会共享使用slot的资源(当前只有内存)。