Flink运行时之客户端提交作业图-上


客户端提交作业图

作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。本篇我们将分析客户端如何提交JobGraph给JobManager。

流处理程序提交作业图

在前面讲解Flink的核心概念的时候我们谈到了Flink利用了“惰性求值”的概念,只有当最终调用execute方法时,才会真正开始执行。因此,execute方法是我们的切入点。

DataStream API所编写的程序生成作业图之后,在提交时产生的方法调用时序图示意如下:

Flink运行时之客户端提交作业图-上

上图中的多个run方法是同名的方法重载。

从时序图中可以看到,ClusterClient对其自身抽象方法submitJob的调用是触发作业图提交的方法。随后真正的提交逻辑由JobClient实现。

ClusterClient封装了提交一个程序到远程集群的必要的功能,而StandaloneClusterClient则扩展了ClusterClient的功能,它专门针对独立的集群提供服务,这两个类都位于flink-clients模块中。JobClient则负责将用户的Job提交给JobManager,它充当了提交代理的角色,并返回表示作业执行结果的JobExecutionResult对象。

JobClient是提交所有类型的Job的统一入口,具体的提交细节我们将会在“公共的提交流程”中详细分析。

批处理程序提交作业图

利用DataSet API所编写的程序生成作业图之后,在提交时产生的方法调用的时序图如下:

Flink运行时之客户端提交作业图-上

上图中出现多个重名的run方法为同名方法重载。

从上图中可以看到,批处理程序的JobGraph跟流处理程序的JobGraph在提交之前有非常明显的不同。它引入了PlanExecutor作为Flink程序的计划执行器。而RemoteExecutor是PlanExecutor的实现,用于将程序提交给远程的Flink集群。具体的提交动作被进一步委托给ClusterClient及其实现(StandaloneClusterClient)最终同样被JobClient代理提交给JobManager。

公共的提交流程

从前面的时序图可见Flink对于不同类型的程序的提交流程最终是殊途同归的。因此,接下来我们将对公共的提交流程进行分析。一个程序的JobGraph真正被提交始于对JobClient的submitJobAndWait方法的调用。

submitJobAndWait方法用于将一个JobGraph发送到指定的JobClient actor,随后它会将该JobGraph转发给JobManager。该方法会一直阻塞,直到该作业执行完成或者感知不到JobManager的存活。如果作业被顺利执行完成则返回JobExecutionResult对象而如果JobManager产生故障,则抛出抛出JobExecutionException异常。

一个JobGraph从提交开始会经过多个对象层层递交,各个对象之间的交互关系如下图所示:

Flink运行时之客户端提交作业图-上

JobClient在其中起到了“桥接”作用,它桥接了同步的方法调用和异步的消息通信。更具体得说,JobClient可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的submitJobAndWait方法,该方法内部封装了Actor之间的异步通信(具体的通信对象是JobClientActor,它负责跟JobManager的ActorSystem的Actor对象进行通信),并以阻塞的形式返回结果。而ClusterClient只需调用JobClient的submitJobAndWait方法,而无需关注其内部是如何实现的。

通过调用JobClient的submitJobAndWait静态方法,会触发基于Akka的Actor之间的消息通信来完成后续的提交JobGraph的动作。这之间的交互示意图如下:

Flink运行时之客户端提交作业图-上

这里总共有两个ActorSystem,一个归属于JobClient,另一个归属于JobManager。在submitJobAndWait方法中,其首先会创建一个JobClientActor的ActorRef:

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然后向其发起一个SubmitJobAndWait消息,该消息将JobGraph的实例提交给JobClientActor。发起模式是ask,它表示需要一个应答消息。

Akka的消息通信模型有两种:

  1. Fire and forget:消息的生产者不期望从消息的消费者那里得到应答。这种消息会以异步的形式发送,发送方法在发送完成之后立即返回。Akka的actor使用tell方法发送这种消息。
  2. Send and receive:消息的生产者期待并将等待从消费者那里得到应答。这种消息也会以异步的形式发送,发送完成后会返回一个Future对象,该对象表示一个潜在的应答。Akka的actor使用ask方法发送这种消息,并通过Future来获取响应。

JobClient向JobClientActor发送消息的代码段如下:

Future<Object> future = Patterns.ask(jobClientActor,
                                     new JobClientMessages.SubmitJobAndWait(jobGraph),
                                     new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

该SubmitJobAndWait消息被JobClientActor接收后,最终通过调用tryToSubmitJob方法触发真正的提交动作。在tryToSubmitJob方法中,一个JobGraph的提交将会分为两步:

  1. 将用户程序相关的Jar包上传至JobManager;
  2. 给JobManager Actor发送封装JobGraph的SubmitJob消息;

随后,JobManager Actor会接收到来自JobClientActor的SubmitJob消息,进而触发submitJob方法,该方法的执行主体已经是JobManager了。submitJob包含的逻辑较为复杂,且任何一个检测或者子调用所产生的异常都可能会导致提交失败。我们列举一下该方法完成的主要任务:

  1. 向BlobLibraryCacheManager注册该Job;
  2. 构建ExecutionGraph对象;
  3. 对JobGraph中的每个顶点进行初始化;
  4. 将DAG拓扑中从source开始排序,排序后的顶点集合附加到ExecutionGraph对象;
  5. 获取检查点相关的配置,并将其设置到ExecutionGraph对象;
  6. 向ExecutionGraph注册相关的listener;
  7. 执行恢复操作或者将JobGraph信息写入SubmittedJobGraphStore以在后续用于恢复目的;
  8. 响应给客户端JobSubmitSuccess消息;
  9. 对ExecutionGraph对象进行调度执行;

如果提交流程顺利,用户程序包以及描述Job的JobGraph将会被JobManager接收,随后JobManager会对Job进行调度、部署并执行。JobClient会阻塞等待提交结果返回。在得到返回结果之后,先进行解析判断它是否是Job被成功执行后返回的结果:

if (answer instanceof JobManagerMessages.JobResultSuccess) {
    SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
    if (result != null) {
        try {
            return result.toJobExecutionResult(classLoader);
         } catch (Throwable t) {
             throw new JobExecutionException(jobGraph.getJobID(),
             "Job was successfully executed but JobExecutionResult could not be deserialized.");
        }
    } else {
         throw new JobExecutionException(jobGraph.getJobID(),
         "Job was successfully executed but result contained a null JobExecutionResult.");
    }
}

还是失败后返回的结果:

if (answer instanceof JobManagerMessages.JobResultFailure) {
    LOG.info("Job execution failed");
    SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
    if (serThrowable != null) {
        Throwable cause = serThrowable.deserializeError(classLoader);
        if (cause instanceof JobExecutionException) {
            throw (JobExecutionException) cause;
        } else {
            throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
        }
    } else {
        throw new JobExecutionException(jobGraph.getJobID(),
        "Job execution failed with null as failure cause.");
    }
} else {
    throw new JobExecutionException(jobGraph.getJobID(),
        "Unknown answer from JobManager after submitting the job: " + answer);
}

以上就是批处理作业和流处理作业共同的提交流程,这中间涉及了JobManager接收到用户提交后一系列处理,这部分的处理细节我们随后进行分析。


原文发布时间为:2017-03-31

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

上一篇:学习Python的三种境界


下一篇:Action的动态调用方法