0 Flink的任务链
Flink 中的每个算子都可以设置并行度,每个算子的一个并行度实例就是一个 subTask。由于 Flink 的 TaskManager 运行 Task 的时候是每个 Task 采用一个单独的线程,这会带来很多线程切换和数据交换的开销,进而影响吞吐量。
为了避免数据在网络或线程之间传输导致的开销,Flink 会在 JobGraph 阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个 Task 中执行。
用户也可以自己指定相应的链条,将相关性非常强的转换操作绑定在一起,这样能够让转换过程中上下游的 Task 在同一个 Pipeline 中执行,进而避免因为数据在网络或者线程间传输导致的开销,提高整体的吞吐量和延迟。
一般情况下,Flink 在 Map 操作中默认开启 TaskChain,以提高 Flink 作业的整体性能。
如图1,Source 和 Map 在优化后,组成一个算子链,作为一个 task 运行在一个线程上,其简图如 Condensed view 所示,并行图如 parellelized view 所示。
Flink提供了更细粒度的任务链控制方法,用户可根据需求创建任务链或禁止任务链。
1 禁用全局任务链
evn.disableOperatorChaining();
关闭全局任务链后,创建对应Operator Chain,需要用户先指定操作符,然后再调用startNewChain()
方法创建。
dataStream.keyBy(0).filter().map().startNewChain().map();
startNewChain
方法创建的链条只对调用方法的前一个操作符和后一个操作符有效,不影响其他的。比如示例中新建的链条只有map->map
,对前面的filter
无效。
禁用全局任务链会影响整体任务执行的情况,禁用前,要清楚任务执行的流程,否则可能造成非预期的结果。
2 禁用局部任务链
如果不想关闭整体算子上的链条,只是想关闭部分算子上链条绑定,可以使用disableChaining()
方法禁用当前操作符上的链条。
dataStream.keyBy(0).filter().map().disableChaining()
上述代码只会禁用map操作上的任务链,不会影响其他操作符。
3 开始新链
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
someStream.filter(...).map(...).startNewChain().map(...);
4 Set Slot Sharing Group
Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don’t have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is “default”, operations can explicitly be put into this group by calling slotSharingGroup(“default”).
someStream.filter(...).slotSharingGroup("name");
5 Operator Chains(操作链)
-
Flink出于分布式执行的目的,将operator的subtask链接在一起形成task(类似spark中的管道)。
-
每个task在一个线程中执行。
-
将operators链接成task是非常有效的优化:它可以减少线程与线程间的切换和数据缓冲的开销,并在降低延迟的同时提高整体吞吐量。
-
链接的行为可以在编程API中进行指定,详情请见代码OperatorChainTest。
-
开启操作链 和 禁用操作链的对比图(默认开启):
-
Flink默认会将多个operator进行串联,形成任务链(task chain)
-
注意: task chain 可以理解为就是 operator chain 只是不同场景下,称呼不同。
-
我们也可以禁用任务链,让每个operator形成一个task。
-
StreamExecutionEnvironment.disableOperatorChaining() 这个方法会禁用整条工作链
-
操作链其实就是类似spark的pipeline管道模式,一个task可以执行同一个窄依赖中的算子操作。
-
我们也可以细粒度的控制工作链的形成,比如调用dataStreamSource.map(...).startNewChain(),但不能使用dataStreamSource.startNewChain()
-
dataStreamSource.filter(...).map(...).startNewChain().map(...),需要注意的是,当这样写时相当于source和filter组成一条链,两个map组成一条链。
-
即在filter和map之间断开,各自形成单独的链。
-
代码:
package com.ronnie.flink.stream.test; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * 开启与禁用工作链时,输出的结果不一样。 * 当开启工作链时(默认启动),operator map1与map2 组成一个task. * 此时task运行时,对于hello,flink 这两条数据是: * 先打印 hello ---- 1 , hello->1 ---- 2 * 后打印 flink ---- 1 , flink->1 ---- 2 * 当禁用工作链时,operator map1与map2 分别在两个task中执行 * 此时task运行时,对于hello,flink 这两条数据是: * 先打印 hello ---- 1 , flink ---- 1 * 后打印 hello->1 ---- 2 , flink->1 ---- 2 * * 注:操作链类似spark的管道,一个task执行多个的算子. */ public class OperatorChainTest { public static final String[] WORDS = new String[] { "hello", "flink", "spark", "hbase" }; public static void main(String[] args) { // 设置执行环境, 类似spark中初始化sparkContext一样 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 关闭操作链.. env.disableOperatorChaining(); DataStreamSource<String> dataStreamSource = env.fromElements(WORDS); SingleOutputStreamOperator<String> pairStream = dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.err.println(value + " ---- 1"); return value + "->1"; } }).map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.err.println(value + " ---- 2"); return value + "->2"; } }); // 还可以控制更细粒度的任务链,比如指明从哪个operator开始形成一条新的链 // someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。 try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }