Flink中的计算资源
首先理解Flink中的计算资源的核心概念,比如Slot、Chain、Task等,这有助于我们快速定位生产中的问题。
Task Slot
Flink都是以集群在运行,在运行的过程中包含两类进程,其中之一就是TaskManager。 在Flink集群中,一个TaskManager就是一个JVM进程,并且会用独立的现场来执行task,为了控制一个TaskManager能接受多少个task,Flink提出了Task Slot的概念。
Slot共享
默认情况下,Flink还允许同一个Job的子任务共享slot。因为在一个Flink任务中,有很多算子,这些算子的计算压力各不相同,比如简单的map和filter算子所需要的资源不多,但是有些算子比如window、group by则需要更多的计算资源才能满足计算所需。这时那些资源需求大的算子就可以共用其他slot,提高整个集群的资源利用率。
Operator Chain
此外Flink自身会把不同的算子的task连接在一起组成一个新的task。这么做时因为Flink本身提供了非常有效的任务优化手段,因为task是在同一个线程中执行,那么可以有效减少线程间上下文的切换,并且减少序列化/反序列化带来的资源消耗,从而在整体上提高我们任务的吞吐量。
并行度
Flink使用并行度来定义某一个算子被切分成多少个子任务。Flink代码会被转换成逻辑是图,在实际运行时根据用户的并行度设置会被转换成对应的子任务进行执行。
源码解析
Flink Job在执行中会通过SlotProvider向ResourceManager申请资源,RM负责协调TaskManager,满足JobManager的资源请求。
整体的类图如上所述,SlotProvider中的allocateSlot方法负责向SlotPool申请可用的slot资源,通过returnLogicSlot将空闲的slot释放至SlotPool。 在整个Flink的资源管理的类中,核心的类包括Scheduler、SlotPool、JobMaster。他们之间的交互流程主要:Scheduler调度器向SlotPool资源池申请和释放slot;如果SlotPool不能满足需求,那么会向ResourceManager发起申请;获取到的资源通过JobMaster分配给SlotPool。
如何设置并行度
Flink本身支持不同级别来设置我们任务并行度的方法,他们分别是:
-
算子级别
-
环境级别
-
客户端级别
-
集群配置级别
算子级别
在编写Flink程序时,可以在代码中显示的制定不同算子的并行度。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
?
DataStream<String> text = ...
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.setParallelism(10)
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(1);
?
wordCounts.print();
env.execute("word count");
如上,可以通过显示的调用setParallelism()方法来显示的指定每个算子的并行度配置。 在实际生产中,推荐在算子级别显示指定各自的并行度,方便进行显示和精确的资源控制。
环境级别
环境级别的并行度设置指的是可以通过调用env.setParallelism()方法来这是整个任务的并行度:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
...
一旦设置了这个参数,表明任务中的所有算子的并行度都是指定的值,生产环境中不推荐。
客户端级别
可以在使用命令提交Flink Job的时候指定并行度,当任务执行时发现代码中没有设置并行度,便会采用提交命令时的参数。 通过 -p 命令来指定提交任务时候的并行度:
./bin/flink run -p 5 ../wordCount-java*.jar
集群配置级别
在flink-conf.yaml文件中有一个参数parallelism.default。该参数会在用户不设置任何其他的并行度配置时生效:
parallelism.default:1
需要特别指出的是,设置并行度的优先度依次是: