数据来源:https://blog.csdn.net/zhaocuit/article/details/106588758
flink架构
Job Managers(master):作业管理器,负责任务安排、协调检查点、协调故障恢复等
Task Managers(worker):任务管理器,接收master的任务调度,并在本地执行相关任务
在worker节点上,会启动一个TaskManagersRunner的进程,来接收master的任务调度
一个worker包含至少一个任务槽,每个任务槽表示worker内存资源的固定子集。
例如,具有三个槽的worker会将其托管内存的1/3专用于每个槽。分配资源意味着子任务不会与其他作业的子任务竞争托管内存。
注意:此处没有发生CPU隔离。当前插槽仅将任务的托管内存分开。
多个槽共享TaskManangerRunner的JVM内存以及TCP连接和心跳信息,还会共享数据集和数据结构。
任务槽中运行的是什么?任务?子任务?
任务、子任务、算子
一个job的任务、子任务该怎么划分呢?如下taskAndSubTask方法的代码:
public class Test{
public static void taskAndSubTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这个方式的source并行度为1
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
//默认为电脑逻辑核数 如4
SingleOutputStreamOperator<Tuple2<String, Integer>> flatOperator = source.flatMap(new WindowWordCount.Splitter());
//并行度 继承前一个算子
KeyedStream<Tuple2<String, Integer>, Tuple> keyby = flatOperator.keyBy(0);
SingleOutputStreamOperator<Object> map = keyby.map(new MapFunction<Tuple2<String, Integer>, Object>() {
@Override
public Object map(Tuple2<String, Integer> value) throws Exception {
return null;
}
});
//并行度继承前一个算子
map.print();
//执行操作
env.execute("Window WordCount");
}
}
该job分为一个source算子(并行度1)、一个flatMap算子(并行度4)、一个keyBy算子(并行度4)、一个keyBy后map算子(并行度4)、一个sink算子(并行度4)
任务
任务的划分:在一个job的执行计划(数据流图)中,从source到计算到sink,每当并行度发生变化或者数据需要分组(keyBy)时(还可通过API明确设置),就会产生任务。
在上述代码中:source并行度和flatMap并行度不一样,因此source是一个任务,flatMap是一个任务,keyBy是一个分组算子,因此又是一个任务,而keyBy、keyBy后map算子和sink是分组后操作且并行度未改变,因此属于同一个任务。
即该job有3个任务:source任务、flatMap任务、keyBy、keyBy后map算子和sink任务
假设一个keyBy后map算子的并行度2,那么任务的划分如下:source任务、flatM任务、keyBy任务、keyBy后map以及sink算作一个任务
子任务
子任务:一个任务的并行度为N,那么这个任务就拥有N个子任务。假设keyBy后map以及sink算子,他们的并行度为4,那么flink会在任务槽中运行4个keyBy后map以及sink算子
算子
flink job中用于处理数据的一个单元,如读取数据、计算数据、保存数据等,addSource、addSink、keyBy、map等都是一个数据处理单元。
自定义任务
上述任务划分只是针对默认情况下的,我们可以通过代码让某个任务分解成多个任务,如方法startNewChain和disableChaining
- startNewChain:当某个算子调用该方法时,那么该算子及其后面的且属于原来任务的算子将变成一个新的任务
- disableChaining:当某个算子调用该方法时,那么该算子将从原来的任务中分离出来,变成一个新的任务,该算子前面的且属于原来任务的所有算子为一个任务,该算子后面的且属于原来任务的所有算子,也将变成一个任务
自定义任务示例
假设有如下流:source(并行度1)–>flatMap(并行度4)–>filter(并行度4)–>map(并行度4)–>keyby(并行度4)–>sink(并行度4)
- 默认情况下:source是一个任务,flatMap、filter、map组成一个任务,keyby和sink组成一个任务
- startNewChain:假设filter调用了startNewChain方法,那么任务就变成了:source是一个任务,flatMap是一个任务,filter、map组成一个任务,keyby和sink组成一个任务
- disableChaining:假设filter调用了disableChaining方法,那么任务就变成了:source是一个任务,flatMap是一个任务,filter是一个任务,map是一个任务,keyby和sink组成一个任务
如何能看到任务子任务的划分情况呢?需要flink集群环境,然后进入flink网页控制台,将job打包上传到网页控制台,并启动任务或者点击执行计划,就可以在页面上看到任务和子任务的划分情况
代码逻辑和部署逻辑
上述代码中taskAndSubTask方法的代码逻辑为:
- 一个source算子(并行度1)
- 一个flatMap算子(并行度4)
- 一个keyBy算子(并行度4)
- 一个keyBy后map算子(并行度4)
- 一个sink算子(并行度4)
上述代码中taskAndSubTask方法的部署逻辑为:
- 一个source子任务
- 4个flatMap子任务
- 4个keyBy-map-sink子任务
即3个任务,9个子任务,17个算子,那么他们在槽中是如何分配的呢?
假设有2个worker,共A B C D四个槽,那么source子任务会随机分配到一个槽中,flatMap子任务将会每个槽分配一个,keyBy-map-sink子任务每个槽分配一个
子任务在槽中的分配:尽可能让每个槽都能执行一个完整的数据流,而不是将一个并行度为非1的某个子任务全部分配到一个槽里,这样才能最大化的提高性能
一个worker,包含多个槽,一个槽可以运行多个子任务,一个槽下的多个子任务共享整个槽的内存资源,多个槽的内存资源等于整个worker进程的内存资源
一个worker,就是一个进程,一个子任务就是该进程下某个任务槽中的一个线程
重启策略
设置重启策略:env.setRestartStrategy()
重启策略分类:
- 不重启
- 固定延迟重启策略:env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
- 故障率重启策略
故障恢复策略
在flink-conf.yaml中配置jobmanager.execution.failover-strategy=full|region
- full:重启Job中所有的Task,即重置整个ExecutionGraph,简单粗暴。
- region:只重启ExecutionGraph中对应的Region包含的Task。