Azkaban Single Flow Execution: A Source Code Walkthrough

The Azkaban framework wraps each Flow in a FlowRunner and submits the FlowRunner to a thread pool to run asynchronously. During execution the job status is changed several times and persisted to the metadata database. This article walks through that whole process from the source code:
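
Before diving into the source, here is a minimal, hypothetical sketch of that overall pattern (the class names below are invented for illustration and are not Azkaban's real classes): a runner is submitted to a thread pool, and each status change is persisted as it happens.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified illustration of the pattern described above;
// these class names are made up and are NOT Azkaban's real classes.
public class MiniFlowDemo {

  enum Status { READY, RUNNING, SUCCEEDED, FAILED }

  static class MiniJobRunner implements Runnable {
    private volatile Status status = Status.READY;

    @Override
    public void run() {
      persist(Status.RUNNING);       // status change would be persisted to the metadata DB
      try {
        // ... the actual job work would happen here ...
        persist(Status.SUCCEEDED);
      } catch (final Exception e) {
        persist(Status.FAILED);
      }
    }

    private void persist(final Status s) {
      this.status = s;
      // stand-in for writing the status row to the metadata database
      System.out.println("persist status: " + s);
    }
  }

  public static void main(String[] args) {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    pool.submit(new MiniJobRunner()); // asynchronous execution, like FlowRunner submitting JobRunners
    pool.shutdown();
  }
}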

Let's start the analysis from azkaban.execapp.FlowRunner#runFlow:

/**
 * Main method that executes the jobs.
 */
private void runFlow() throws Exception {
    this.logger.info("Starting flows");
    runReadyJob(this.flow); // kick off the jobs that are ready to run
    updateFlow(); // refresh the flow's update time

    // Block until the flow finishes
    while (!this.flowFinished) {
        synchronized (this.mainSyncObj) {
            if (this.flowPaused) {
                try {
                    this.mainSyncObj.wait(CHECK_WAIT_MS);
                } catch (final InterruptedException e) {
                }

                continue;
            } else {
                if (this.retryFailedJobs) {
                    retryAllFailures();
                } else if (!progressGraph()) {
                    try {
                        this.mainSyncObj.wait(CHECK_WAIT_MS);
                    } catch (final InterruptedException e) {
                    }
                }
            }
        }
    }

    // The flow is done: shut down the executor and persist the final state
    this.logger.info("Finishing up flow. Awaiting Termination");
    this.executorService.shutdown();

    updateFlow();
    this.logger.info("Finished Flow");
}

The most important call in the method above is runReadyJob(this.flow), which is the main body of flow execution:

private boolean runReadyJob(final ExecutableNode node) throws IOException {
    // Return false if the node has already finished or is currently running
    if (Status.isStatusFinished(node.getStatus())
            || Status.isStatusRunning(node.getStatus())) {
        return false;
    }

    /**
     * Determines what the state of the next node should be. Returns null if the node should not be
     * run.
     */
    // As long as nextNodeStatus is not null, the node can be processed
    final Status nextNodeStatus = getImpliedStatus(node);
    if (nextNodeStatus == null) {
        return false;
    }

    if (nextNodeStatus == Status.CANCELLED) {
        this.logger.info("Cancelling '" + node.getNestedId()
                + "' due to prior errors.");
        node.cancelNode(System.currentTimeMillis());
        finishExecutableNode(node);
    } else if (nextNodeStatus == Status.SKIPPED) {
        this.logger.info("Skipping disabled job '" + node.getId() + "'.");
        node.skipNode(System.currentTimeMillis());
        finishExecutableNode(node);
    } else if (nextNodeStatus == Status.READY) {
        // The node is ready to run.
        // If it is an embedded flow, recursively run all of its start nodes.
        if (node instanceof ExecutableFlowBase) {
            final ExecutableFlowBase flow = ((ExecutableFlowBase) node);
            this.logger.info("Running flow '" + flow.getNestedId() + "'.");
            // Mark the flow as RUNNING and record its start time
            flow.setStatus(Status.RUNNING);
            flow.setStartTime(System.currentTimeMillis());
            prepareJobProperties(flow);
            for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
                final ExecutableNode startNode = flow.getExecutableNode(startNodeId);
                runReadyJob(startNode);
            }
        } else {
            runExecutableNode(node);
        }
    }
    return true;
}

In the code above, the actual execution of a runnable node happens in runExecutableNode():

@SuppressWarnings("FutureReturnValueIgnored")
private void runExecutableNode(final ExecutableNode node) throws IOException {
    // Collect output props from the job's dependencies.
    prepareJobProperties(node);
    // First mark the node as QUEUED
    node.setStatus(Status.QUEUED);
    // Create the JobRunner for this node
    final JobRunner runner = createJobRunner(node);
    this.logger.info("Submitting job '" + node.getNestedId() + "' to run.");
    try {
        // Submit the JobRunner to the thread pool to run asynchronously
        this.executorService.submit(runner);
        this.activeJobRunners.add(runner);
    } catch (final RejectedExecutionException e) {
        this.logger.error(e);
    }
}

The executorService used here is simply a thread pool.
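
For reference, a fixed-size pool like the one FlowRunner uses can be created as shown below. This is only a simplified sketch; the exact field names and where the pool size comes from may differ between Azkaban versions.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolSketch {
  public static void main(String[] args) {
    // Simplified sketch (assumption): the per-flow job thread pool is a fixed-size pool,
    // sized by the configured job concurrency for that flow.
    final int numJobThreads = 10; // illustrative value; in Azkaban this comes from configuration
    final ExecutorService executorService = Executors.newFixedThreadPool(numJobThreads);
    // JobRunners are then submitted to this pool via executorService.submit(runner)
    executorService.shutdown();
  }
}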

As the code above shows, Azkaban wraps each job in a JobRunner object and submits it to the thread pool asynchronously, so the next step is azkaban.execapp.JobRunner#run:

/**
 * The main run thread.
 */
@Override
public void run() {
  try {
    doRun();
  } catch (final Exception e) {
    serverLogger.error("Unexpected exception", e);
    throw e;
  }
}

As you can see, run() simply delegates to doRun(), so let's keep tracing:

private void doRun() {
  // Set the thread name
  Thread.currentThread().setName(
      "JobRunner-" + this.jobId + "-" + this.executionId);

  // If the job is cancelled, disabled, killed. No log is created in this case
  // Re-check the job status: if it was cancelled, disabled or force-killed, return immediately
  if (handleNonReadyStatus()) {
    return;
  }

  // Create the attachment file and the job logger
  createAttachmentFile();
  createLogger();
  boolean errorFound = false;
  // Delay execution if necessary. Will return a true if something went wrong.
  errorFound |= delayExecution();

  // For pipelining of jobs. Will watch other jobs. Will return true if
  // something went wrong.
  errorFound |= blockOnPipeLine();

  // Start the node.
  this.node.setStartTime(System.currentTimeMillis());
  Status finalStatus = this.node.getStatus();
  // Insert a row for this job execution into MySQL; the method body is shown later
  uploadExecutableNode();
  if (!errorFound && !isKilled()) {
    // Fire the JOB_STARTED event, picked up by the registered listeners
    fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));

    // Do the pre-execution preparation and build the Job object;
    // prepareJob() is analyzed in detail below
    final Status prepareStatus = prepareJob();
    // In the normal case this returns RUNNING.
    // If the job was interrupted, cancelled, etc., prepareStatus is null;
    // see the analysis of prepareJob() below.
    if (prepareStatus != null) {
      // Writes status to the db
      writeStatus();
      fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
          new EventData(prepareStatus, this.node.getNestedId())));

      // Finally, the actual job execution; runJob() is analyzed below
      finalStatus = runJob();
    } else {
      finalStatus = changeStatus(Status.FAILED);
      logError("Job run failed preparing the job.");
    }
  }
  this.node.setEndTime(System.currentTimeMillis());

  if (isKilled()) {
    // even if it's killed, there is a chance that the job failed is marked as
    // failure,
    // So we set it to KILLED to make sure we know that we forced kill it
    // rather than
    // it being a legitimate failure.
    finalStatus = changeStatus(Status.KILLED);
  }

  logInfo(
      "Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
          + " with status " + this.node.getStatus());

  try {
    finalizeLogFile(this.node.getAttempt());
    finalizeAttachmentFile();
    writeStatus();
  } finally {
    // note that FlowRunner thread does node.attempt++ when it receives the JOB_FINISHED event
    fireEvent(Event.create(this, EventType.JOB_FINISHED,
        new EventData(finalStatus, this.node.getNestedId())), false);
  }
}

The doRun() method above contains three particularly important calls: uploadExecutableNode(), prepareJob() and runJob(). Let's look at each of them in turn.

uploadExecutableNode()
This call can be traced down to the following method. As it shows, before a job is executed Azkaban first inserts a row with the job's metadata into MySQL:

public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
    throws ExecutorManagerException {
  final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
      + "(exec_id, project_id, version, flow_id, job_id, start_time, "
      + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";

  byte[] inputParam = null;
  if (inputProps != null) {
    try {
      final String jsonString =
          JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
      inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
    } catch (final IOException e) {
      throw new ExecutorManagerException("Error encoding input params");
    }
  }

  final ExecutableFlow flow = node.getExecutableFlow();
  final String flowId = node.getParentFlow().getFlowPath();
  logger.info("Uploading flowId " + flowId);
  try {
    this.dbOperator.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
        flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
        node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
        inputParam, node.getAttempt());
  } catch (final SQLException e) {
    throw new ExecutorManagerException("Error writing job " + node.getId(), e);
  }
}

azkaban.execapp.JobRunner#prepareJob

private Status prepareJob() throws RuntimeException {
  // Check pre conditions
  if (this.props == null || this.isKilled()) {
    logError("Failing job. The job properties don't exist");
    return null;
  }

  final Status finalStatus;
  synchronized (this.syncObject) {
    // Re-check the status under the lock
    if (this.node.getStatus() == Status.FAILED || this.isKilled()) {
      return null;
    }

    // Log the job start
    logInfo("Starting job " + this.jobId + getNodeRetryLog() + " at " + this.node.getStartTime());

    // If it's an embedded flow, we'll add the nested flow info to the job
    // conf
    if (this.node.getExecutableFlow() != this.node.getParentFlow()) {
      final String subFlow = this.node.getPrintableId(":");
      this.props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
    }

    // Put the job metadata and the JVM arguments into the JobRunner's props
    insertJobMetadata();
    insertJVMAargs();

    this.props.put(CommonJobProperties.JOB_ID, this.jobId);
    this.props.put(CommonJobProperties.JOB_ATTEMPT, this.node.getAttempt());
    this.props.put(CommonJobProperties.JOB_METADATA_FILE,
        createMetaDataFileName(this.node));
    this.props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, this.attachmentFileName);
    this.props.put(CommonJobProperties.JOB_LOG_FILE, this.logFile.getAbsolutePath());

    // Change the node status to RUNNING
    finalStatus = changeStatus(Status.RUNNING);

    // Ability to specify working directory
    if (!this.props.containsKey(AbstractProcessJob.WORKING_DIR)) {
      this.props.put(AbstractProcessJob.WORKING_DIR, this.workingDir.getAbsolutePath());
    }

    // Determine the user to run as: if a proxy user is configured, verify it is allowed;
    // otherwise default to the submit user
    if (this.props.containsKey(JobProperties.USER_TO_PROXY)) {
      final String jobProxyUser = this.props.getString(JobProperties.USER_TO_PROXY);
      if (this.proxyUsers != null && !this.proxyUsers.contains(jobProxyUser)) {
        final String permissionsPageURL = getProjectPermissionsURL();
        this.logger.error("User " + jobProxyUser
            + " has no permission to execute this job " + this.jobId + "!"
            + " If you want to execute this flow as " + jobProxyUser
            + ", please add it to Proxy Users under project permissions page: " +
            permissionsPageURL);
        return null;
      }
    } else {
      final String submitUser = this.getNode().getExecutableFlow().getSubmitUser();
      this.props.put(JobProperties.USER_TO_PROXY, submitUser);
      this.logger.info("user.to.proxy property was not set, defaulting to submit user " +
          submitUser);
    }

    try {
      // Build the Job object; it is executed later in azkaban.execapp.JobRunner#runJob
      this.job = this.jobtypeManager.buildJobExecutor(this.jobId, this.props, this.logger);
    } catch (final JobTypeManagerException e) {
      this.logger.error("Failed to build job type", e);
      return null;
    }
  }
  // Return the job status; in the normal case this is RUNNING
  return finalStatus;
}

azkaban.execapp.JobRunner#runJob
The JobRunner's runJob() method:

private Status runJob() {
  Status finalStatus;
  try {
    // Run the Job object built earlier in prepareJob()
    this.job.run();
    // Read the node's status after the run
    finalStatus = this.node.getStatus();
  } catch (final Throwable e) {
    synchronized (this.syncObject) {
      if (this.props.getBoolean("job.succeed.on.failure", false)) {
        finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
        logError("Job run failed, but will treat it like success.");
        logError(e.getMessage() + " cause: " + e.getCause(), e);
      } else {
        if (isKilled() || this.node.getStatus() == Status.KILLED) {
          finalStatus = Status.KILLED;
          logError("Job run killed!", e);
        } else {
          finalStatus = changeStatus(Status.FAILED);
          logError("Job run failed!", e);
        }
        logError(e.getMessage() + " cause: " + e.getCause());
      }
    }
  }

  if (this.job != null) {
    this.node.setOutputProps(this.job.getJobGeneratedProperties());
  }

  synchronized (this.syncObject) {
    // If the job is still running (but not killed), set the status to Success.
    if (!Status.isStatusFinished(finalStatus) && !isKilled()) {
      finalStatus = changeStatus(Status.SUCCEEDED);
    }
  }
  return finalStatus;
}

The method above eventually calls this.job.run(). What happens next depends on the job type, since each type has its own implementation. Here we take the simplest one, the command type, as an example.
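
As a quick reminder of what such a job looks like, a command-type job is declared in a .job properties file, for example (an illustrative job, not one taken from this article):

# foo.job -- illustrative example of a command-type job
type=command
command=echo "Hello Azkaban"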

azkaban.jobExecutor.ProcessJob

When the job type is command, the job built here is an instance of ProcessJob.
ProcessJob's run() method is as follows:

  @Override
  public void run() throws Exception {
    try {
      resolveProps();
    } catch (final Exception e) {
      handleError("Bad property definition! " + e.getMessage(), e);
    }

    if (this.getSysProps().getBoolean(MEMCHECK_ENABLED, true)
        && this.getJobProps().getBoolean(AZKABAN_MEMORY_CHECK, true)) {
      final Pair<Long, Long> memPair = getProcMemoryRequirement();
      final long xms = memPair.getFirst();
      final long xmx = memPair.getSecond();
      // retry backoff in ms
      final String oomMsg = String
          .format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
              xms, xmx, getId());
      int attempt;
      boolean isMemGranted = true;

      //todo HappyRay: move to proper Guice after this class is refactored.
      final SystemMemoryInfo memInfo = SERVICE_PROVIDER.getInstance(SystemMemoryInfo.class);
      for (attempt = 1; attempt <= Constants.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
        isMemGranted = memInfo.canSystemGrantMemory(xmx);
        if (isMemGranted) {
          info(String.format("Memory granted for job %s", getId()));
          if (attempt > 1) {
            this.commonMetrics.decrementOOMJobWaitCount();
          }
          break;
        }
        if (attempt < Constants.MEMORY_CHECK_RETRY_LIMIT) {
          info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s",
              TimeUnit.MILLISECONDS.toSeconds(
                  Constants.MEMORY_CHECK_INTERVAL_MS), attempt,
              Constants.MEMORY_CHECK_RETRY_LIMIT));
          if (attempt == 1) {
            this.commonMetrics.incrementOOMJobWaitCount();
          }
          synchronized (this) {
            try {
              this.wait(Constants.MEMORY_CHECK_INTERVAL_MS);
            } catch (final InterruptedException e) {
              info(String
                  .format("Job %s interrupted while waiting for memory check retry", getId()));
            }
          }
          if (this.killed) {
            this.commonMetrics.decrementOOMJobWaitCount();
            info(String.format("Job %s was killed while waiting for memory check retry", getId()));
            return;
          }
        }
      }

      if (!isMemGranted) {
        this.commonMetrics.decrementOOMJobWaitCount();
        handleError(oomMsg, null);
      }
    }

    List<String> commands = null;
    try {
      commands = getCommandList();
    } catch (final Exception e) {
      handleError("Job set up failed: " + e.getMessage(), e);
    }

    final long startMs = System.currentTimeMillis();

    if (commands == null) {
      handleError("There are no commands to execute", null);
    }

    info(commands.size() + " commands to execute.");
    final File[] propFiles = initPropsFiles();

    // change krb5ccname env var so that each job execution gets its own cache
    final Map<String, String> envVars = getEnvironmentVariables();
    envVars.put(KRB5CCNAME, getKrb5ccname(this.getJobProps()));

    // determine whether to run as Azkaban or run as effectiveUser,
    // by default, run as effectiveUser
    String executeAsUserBinaryPath = null;
    String effectiveUser = null;
    final boolean isExecuteAsUser = this.getSysProps().getBoolean(EXECUTE_AS_USER, true);

    //Get list of users we never execute flows as. (ie: root, azkaban)
    final Set<String> blackListedUsers = new HashSet<>(
        Arrays.asList(
            this.getSysProps()
                .getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS, "root,azkaban")
                .split(",")
        )
    );

    // nativeLibFolder specifies the path for execute-as-user file,
    // which will change user from Azkaban to effectiveUser
    if (isExecuteAsUser) {
      final String nativeLibFolder = this.getSysProps().getString(AZKABAN_SERVER_NATIVE_LIB_FOLDER);
      executeAsUserBinaryPath = String.format("%s/%s", nativeLibFolder, "execute-as-user");
      effectiveUser = getEffectiveUser(this.getJobProps());
      // Throw exception if Azkaban tries to run flow as a prohibited user
      if (blackListedUsers.contains(effectiveUser)) {
        throw new RuntimeException(
            String.format("Not permitted to proxy as '%s' through Azkaban", effectiveUser)
        );
      }
      // Set parent directory permissions to <uid>:azkaban so user can write in their execution directory
      // if the directory is not permissioned correctly already (should happen once per execution)
      if (!canWriteInCurrentWorkingDirectory(effectiveUser)) {
        info("Changing current working directory ownership");
        assignUserFileOwnership(effectiveUser, getWorkingDirectory());
      }
      // Set property file permissions to <uid>:azkaban so user can write to their prop files
      // in order to pass properties from one job to another
      for (final File propFile : propFiles) {
        info("Changing properties files ownership");
        assignUserFileOwnership(effectiveUser, propFile.getAbsolutePath());
      }
    }

    for (String command : commands) {
      AzkabanProcessBuilder builder = null;
      if (isExecuteAsUser) {
        command =
            String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser,
                command);
        info("Command: " + command);
        builder =
            new AzkabanProcessBuilder(partitionCommandLine(command))
                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
                .enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath)
                .setEffectiveUser(effectiveUser);
      } else {
        info("Command: " + command);
        builder =
            new AzkabanProcessBuilder(partitionCommandLine(command))
                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
      }

      if (builder.getEnv().size() > 0) {
        info("Environment variables: " + builder.getEnv());
      }
      info("Working directory: " + builder.getWorkingDir());

      // print out the Job properties to the job log.
      this.logJobProperties();

      synchronized (this) {
        // Make sure that checking if the process job is killed and creating an AzkabanProcess
        // object are atomic. The cancel method relies on this to make sure that if this.process is
        // not null, this block of code which includes checking if the job is killed has not been
        // executed yet.
        if (this.killed) {
          info("The job is killed. Abort. No job process created.");
          return;
        }
        this.process = builder.build();
      }
      try {
        this.process.run();
        this.success = true;
      } catch (final Throwable e) {
        for (final File file : propFiles) {
          if (file != null && file.exists()) {
            file.delete();
          }
        }
        throw new RuntimeException(e);
      } finally {
        info("Process completed "
            + (this.success ? "successfully" : "unsuccessfully") + " in "
            + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
      }
    }

    // Get the output properties from this job.
    generateProperties(propFiles[1]);
  }
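
One detail in the run() method above worth a quick illustration is how each command is wrapped when execute-as-user is enabled: the original command is prefixed with the execute-as-user binary and the effective user. A small sketch with made-up values (the path, user and command below are illustrative, not from the article):

import java.lang.String;

// Sketch of the execute-as-user wrapping done in the command loop above.
public class ExecuteAsUserWrapDemo {
  public static void main(String[] args) {
    final String executeAsUserBinaryPath = "/opt/azkaban/native-lib/execute-as-user"; // assumed nativeLibFolder
    final String effectiveUser = "etl_user";   // the proxy user resolved from the job props
    final String command = "sh backup.sh";     // the original command from the .job file

    // Same String.format pattern as in ProcessJob.run()
    final String wrapped = String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser, command);
    // Prints: /opt/azkaban/native-lib/execute-as-user etl_user sh backup.sh
    System.out.println(wrapped);
  }
}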

Of course, one could dig a level deeper into this.process = builder.build() and this.process.run(), but those are straightforward to follow once you read the source, so they are not covered here.
