JobManager 的 submitJob 逻辑：
/**
 * Submits a job to the job manager. The job is registered at the libraryCacheManager which
 * creates the job's class loader. The job graph is appended to the corresponding execution
 * graph and the execution vertices are queued for scheduling.
 *
 * The method runs in two phases. First, the ExecutionGraph is prepared synchronously; any
 * failure in this phase unregisters the job, fails the graph (if one exists) and notifies the
 * clients with a JobResultFailure. Second, a future restores checkpoint/savepoint state or
 * persists the job graph, notifies the clients of the successful submission and only then
 * schedules the job for execution (if this job manager holds leadership).
 *
 * @param jobGraph representing the Flink job
 * @param jobInfo the job info
 * @param isRecovery Flag indicating whether this is a recovery or initial submission
 */
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
  if (jobGraph == null) {
    jobInfo.notifyClients(
      decorateMessage(JobResultFailure(
        new SerializedThrowable(
          new JobSubmissionException(null, "JobGraph must not be null.")))))
  }
  else {
    val jobId = jobGraph.getJobID
    val jobName = jobGraph.getName
    // Assigned inside the try block so that the catch block can fail a partially built graph.
    var executionGraph: ExecutionGraph = null

    try {
      // Important: We need to make sure that the library registration is the first action,
      // because this makes sure that the uploaded jar files are removed in case of an
      // unsuccessful submission.
      libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
        jobGraph.getClasspaths)

      // Class loader for the user jars registered above.
      val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)

      // Restart strategy: prefer the one carried by the job's execution config and fall back
      // to the job manager's default factory when none is configured.
      val restartStrategy =
        Option(jobGraph.getSerializedExecutionConfig()
          .deserializeValue(userCodeLoader)
          .getRestartStrategy())
          .map(RestartStrategyFactory.createRestartStrategy)
          .filter(p => p != null)
          .getOrElse(restartStrategyFactory.createRestartStrategy())

      // Metric group of this job within the job manager metric group (if one is available).
      val jobMetrics = jobManagerMetricGroup match {
        case Some(group) =>
          group.addJob(jobGraph) match {
            case (jobGroup: Any) => jobGroup
            case null => new UnregisteredMetricsGroup()
          }
        case None =>
          new UnregisteredMetricsGroup()
      }

      // Total number of slots currently known to the scheduler.
      val numSlots = scheduler.getTotalNumberOfSlots()

      // see if there already exists an ExecutionGraph for the corresponding job ID
      val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
        case Some((graph, currentJobInfo)) =>
          executionGraph = graph
          currentJobInfo.setLastActive()
          false
        case None =>
          true
      }

      // Build the ExecutionGraph (a previously registered graph, if any, is passed in).
      executionGraph = ExecutionGraphBuilder.buildGraph(
        executionGraph,
        jobGraph,
        flinkConfiguration,
        futureExecutor,
        ioExecutor,
        userCodeLoader,
        checkpointRecoveryFactory,
        Time.of(timeout.length, timeout.unit),
        restartStrategy,
        jobMetrics,
        numSlots,
        log.logger)

      // Only a graph for a previously unknown job ID is registered in currentJobs.
      if (registerNewGraph) {
        currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
      }

      // get notified about job status changes (the job manager itself is a listener)
      executionGraph.registerJobStatusListener(
        new StatusListenerMessenger(self, leaderSessionID.orNull))

      // Register the clients that asked for state changes as listeners as well.
      jobInfo.clients foreach {
        // the sender wants to be notified about state changes
        case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
          val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
          executionGraph.registerExecutionListener(listener)
          executionGraph.registerJobStatusListener(listener)
        case _ => // do nothing
      }
    } catch {
      // Preparation failed: clean up and report the failure to the clients.
      case t: Throwable =>
        log.error(s"Failed to submit job $jobId ($jobName)", t)

        libraryCacheManager.unregisterJob(jobId)
        currentJobs.remove(jobId)

        if (executionGraph != null) {
          executionGraph.fail(t)
        }

        // Wrap everything that is not already a JobExecutionException.
        val rt: Throwable = t match {
          case e: JobExecutionException => e
          case _ =>
            new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
        }

        jobInfo.notifyClients(
          decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
        return
    }

    // Everything above prepared the ExecutionGraph; everything below runs asynchronously.
    // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
    // because it is a blocking operation
    future {
      try {
        if (isRecovery) {
          // this is a recovery of a master failure (this master takes over)
          executionGraph.restoreLatestCheckpointedState(false, false)
        }
        else {
          // load a savepoint only if this is not starting from a newer checkpoint
          // as part of an master failure recovery
          val savepointSettings = jobGraph.getSavepointRestoreSettings
          if (savepointSettings.restoreSavepoint()) {
            try {
              val savepointPath = savepointSettings.getRestorePath()
              val allowNonRestored = savepointSettings.allowNonRestoredState()

              log.info(s"Starting job from savepoint '$savepointPath'" +
                (if (allowNonRestored) " (allowing non restored state)" else "") + ".")

              // load the savepoint as a checkpoint into the system
              val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
                jobId,
                executionGraph.getAllVertices,
                savepointPath,
                executionGraph.getUserClassLoader,
                allowNonRestored)

              executionGraph.getCheckpointCoordinator.getCheckpointStore
                .addCheckpoint(savepoint)

              // Reset the checkpoint ID counter
              val nextCheckpointId: Long = savepoint.getCheckpointID + 1
              log.info(s"Reset the checkpoint ID to $nextCheckpointId")
              executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
                .setCount(nextCheckpointId)

              executionGraph.restoreLatestCheckpointedState(true, allowNonRestored)
            } catch {
              case e: Exception =>
                jobInfo.notifyClients(
                  decorateMessage(JobResultFailure(new SerializedThrowable(e))))
                throw new SuppressRestartsException(e)
            }
          }

          try {
            // Persist the job graph in the SubmittedJobGraphStore.
            submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
          } catch {
            case t: Throwable =>
              // Don't restart the execution if this fails. Otherwise, the
              // job graph will skip ZooKeeper in case of HA.
              jobInfo.notifyClients(
                decorateMessage(JobResultFailure(new SerializedThrowable(t))))
              throw new SuppressRestartsException(t)
          }
        }

        // The clients are told about the successful submission BEFORE the job is scheduled.
        jobInfo.notifyClients(
          decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))

        if (leaderElectionService.hasLeadership) {
          // There is a small chance that multiple job managers schedule the same job after if
          // they try to recover at the same time. This will eventually be noticed, but can not be
          // ruled out from the beginning.

          // NOTE: Scheduling the job for execution is a separate action from the job submission.
          // The success of submitting the job must be independent from the success of scheduling
          // the job.
          log.info(s"Scheduling job $jobId ($jobName).")

          executionGraph.scheduleForExecution(scheduler)
        } else {
          // Remove the job graph. Otherwise it will be lingering around and possibly removed from
          // ZooKeeper by this JM.
          self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))

          log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
            "this. I am not scheduling the job for execution.")
        }
      } catch {
        // Any failure in the asynchronous part fails the graph; a failure of fail() itself is
        // only logged.
        case t: Throwable => try {
          executionGraph.fail(t)
        } catch {
          case tt: Throwable =>
            log.error("Error while marking ExecutionGraph as failed.", tt)
        }
      }
    }(context.dispatcher)
  }
}
可以看到，在对 executionGraph 进行调度之前，就已经通知用户提交成功。
当 job 发生问题时，会调用到 tryRestartOrFail：
/**
 * Tries to restart the job and, if a restart is not permitted, transitions it to
 * {@code FAILED}.
 *
 * <p>A restart is permitted only if the failure cause is not a
 * {@code SuppressRestartsException} and the restart strategy reports that it can restart.
 * The decision and the state transition are made while holding {@code progressLock}.
 *
 * @return {@code true} if the job was handed to the restart strategy or was moved to
 *         {@code FAILED}; {@code false} if the job is in neither {@code FAILING} nor
 *         {@code RESTARTING}, or if the state transition did not succeed
 */
private boolean tryRestartOrFail() {
    JobStatus currentState = state;

    if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
        synchronized (progressLock) {
            final boolean isFailureCauseAllowingRestart =
                !(failureCause instanceof SuppressRestartsException);
            // does the restart strategy still allow a restart?
            final boolean isRestartStrategyAllowingRestart = restartStrategy.canRestart();
            boolean isRestartable =
                isFailureCauseAllowingRestart && isRestartStrategyAllowingRestart;

            if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
                restartStrategy.restart(this);

                return true;
            } else if (!isRestartable
                    && transitionState(currentState, JobStatus.FAILED, failureCause)) {
                // restarting is not allowed, so the job goes to FAILED
                final List<String> reasonsForNoRestart = new ArrayList<>(2);
                if (!isFailureCauseAllowingRestart) {
                    reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
                }
                if (!isRestartStrategyAllowingRestart) {
                    reasonsForNoRestart.add("the restart strategy prevented it");
                }

                LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
                    StringUtils.join(reasonsForNoRestart, " and "), failureCause);
                postRunCleanup();

                return true;
            } else {
                // we must have changed the state concurrently, thus we cannot complete this operation
                return false;
            }
        }
    } else {
        // this operation is only allowed in the state FAILING or RESTARTING
        return false;
    }
}
有两处会调用到tryRestartOrFail
1. ExecutionGraph.jobVertexInFinalState
/**
 * Records that one more job vertex has finished (excerpt; the end of the method is not shown).
 *
 * While holding progressLock, increments numFinishedJobVertices. Once the counter equals the
 * number of vertices in verticesInCreationOrder, the job is moved to its final state in a retry
 * loop: RUNNING goes to FINISHED, CANCELLING goes to CANCELED, and FAILING is delegated to
 * tryRestartOrFail().
 *
 * Throws IllegalStateException if all vertices were already counted as finished.
 */
void jobVertexInFinalState() {
synchronized (progressLock) {
if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
} numFinishedJobVertices++; if (numFinishedJobVertices == verticesInCreationOrder.size()) { // all vertices have finished: we are done, transition to the final state
JobStatus current;
while (true) {
current = this.state; if (current == JobStatus.RUNNING) {
if (transitionState(current, JobStatus.FINISHED)) {
postRunCleanup();
break;
}
}
else if (current == JobStatus.CANCELLING) {
if (transitionState(current, JobStatus.CANCELED)) {
postRunCleanup();
break;
}
}
else if (current == JobStatus.FAILING) {
if (tryRestartOrFail()) { // the job is FAILING: delegate to tryRestartOrFail
break;
}
// concurrent job status change, let's check again
}
2. 显式地调用 ExecutionGraph.fail
} else if (current == JobStatus.RESTARTING) {
this.failureCause = t; if (tryRestartOrFail()) {
return;
}
// concurrent job status change, let's check again
}
上面调用到restartStrategy.restart(this);
restartStrategy有很多种,我们先看看
FixedDelayRestartStrategy
/**
 * Counts one more restart attempt and hands the delayed restart of the given execution graph
 * to the graph's future executor, so the restart itself runs asynchronously.
 *
 * @param executionGraph the execution graph to restart
 */
@Override
public void restart(final ExecutionGraph executionGraph) {
    currentRestartAttempt += 1;

    FlinkFuture.supplyAsync(
        ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts),
        executionGraph.getFutureExecutor());
}
异步的调用,ExecutionGraphRestarter.restartWithDelay
最终调用到
executionGraph.restart();
/**
 * Resets the execution graph and schedules it for execution again.
 *
 * <p>While holding {@code progressLock}: clears the current executions, resets the
 * constraints of every distinct co-location group once, resets each job vertex for a new
 * execution, zeroes all state timestamps except the one for {@code RESTARTING}, resets the
 * finished-vertex counter, transitions {@code RESTARTING -> CREATED} and, if a checkpoint
 * coordinator exists, restores the latest checkpointed state. Scheduling happens after the
 * lock is released. Any {@code Throwable} is logged and passed to {@code fail(Throwable)}.
 */
public void restart() {
    try {
        synchronized (progressLock) {
            this.currentExecutions.clear();

            // co-location groups whose constraints were already reset in this pass
            final Collection<CoLocationGroup> resetGroups = new HashSet<>();

            for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
                final CoLocationGroup group = jobVertex.getCoLocationGroup();
                final boolean needsReset = group != null && !resetGroups.contains(group);
                if (needsReset) {
                    group.resetConstraints();
                    resetGroups.add(group);
                }

                jobVertex.resetForNewExecution();
            }

            // Only clear the non restarting state in order to preserve when the job was
            // restarted. This is needed for the restarting time gauge
            final int restartingIdx = JobStatus.RESTARTING.ordinal();
            for (int idx = 0; idx < stateTimestamps.length; idx++) {
                if (idx == restartingIdx) {
                    continue;
                }
                stateTimestamps[idx] = 0;
            }

            numFinishedJobVertices = 0;
            transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

            // if we have checkpointed state, reload it into the executions
            if (checkpointCoordinator != null) {
                checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
            }
        }

        // hand the reset graph back to the scheduler
        scheduleForExecution(slotProvider);
    } catch (Throwable t) {
        LOG.warn("Failed to restart the job.", t);
        fail(t);
    }
}
关于重启策略，
参考 https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html
If checkpointing is not enabled, the "no restart" strategy is used. If checkpointing is activated and the restart strategy has not been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.
StreamingJobGraphGenerator
/**
 * Excerpt of the checkpointing setup (the rest of the method is not shown).
 *
 * Reads the checkpoint interval from the stream graph's CheckpointConfig. When the interval is
 * positive and the execution config has no restart strategy, a fixed-delay restart strategy
 * with Integer.MAX_VALUE attempts and DEFAULT_RESTART_DELAY is set on the execution config.
 */
private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig(); long interval = cfg.getCheckpointInterval();
if (interval > 0) {
// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
// if the user enabled checkpointing, the default number of exec retries is infinite.
streamGraph.getExecutionConfig().setRestartStrategy(
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
}
}
当打开 checkpoint 的时候，默认使用 fixedDelayRestart，并且最多重启 Integer.MAX_VALUE 次