生成作业图
在分析完了流处理程序生成的流图(StreamGraph)以及批处理程序生成的优化后的计划(OptimizedPlan)之后,下一步就是生成它们面向Flink运行时执行引擎的共同抽象——作业图(JobGraph)。
什么是作业图
作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。
相比流图(StreamGraph)以及批处理优化计划(OptimizedPlan),JobGraph发生了一些变化,已经不完全是“静态”的数据结构了,因为它加入了中间结果集(IntermediateDataSet)这一“动态”概念。
作业顶点(JobVertex)、中间数据集(IntermediateDataSet)、作业边(JobEdge)是组成JobGraph的基本元素。这三个对象彼此之间互为依赖:
- 一个JobVertex关联着若干个JobEdge作为输入端以及若干个IntermediateDataSet作为其生产的结果集;
- 一个IntermediateDataSet关联着一个JobVertex作为生产者以及若干个JobEdge作为消费者;
- 一个JobEdge关联着一个IntermediateDataSet可认为是源以及一个JobVertex可认为是目标消费者;
因此一个JobGraph可能的图形化表示如下:
那么JobGraph是怎么组织并存储这些元素的呢?其实JobGraph只以Map的形式存储了所有的JobVertex,键是JobVertexID:
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
至于其它的元素,通过JobVertex都可以根据关系找寻到。
JobGraph包含了如下这些属性:
- 描述作业相关的信息,比如上面的顶点、作业编号、名称等;
- 用户程序包相关的信息,比如类路径等;
- 执行的一些配置信息,比如异步快照的配置、会话超时、是否允许排队调度等;
绝大部分的实例方法都是维护这些属性的。
需要注意的是,用于迭代的反馈边(feedback edge)当前并不体现在JobGraph中,而是被内嵌在特殊的JobVertex中通过反馈信道(feedback channel)在它们之间建立关系。
流图生成作业图
这篇文章我们来分析流处理程序是如何从之前的Stream生成JobGraph的。这部分的实现位于类StreamingJobGraphGenerator中,它是流处理程序的JobGraph生成器,其核心是createJobGraph方法,它体现了生成JobGraph的主干调用,实现代码如下:
public JobGraph createJobGraph() {
//创建一个JobGraph实例对象
jobGraph = new JobGraph(streamGraph.getJobName());
//设置对task的调度模式为ALL,即所有的算子立即同时启动
jobGraph.setScheduleMode(ScheduleMode.ALL);
//对用于辅助生成JobGraph的一些实例变量进行初始化
init();
//给StreamGraph的每个StreamNode生成一个hash值,该hash值在节点不发生改变的情况下多次生成始终是一致的,
//可用来判断节点在多次提交时是否产生了变化并且该值也将作为JobVertex的ID
Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();
//基于StreamGraph从所有的source开始构建task chain
setChaining(hashes);
//给顶点设置物理边(入边)
setPhysicalEdges();
//为每个JobVertex设置slotShareGroup,同时为迭代的source/sink对设置CoLocationGroup
setSlotSharing();
//配置检查点
configureCheckpointing();
//配置重启策略
configureRestartStrategy();
//传递执行配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
return jobGraph;
}
接下来我们挨个对几个关键的方法进行分析。第一个要分析的方法是traverseStreamGraphAndGenerateHashes,它会对StreamGraph进行遍历并为每一个StreamNode都生成其哈希值,生成的哈希值将用于为每个JobVertex创建JobVertexID。方法的完整实现如下:
private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
//hash函数
final HashFunction hashFunction = Hashing.murmur3_128(0);
final Map<Integer, byte[]> hashes = new HashMap<>();
//存储访问过了的节点编号
Set<Integer> visited = new HashSet<>();
//入队即将访问的节点对象
Queue<StreamNode> remaining = new ArrayDeque<>();
//source是一个流拓扑的起点,从source开始遍历
//hash值的生成是顺序敏感的(依赖于顺序),因此首先要对source ID集合进行排序
//因为如果source的ID集合顺序不固定,那意味着多次提交包含该source ID集合的程序时可能导致不同的遍历路径,
//从而破坏了hash生成的因素
List<Integer> sources = new ArrayList<>();
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
sources.add(sourceNodeId);
}
Collections.sort(sources);
//按照排好的顺序,进行广度遍历,注意这不是树结构,而是图,因为就一个节点而言,其输入和输出都可能有多条路径
for (Integer sourceNodeId : sources) {
remaining.add(streamGraph.getStreamNode(sourceNodeId));
visited.add(sourceNodeId);
}
StreamNode currentNode;
//从即将访问的节点队列中出队首部的一个元素,没有元素了则结束
while ((currentNode = remaining.poll()) != null) {
// 给当前节点生成哈希值,并返回是否生成成功
if (generateNodeHash(currentNode, hashFunction, hashes)) {
//遍历当前节点的所有输出边
for (StreamEdge outEdge : currentNode.getOutEdges()) {
//获取输出边的目标顶点(该边另一头的顶点)
StreamNode child = outEdge.getTargetVertex();
//如果目标顶点没被访问过,则加入待访问队列和易访问元素集合
if (!visited.contains(child.getId())) {
remaining.add(child);
visited.add(child.getId());
}
}
}
else {
//如果对当前节点的哈希值生成操作失败,则将其从已访问的节点中移除,等待后续再次访问
visited.remove(currentNode.getId());
}
}
return hashes;
}
在上面代码段中调用的generateNodeHash方法,其实现逻辑大致分为两大部分,这两部分对应了生成哈希的两种方式:
- 根据StreamTransformation的编号进行计算
- 根据一些因素来综合计算
第二种方式对应的因素有如下三种:
- 节点相关的属性(ID、并行度、UDF的类名)
- 链接在一起的输出节点相关的属性
- 输入节点的哈希值
这里值得注意的是节点相关的ID属性,它并不是StreamTransformation的ID,因为StreamTransformation的ID是一个静态计数器,它可能会导致逻辑相同的Job最终生成的哈希值却不同。考虑下面的示例:
//program 1
DataStream<String> s1 = ...; //s1.ID = 1
DataStream<String> s2 = ...; //s2.ID = 2
s1.union(s2).print();
//program 2
DataStream<String> s2 = ...; //s2.ID = 1
DataStream<String> s1 = ...; //s1.ID = 2
s1.union(s2).print();
对于上面示例代码中的两个语义等价的程序,当借助StreamTransformation的ID属性来生成哈希值时会出现不一致。因此,Flink所使用的ID值其实是已完成哈希值计算的节点数目。这样就不会出现上述因为source定义的顺序不同而导致语义上等价的程序产生不一致哈希值的情况。最终traverseStreamGraphAndGenerateHashes方法将会为所有的StreamNode生成对应的哈希值。
为了更高效得执行,Flink对DAG在调度上进行了优化,该优化称之为算子链接(operator chain)。它允许某些算子可以“链接”在一起,在调度时这些被链接到一起的算子会被视为一个任务(Task)。而在执行时,一个Task会被并行化成若干个subTask实例进行执行,一个subTask对应一个执行线程。算子链接的示意图如下:
这种优化能减少线程之间的切换和跨节点的数据交换从而在减少时延的同时提升吞吐量。
当算子互相链接之后,原先存在于互相链接的算子之间的边就只是逻辑上存在的。而被链接的算子整体跟其他无法与其链接的算子之间的边才是真正的物理边。另外,为了方便源码解读,需要对“链接”和“连接”加以区分。在当前的上下文中,“链接”指的是“算子链”的形成方式,而“连接”指的是在算子之间建立关系。
接下来我们就来分析,将算子链接起来的setChaining方法。setChaining会沿着source生成算子链(但不要被其方法名误导,它其实还完成了很多额外的工作,比如创建JobVertex)。
setChaining会遍历StreamGraph中的sourceID集合。为每个source调用createChain方法,该方法以当前source为起点向后遍历并创建算子链。createChain方法会收集当前节点所连接的物理边,并为链接头节点与物理边下游的算子建立连接关系。
/**
* @param startNodeId : 起始节点编号
* @param currentNodeId : 当前遍历节点编号
* @param hashes : 节点编号与hash值映射表
* @return 遍历过的边集合
*/
private List<StreamEdge> createChain(Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
int chainIndex) {
//如果起始节点没有被构建过,才进入分支;否则直接返回一个空List(递归结束条件)
if (!builtVertices.contains(startNodeId)) {
//存储遍历过的边,该对象被作为最终结果返回
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
//存储可以被链接的出边
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
//存储不可被链接的出边
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
//遍历当前节点的每个出边
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
//如果该出边是可被链接的,则加入可被链接的出边集合,否则加入不可被链接的出边集合
if (isChainable(outEdge)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//遍历每个可被链接的出边,然后进行递归遍历
for (StreamEdge chainable : chainableOutputs) {
//起始节点不变,以该可被链接的出边的目标节点作为“当前”节点进行递归遍历并将遍历过的边集合加入到当前集合中
//这里值得注意的是所有可链接的边本身并不会被加入这个集合!
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, chainIndex + 1));
}
//遍历不可链接的出边,同样进行递归遍历
for (StreamEdge nonChainable : nonChainableOutputs) {
//将当前不可链接的出边加入到遍历过的边集合中
transitiveOutEdges.add(nonChainable);
//同样进行递归遍历,不过这里的起始节点和当前节点都被设置为该边的目标节点
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, 0);
}
//为当前节点创建链接的完整名称,如果当前节点没有可链接的边,那么其名称将直接是当前节点的operator名称
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
//创建流配置对象,流配置对象针对单个作业顶点而言,包含了顶点相关的所有信息。
//当创建配置对象的时候,如果当前节点即为起始节点(链接头),会先为该节点创建JobVertex对象
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes)
: new StreamConfig(new Configuration());
//然后为当前节点初始化流配置对象里的一系列属性
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
//如果当前节点是起始节点(chain头节点)
if (currentNodeId.equals(startNodeId)) {
//设置该节点是chain的开始
config.setChainStart();
config.setChainIndex(0);
//设置不可链接的出边
config.setOutEdgesInOrder(transitiveOutEdges);
//设置所有出边
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
//遍历当前节点的所有不可链接的出边集合
for (StreamEdge edge : transitiveOutEdges) {
//给当前节点到不可链接的出边之间建立连接
//通过出边找到其下游流节点,根据边的分区器类型,构建下游流节点跟输入端上游流节点(也即起始节点)
//的连接关系。在这个构建的过程中也就创建了IntermediateDataSet及JobEdge并跟当前节点的JobVertex
//三者建立了关联关系
connect(startNodeId, edge);
}
//将当前节点的所有子节点的流配置对象进行序列化
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else { //如果当前节点是chain中的节点,而非chain的头节点
Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);
if (chainedConfs == null) {
chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
}
config.setChainIndex(chainIndex);
//将当前节点的流配置对象加入到chain头节点点相关的配置中
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
//返回所有不可链接的边
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
上面的代码段中会先将当前节点的出边按照它们是否是可被链接进行分类,isChainable方法包含了判断逻辑,一个出边如果是可链接的,它需要满足的条件如下:
return downStreamVertex.getInEdges().size() == 1 //如果边的下游流节点的入边数目为1(也即其为单输入算子)
&& outOperator != null //边的下游节点对应的算子不为null
&& headOperator != null //边的上游节点对应的算子不为null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //边两端节点有相同的槽共享组名称
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS //边下游算子的链接策略为ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)//上游算子的链接策略为HEAD或者ALWAYS
&& (edge.getPartitioner() instanceof ForwardPartitioner) //边的分区器类型是ForwardPartitioner
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() //上下游节点的并行度相等
&& streamGraph.isChainingEnabled(); //当前的streamGraph允许链接的
在createChain中会调用createJobVertex为链接头节点或者无法链接的节点创建JobVertex对象,创建完成之后会将它加入JobGraph并为当前的这个JobVertex创建流配置对象(StreamConfig)。
对于无法链接的物理边,Flink会将链头(chain header)与这些物理边(以及物理边所连接着的目标算子)进行连接(代码段中的connect方法),连接的过程也是创建JobEdge与IntermediateDataSet并跟它们建立关系的过程。
现在让我们回到createJobGraph方法的上下文中来,在setChaining方法调用中找出了物理出边以及从源到目的节点之间建立了连接。接着,会调用setPhysicalEdges从目标节点向源节点之间建立入边的连接。
接下来,为相关的节点设置槽共享组(SlotSharingGroup)以及同位组(CoLocationGroup),这两种机制都用于限制算子的部署。其中,CoLocationGroup主要用于迭代算子的执行。
当用户的Flink程序配置了检查点信息,那么需要将检查点相关的配置加入到JobGraph中去,这部分逻辑通过方法configureCheckpointing来完成,它将JobVertex划分成三类:
- triggerVertices:存储接收“触发检查点”消息的JobVertex集合,当前只收集source顶点;
- ackVertices:收集需要应答检查点消息的JobVertex集合,当前收集所有的JobVertex;
- commitVertices:存储接收“提交检查点”消息的JobVertex集合,当前收集所有JobVertex;
这些信息都被封装在JobSnapshottingSettings对象中,然后被存储到JobGraph。
基本生成JobGraph的主要步骤就是这些。接下来,我们将分析批处理程序在优化器生成的OptimizedPlan的基础之上如何生成的JobGraph。
优化后的计划生成作业图
分析完了流图如何生成作业图,下面我们来分析批处理程序经过优化后的计划如何生成作业图。其核心代码位于flink-clients模块下的ClusterClient类中的getJobGraph方法中:
JobGraphGenerator gen = new JobGraphGenerator(this.config);
job = gen.compileJobGraph((OptimizedPlan) optPlan);
这里的JobGraphGenerator位于optimizer模块中(注意跟流处理中生成JobGraph的StreamingJobGraphGenerator进行区别),它用于将优化器优化后的OptimizedPlan编译成JobGraph。编译的过程不作任何决策与假设,也就是说作业最终如何被执行早已被优化器确定,而编译也是在此基础上做确定性的映射。
JobGraphGenerator实现了Visitor接口,因此它是一个遍历器,遍历的对象是计划节点(PlanNode)。
关于遍历器、计划节点等更多的细节请参考“优化器”相关的文章。
compileJobGraph方法在内部调用OptimizedPlan的accept方法遍历它,而遍历访问器就是JobGraphGenerator自身:
program.accept(this);
在OptimizedPlan中,accept会挨个在每个sink上调用accept:
public void accept(Visitor<PlanNode> visitor) {
for (SinkPlanNode node : this.dataSinks) {
node.accept(visitor);
}
}
批处理中的计划是以sink作为起始点,然后通过遍历访问器逆向遍历直至source。
从sink开始的逆向遍历是符合特定的模式的:
public void accept(Visitor<PlanNode> visitor) {
//前置遍历,如果返回值为true,才会进行更进一步的后续操作
if (visitor.preVisit(this)) {
//获取到当前sink的输入端继续遍历,该调用会引发递归调用
this.input.getSource().accept(visitor);
//获得所有的广播输入通道,对所有的广播输入通道源进行遍历
for (Channel broadcastInput : getBroadcastInputs()) {
broadcastInput.getSource().accept(visitor);
}
//进行后置遍历
visitor.postVisit(this);
}
}
先来分析一下preVisit方法,它是遍历时的“前进”方法,它会对要遍历的PlanNode的具体类型进行枚举推断,针对不同的类型为其创建对应的JobVertex对象,接着为JobVertex对象设置相关属性,最后将其加入到一个公共的PlanNode与JobVertex的映射字典中去。
接下来是postVisit方法,它可以看成是遍历时的“后退”方法,当在某个节点上调用到postVisit方法时,表明该节点的前任(从正常的source往sink方向)都已经遍历完成。因此该方法在这里用来将当前节点与其前任建立连接。
postVisit方法同样会判断节点的类型,特殊节点特殊处理。例如,如果节点的类型是IterationPlanNode,那么它将立即遍历迭代路径中的节点。这里有可能存在递归遍历,所以使用了一个“栈”结构来保存当前节点。
if (this.currentIteration != null) {
this.iterationStack.add(this.currentIteration);
}
this.currentIteration = (IterationPlanNode) node;
this.currentIteration.acceptForStepFunction(this);
if (this.iterationStack.isEmpty()) {
this.currentIteration = null;
} else {
this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
}
回到compileJobGraph方法的上下文中,在对OptimizedPlan进行遍历之后,会对收集到的迭代节点进行处理。通过遍历迭代描述符(IterationDescriptor)并判断其代表的节点属于哪种迭代类型来进行特定的处理:
for (IterationDescriptor iteration : this.iterations.values()) {
if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
finalizeBulkIteration(iteration);
} else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
finalizeWorksetIteration(iteration);
} else {
throw new CompilerException();
}
}
到此,遍历工作已经完成。下面会把链接任务的配置写入其父节点(也就是容器节点)的配置中。接着新建JobGraph对象并进行一系列设置,比如添加JobVertex、为JobVertex设置SlotSharingGroup等。然后将之前注册的缓存文件加入到Job的配置中,释放相关资源后返回JobGraph对象。
原文发布时间为:2017-02-21
本文作者:vinoYang
本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。