flink架构,任务、子任务、算子概念

数据来源:https://blog.csdn.net/zhaocuit/article/details/106588758

 

flink架构
Job Managers(master):作业管理器,负责任务安排、协调检查点、协调故障恢复等
Task Managers(worker):任务管理器,接收master的任务调度,并在本地执行相关任务
在worker节点上,会启动一个TaskManagersRunner的进程,来接收master的任务调度

一个worker包含至少一个任务槽,每个任务槽表示worker内存资源的固定子集。

例如,具有三个槽的worker会将其托管内存的1/3专用于每个槽。分配资源意味着子任务不会与其他作业的子任务竞争托管内存。

注意:此处没有发生CPU隔离。当前插槽仅将任务的托管内存分开。

多个槽共享TaskManangerRunner的JVM内存以及TCP连接和心跳信息,还会共享数据集和数据结构。

任务槽中运行的是什么?任务?子任务?

 

任务、子任务、算子
一个job的任务、子任务该怎么划分呢?如下taskAndSubTask方法的代码:

public class Test{
public static void taskAndSubTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这个方式的source并行度为1
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
//默认为电脑逻辑核数 如4
SingleOutputStreamOperator<Tuple2<String, Integer>> flatOperator = source.flatMap(new WindowWordCount.Splitter());
//并行度 继承前一个算子
KeyedStream<Tuple2<String, Integer>, Tuple> keyby = flatOperator.keyBy(0);
SingleOutputStreamOperator<Object> map = keyby.map(new MapFunction<Tuple2<String, Integer>, Object>() {
@Override
public Object map(Tuple2<String, Integer> value) throws Exception {
return null;
}
});
//并行度继承前一个算子
map.print();

//执行操作
env.execute("Window WordCount");
}
}

该job分为一个source算子(并行度1)、一个flatMap算子(并行度4)、一个keyBy算子(并行度4)、一个keyBy后map算子(并行度4)、一个sink算子(并行度4)

 

任务
  任务的划分:在一个job的执行计划(数据流图)中,从source到计算到sink,每当并行度发生变化或者数据需要分组(keyBy)时(还可通过API明确设置),就会产生任务。

  在上述代码中:source并行度和flatMap并行度不一样,因此source是一个任务,flatMap是一个任务,keyBy是一个分组算子,因此又是一个任务,而keyBy、keyBy后map算子和sink是分组后操作且并行度未改变,因此属于同一个任务。

  即该job有3个任务:source任务、flatMap任务、keyBy、keyBy后map算子和sink任务

  假设一个keyBy后map算子的并行度2,那么任务的划分如下:source任务、flatM任务、keyBy任务、keyBy后map以及sink算作一个任务

子任务
  子任务:一个任务的并行度为N,那么这个任务就拥有N个子任务。假设keyBy后map以及sink算子,他们的并行度为4,那么flink会在任务槽中运行4个keyBy后map以及sink算子

算子
  flink job中用于处理数据的一个单元,如读取数据、计算数据、保存数据等,addSource、addSink、keyBy、map等都是一个数据处理单元。

 

自定义任务
上述任务划分只是针对默认情况下的,我们可以通过代码让某个任务分解成多个任务,如方法startNewChain和disableChaining

  • startNewChain:当某个算子调用该方法时,那么该算子及其后面的且属于原来任务的算子将变成一个新的任务
  • disableChaining:当某个算子调用该方法时,那么该算子将从原来的任务中分离出来,变成一个新的任务,该算子前面的且属于原来任务的所有算子为一个任务,该算子后面的且属于原来任务的所有算子,也将变成一个任务


自定义任务示例
假设有如下流:source(并行度1)–>flatMap(并行度4)–>filter(并行度4)–>map(并行度4)–>keyby(并行度4)–>sink(并行度4)

  • 默认情况下:source是一个任务,flatMap、filter、map组成一个任务,keyby和sink组成一个任务
  • startNewChain:假设filter调用了startNewChain方法,那么任务就变成了:source是一个任务,flatMap是一个任务,filter、map组成一个任务,keyby和sink组成一个任务
  • disableChaining:假设filter调用了disableChaining方法,那么任务就变成了:source是一个任务,flatMap是一个任务,filter是一个任务,map是一个任务,keyby和sink组成一个任务

如何能看到任务子任务的划分情况呢?需要flink集群环境,然后进入flink网页控制台,将job打包上传到网页控制台,并启动任务或者点击执行计划,就可以在页面上看到任务和子任务的划分情况

 

代码逻辑和部署逻辑
上述代码中taskAndSubTask方法的代码逻辑为:

  • 一个source算子(并行度1)
  • 一个flatMap算子(并行度4)
  • 一个keyBy算子(并行度4)
  • 一个keyBy后map算子(并行度4)
  • 一个sink算子(并行度4)


上述代码中taskAndSubTask方法的部署逻辑为:

  • 一个source子任务
  • 4个flatMap子任务
  • 4个keyBy-map-sink子任务

即3个任务,9个子任务,17个算子,那么他们在槽中是如何分配的呢?

假设有2个worker,共A B C D四个槽,那么source子任务会随机分配到一个槽中,flatMap子任务将会每个槽分配一个,keyBy-map-sink子任务每个槽分配一个

子任务在槽中的分配:尽可能让每个槽都能执行一个完整的数据流,而不是将一个并行度为非1的某个子任务全部分配到一个槽里,这样才能最大化的提高性能

一个worker,包含多个槽,一个槽可以运行多个子任务,一个槽下的多个子任务共享整个槽的内存资源,多个槽的内存资源等于整个worker进程的内存资源

一个worker,就是一个进程,一个子任务就是该进程下某个任务槽中的一个线程

 

重启策略
设置重启策略:env.setRestartStrategy()

重启策略分类:

  • 不重启
  • 固定延迟重启策略:env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
  • 故障率重启策略

 

故障恢复策略

在flink-conf.yaml中配置jobmanager.execution.failover-strategy=full|region

  • full:重启Job中所有的Task,即重置整个ExecutionGraph,简单粗暴。
  • region:只重启ExecutionGraph中对应的Region包含的Task。

 

上一篇:第三次学flink


下一篇:02 metabase启动配置