The Azkaban framework wraps each Flow in a FlowRunner, submits that FlowRunner to a thread pool, and runs it asynchronously. Along the way the job statuses are updated several times and persisted to the metadata database. Below is a brief walk-through of the whole process from the source code's point of view.
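Before diving into the source, here is a minimal, purely conceptual sketch of that pattern (not Azkaban's actual classes): each flow becomes a Runnable submitted to a shared pool, and its status is persisted as the run progresses.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlowSubmissionSketch {

  // Hypothetical stand-in for the DAO that persists flow status to the metadata DB.
  interface StatusStore {
    void save(String flowId, String status);
  }

  static class FlowTask implements Runnable {
    private final String flowId;
    private final StatusStore store;

    FlowTask(final String flowId, final StatusStore store) {
      this.flowId = flowId;
      this.store = store;
    }

    @Override
    public void run() {
      this.store.save(this.flowId, "RUNNING");   // persisted when the flow starts
      // ... run the flow's jobs here ...
      this.store.save(this.flowId, "SUCCEEDED"); // persisted again when it finishes
    }
  }

  public static void main(final String[] args) {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final StatusStore store = (id, status) -> System.out.println(id + " -> " + status);
    pool.submit(new FlowTask("flow-1", store));
    pool.shutdown();
  }
}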
Let's start with azkaban.execapp.FlowRunner#runFlow:
/**
* Main method that executes the jobs.
*/
private void runFlow() throws Exception {
this.logger.info("Starting flows");
runReadyJob(this.flow); // kick off the jobs that are ready to run
updateFlow(); // update the flow's update time
// block until the flow finishes
while (!this.flowFinished) {
synchronized (this.mainSyncObj) {
if (this.flowPaused) {
try {
this.mainSyncObj.wait(CHECK_WAIT_MS);
} catch (final InterruptedException e) {
}
continue;
} else {
if (this.retryFailedJobs) {
retryAllFailures();
} else if (!progressGraph()) {
try {
this.mainSyncObj.wait(CHECK_WAIT_MS);
} catch (final InterruptedException e) {
}
}
}
}
}
// the flow is done: shut down the pool and update the status
this.logger.info("Finishing up flow. Awaiting Termination");
this.executorService.shutdown();
updateFlow();
this.logger.info("Finished Flow");
}
The most important call in the method above is runReadyJob(this.flow), the main method that drives flow execution:
private boolean runReadyJob(final ExecutableNode node) throws IOException {
// return false if the node has already finished or is currently running
if (Status.isStatusFinished(node.getStatus())
|| Status.isStatusRunning(node.getStatus())) {
return false;
}
/**
* Determines what the state of the next node should be. Returns null if the node should not be
* run.
*/
// any non-null nextNodeStatus means the node may be run
final Status nextNodeStatus = getImpliedStatus(node);
if (nextNodeStatus == null) {
return false;
}
if (nextNodeStatus == Status.CANCELLED) {
this.logger.info("Cancelling '" + node.getNestedId()
+ "' due to prior errors.");
node.cancelNode(System.currentTimeMillis());
finishExecutableNode(node);
} else if (nextNodeStatus == Status.SKIPPED) {
this.logger.info("Skipping disabled job '" + node.getId() + "'.");
node.skipNode(System.currentTimeMillis());
finishExecutableNode(node);
} else if (nextNodeStatus == Status.READY) {
// the node is ready to run
// if it is an embedded flow, recursively run its runnable start nodes
if (node instanceof ExecutableFlowBase) {
final ExecutableFlowBase flow = ((ExecutableFlowBase) node);
this.logger.info("Running flow '" + flow.getNestedId() + "'.");
// mark the flow as running and set its start time
flow.setStatus(Status.RUNNING);
flow.setStartTime(System.currentTimeMillis());
prepareJobProperties(flow);
for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
final ExecutableNode startNode = flow.getExecutableNode(startNodeId);
runReadyJob(startNode);
}
} else {
runExecutableNode(node);
}
}
return true;
}
In the code above, the method that actually runs an individual executable node is runExecutableNode():
@SuppressWarnings("FutureReturnValueIgnored")
private void runExecutableNode(final ExecutableNode node) throws IOException {
// Collect output props from the job's dependencies.
prepareJobProperties(node);
// first mark the node as queued
node.setStatus(Status.QUEUED);
// create a JobRunner for the node
final JobRunner runner = createJobRunner(node);
this.logger.info("Submitting job '" + node.getNestedId() + "' to run.");
try {
// submit the JobRunner to the thread pool so it runs asynchronously
this.executorService.submit(runner);
this.activeJobRunners.add(runner);
} catch (final RejectedExecutionException e) {
this.logger.error(e);
}
}
The executorService used here is simply a thread pool that FlowRunner uses to run its JobRunners.
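From what I can tell it is essentially a fixed-size pool sized by FlowRunner's numJobThreads setting; a simplified, illustrative sketch (the exact creation point and thread naming in the real code may differ):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobPoolSketch {
  // numJobThreads comes from the flow/executor configuration in the real code.
  static ExecutorService createJobPool(final int numJobThreads) {
    return Executors.newFixedThreadPool(numJobThreads);
  }

  public static void main(final String[] args) {
    final ExecutorService executorService = createJobPool(10);
    System.out.println("job pool created: " + executorService);
    executorService.shutdown();
  }
}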
As the code above shows, Azkaban wraps each job in a JobRunner object and submits it to that pool to run asynchronously, so the next stop is azkaban.execapp.JobRunner#run:
/**
* The main run thread.
*/
@Override
public void run() {
try {
doRun();
} catch (final Exception e) {
serverLogger.error("Unexpected exception", e);
throw e;
}
}
All run() does is delegate to doRun(), so let's keep following the call chain:
private void doRun() {
// set the thread name
Thread.currentThread().setName(
"JobRunner-" + this.jobId + "-" + this.executionId);
// If the job is cancelled, disabled or killed, no log is created and we return immediately.
if (handleNonReadyStatus()) {
return;
}
// create the attachment file and set up the job logger
createAttachmentFile();
createLogger();
boolean errorFound = false;
// Delay execution if necessary. Will return a true if something went wrong.
errorFound |= delayExecution();
// For pipelining of jobs. Will watch other jobs. Will return true if
// something went wrong.
errorFound |= blockOnPipeLine();
// Start the node.
this.node.setStartTime(System.currentTimeMillis());
Status finalStatus = this.node.getStatus();
// insert a row into MySQL; the body of this method is covered below
uploadExecutableNode();
if (!errorFound && !isKilled()) {
// notify listeners that the job has started
fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));
// pre-run preparation: builds the Job object; prepareJob() is analyzed in detail below
final Status prepareStatus = prepareJob();
// In the normal case prepareStatus is RUNNING here.
// If the job was killed or cancelled, prepareJob() returns null; see the analysis of prepareJob() below.
if (prepareStatus != null) {
// Writes status to the db
writeStatus();
fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
new EventData(prepareStatus, this.node.getNestedId())));
// finally, the actual execution of the job; runJob() is analyzed below
finalStatus = runJob();
} else {
finalStatus = changeStatus(Status.FAILED);
logError("Job run failed preparing the job.");
}
}
this.node.setEndTime(System.currentTimeMillis());
if (isKilled()) {
// even if it's killed, there is a chance that the job failed is marked as
// failure,
// So we set it to KILLED to make sure we know that we forced kill it
// rather than
// it being a legitimate failure.
finalStatus = changeStatus(Status.KILLED);
}
logInfo(
"Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
+ " with status " + this.node.getStatus());
try {
finalizeLogFile(this.node.getAttempt());
finalizeAttachmentFile();
writeStatus();
} finally {
// note that FlowRunner thread does node.attempt++ when it receives the JOB_FINISHED event
fireEvent(Event.create(this, EventType.JOB_FINISHED,
new EventData(finalStatus, this.node.getNestedId())), false);
}
}
The doRun() method above contains three important calls: uploadExecutableNode(), prepareJob() and runJob(). Let's go through them one by one.
uploadExecutableNode():
Following this call leads to the method below; before a job runs, Azkaban inserts a row of metadata about it into MySQL:
public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
throws ExecutorManagerException {
final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
+ "(exec_id, project_id, version, flow_id, job_id, start_time, "
+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
byte[] inputParam = null;
if (inputProps != null) {
try {
final String jsonString =
JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
} catch (final IOException e) {
throw new ExecutorManagerException("Error encoding input params");
}
}
final ExecutableFlow flow = node.getExecutableFlow();
final String flowId = node.getParentFlow().getFlowPath();
logger.info("Uploading flowId " + flowId);
try {
this.dbOperator.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
inputParam, node.getAttempt());
} catch (final SQLException e) {
throw new ExecutorManagerException("Error writing job " + node.getId(), e);
}
}
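As the code shows, the input_params column holds the job's input properties serialized to JSON and then gzip-compressed as UTF-8. For reference, here is a standalone sketch that decodes such a value back into its JSON string, using only java.util.zip rather than Azkaban's GZIPUtils:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public final class InputParamsDecoder {
  // Decompress a gzipped byte[] (e.g. the input_params column) back into its JSON string.
  public static String unGzipToString(final byte[] compressed) throws IOException {
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
        ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      final byte[] buffer = new byte[4096];
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }
}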
azkaban.execapp.JobRunner#prepareJob:
private Status prepareJob() throws RuntimeException {
// Check pre-conditions
if (this.props == null || this.isKilled()) {
logError("Failing job. The job properties don't exist");
return null;
}
// re-check the job state under the lock
final Status finalStatus;
synchronized (this.syncObject) {
if (this.node.getStatus() == Status.FAILED || this.isKilled()) {
return null;
}
// log that the job is starting
logInfo("Starting job " + this.jobId + getNodeRetryLog() + " at " + this.node.getStartTime());
// If it's an embedded flow, we'll add the nested flow info to the job conf
if (this.node.getExecutableFlow() != this.node.getParentFlow()) {
final String subFlow = this.node.getPrintableId(":");
this.props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
}
// the next two calls put the job's metadata and the JVM arguments into the JobRunner's props
insertJobMetadata();
insertJVMAargs();
this.props.put(CommonJobProperties.JOB_ID, this.jobId);
this.props.put(CommonJobProperties.JOB_ATTEMPT, this.node.getAttempt());
this.props.put(CommonJobProperties.JOB_METADATA_FILE,
createMetaDataFileName(this.node));
this.props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, this.attachmentFileName);
this.props.put(CommonJobProperties.JOB_LOG_FILE, this.logFile.getAbsolutePath());
// change the status to RUNNING
finalStatus = changeStatus(Status.RUNNING);
// Ability to specify working directory
if (!this.props.containsKey(AbstractProcessJob.WORKING_DIR)) {
this.props.put(AbstractProcessJob.WORKING_DIR, this.workingDir.getAbsolutePath());
}
// decide which user the job runs as: if a proxy user is configured, check that it is allowed; otherwise default to the submitting user
if (this.props.containsKey(JobProperties.USER_TO_PROXY)) {
final String jobProxyUser = this.props.getString(JobProperties.USER_TO_PROXY);
if (this.proxyUsers != null && !this.proxyUsers.contains(jobProxyUser)) {
final String permissionsPageURL = getProjectPermissionsURL();
this.logger.error("User " + jobProxyUser
+ " has no permission to execute this job " + this.jobId + "!"
+ " If you want to execute this flow as " + jobProxyUser
+ ", please add it to Proxy Users under project permissions page: " +
permissionsPageURL);
return null;
}
} else {
final String submitUser = this.getNode().getExecutableFlow().getSubmitUser();
this.props.put(JobProperties.USER_TO_PROXY, submitUser);
this.logger.info("user.to.proxy property was not set, defaulting to submit user " +
submitUser);
}
try {
// build the Job object; it is executed later in azkaban.execapp.JobRunner#runJob
this.job = this.jobtypeManager.buildJobExecutor(this.jobId, this.props, this.logger);
} catch (final JobTypeManagerException e) {
this.logger.error("Failed to build job type", e);
return null;
}
}
// return the job's status; in the normal case this is RUNNING
return finalStatus;
}
azkaban.execapp.JobRunner#runJob:
private Status runJob() {
Status finalStatus;
try {
// run the Job object that prepareJob() built earlier
this.job.run();
// read the node's status after the run
finalStatus = this.node.getStatus();
} catch (final Throwable e) {
synchronized (this.syncObject) {
if (this.props.getBoolean("job.succeed.on.failure", false)) {
finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
logError("Job run failed, but will treat it like success.");
logError(e.getMessage() + " cause: " + e.getCause(), e);
} else {
if (isKilled() || this.node.getStatus() == Status.KILLED) {
finalStatus = Status.KILLED;
logError("Job run killed!", e);
} else {
finalStatus = changeStatus(Status.FAILED);
logError("Job run failed!", e);
}
logError(e.getMessage() + " cause: " + e.getCause());
}
}
}
if (this.job != null) {
this.node.setOutputProps(this.job.getJobGeneratedProperties());
}
synchronized (this.syncObject) {
// If the job is still running (but not killed), set the status to Success.
if (!Status.isStatusFinished(finalStatus) && !isKilled()) {
finalStatus = changeStatus(Status.SUCCEEDED);
}
}
return finalStatus;
}
The method above eventually calls this.job.run(), and this is where things depend on the job type: different types execute different code. Here we take the simplest case, the command type, where job is an instance of azkaban.jobExecutor.ProcessJob.
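For context, a command type job is declared in a small .job properties file; a minimal, purely hypothetical example (job name and proxy user are made up) looks like this:
# hello.job -- minimal hypothetical command-type job definition
type=command
command=echo "hello azkaban"
# optional: run the command as a proxy user (validated in prepareJob(), see above)
user.to.proxy=etl_user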
ProcessJob's run() method is as follows:
@Override
public void run() throws Exception {
try {
resolveProps();
} catch (final Exception e) {
handleError("Bad property definition! " + e.getMessage(), e);
}
if (this.getSysProps().getBoolean(MEMCHECK_ENABLED, true)
&& this.getJobProps().getBoolean(AZKABAN_MEMORY_CHECK, true)) {
final Pair<Long, Long> memPair = getProcMemoryRequirement();
final long xms = memPair.getFirst();
final long xmx = memPair.getSecond();
// retry backoff in ms
final String oomMsg = String
.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
xms, xmx, getId());
int attempt;
boolean isMemGranted = true;
//todo HappyRay: move to proper Guice after this class is refactored.
final SystemMemoryInfo memInfo = SERVICE_PROVIDER.getInstance(SystemMemoryInfo.class);
for (attempt = 1; attempt <= Constants.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
isMemGranted = memInfo.canSystemGrantMemory(xmx);
if (isMemGranted) {
info(String.format("Memory granted for job %s", getId()));
if (attempt > 1) {
this.commonMetrics.decrementOOMJobWaitCount();
}
break;
}
if (attempt < Constants.MEMORY_CHECK_RETRY_LIMIT) {
info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s",
TimeUnit.MILLISECONDS.toSeconds(
Constants.MEMORY_CHECK_INTERVAL_MS), attempt,
Constants.MEMORY_CHECK_RETRY_LIMIT));
if (attempt == 1) {
this.commonMetrics.incrementOOMJobWaitCount();
}
synchronized (this) {
try {
this.wait(Constants.MEMORY_CHECK_INTERVAL_MS);
} catch (final InterruptedException e) {
info(String
.format("Job %s interrupted while waiting for memory check retry", getId()));
}
}
if (this.killed) {
this.commonMetrics.decrementOOMJobWaitCount();
info(String.format("Job %s was killed while waiting for memory check retry", getId()));
return;
}
}
}
if (!isMemGranted) {
this.commonMetrics.decrementOOMJobWaitCount();
handleError(oomMsg, null);
}
}
List<String> commands = null;
try {
commands = getCommandList();
} catch (final Exception e) {
handleError("Job set up failed: " + e.getMessage(), e);
}
final long startMs = System.currentTimeMillis();
if (commands == null) {
handleError("There are no commands to execute", null);
}
info(commands.size() + " commands to execute.");
final File[] propFiles = initPropsFiles();
// change krb5ccname env var so that each job execution gets its own cache
final Map<String, String> envVars = getEnvironmentVariables();
envVars.put(KRB5CCNAME, getKrb5ccname(this.getJobProps()));
// determine whether to run as Azkaban or run as effectiveUser,
// by default, run as effectiveUser
String executeAsUserBinaryPath = null;
String effectiveUser = null;
final boolean isExecuteAsUser = this.getSysProps().getBoolean(EXECUTE_AS_USER, true);
//Get list of users we never execute flows as. (ie: root, azkaban)
final Set<String> blackListedUsers = new HashSet<>(
Arrays.asList(
this.getSysProps()
.getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS, "root,azkaban")
.split(",")
)
);
// nativeLibFolder specifies the path for execute-as-user file,
// which will change user from Azkaban to effectiveUser
if (isExecuteAsUser) {
final String nativeLibFolder = this.getSysProps().getString(AZKABAN_SERVER_NATIVE_LIB_FOLDER);
executeAsUserBinaryPath = String.format("%s/%s", nativeLibFolder, "execute-as-user");
effectiveUser = getEffectiveUser(this.getJobProps());
// Throw exception if Azkaban tries to run flow as a prohibited user
if (blackListedUsers.contains(effectiveUser)) {
throw new RuntimeException(
String.format("Not permitted to proxy as '%s' through Azkaban", effectiveUser)
);
}
// Set parent directory permissions to <uid>:azkaban so user can write in their execution directory
// if the directory is not permissioned correctly already (should happen once per execution)
if (!canWriteInCurrentWorkingDirectory(effectiveUser)) {
info("Changing current working directory ownership");
assignUserFileOwnership(effectiveUser, getWorkingDirectory());
}
// Set property file permissions to <uid>:azkaban so user can write to their prop files
// in order to pass properties from one job to another
for (final File propFile : propFiles) {
info("Changing properties files ownership");
assignUserFileOwnership(effectiveUser, propFile.getAbsolutePath());
}
}
for (String command : commands) {
AzkabanProcessBuilder builder = null;
if (isExecuteAsUser) {
command =
String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser,
command);
info("Command: " + command);
builder =
new AzkabanProcessBuilder(partitionCommandLine(command))
.setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
.enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath)
.setEffectiveUser(effectiveUser);
} else {
info("Command: " + command);
builder =
new AzkabanProcessBuilder(partitionCommandLine(command))
.setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
}
if (builder.getEnv().size() > 0) {
info("Environment variables: " + builder.getEnv());
}
info("Working directory: " + builder.getWorkingDir());
// print out the Job properties to the job log.
this.logJobProperties();
synchronized (this) {
// Make sure that checking if the process job is killed and creating an AzkabanProcess
// object are atomic. The cancel method relies on this to make sure that if this.process is
// not null, this block of code which includes checking if the job is killed has not been
// executed yet.
if (this.killed) {
info("The job is killed. Abort. No job process created.");
return;
}
this.process = builder.build();
}
try {
this.process.run();
this.success = true;
} catch (final Throwable e) {
for (final File file : propFiles) {
if (file != null && file.exists()) {
file.delete();
}
}
throw new RuntimeException(e);
} finally {
info("Process completed "
+ (this.success ? "successfully" : "unsuccessfully") + " in "
+ ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
}
}
// Get the output properties from this job.
generateProperties(propFiles[1]);
}
Of course one could drill down another level into this.process = builder.build() and this.process.run(), but those are straightforward to follow in the source, so they are not covered here.
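Conceptually, what builder.build() plus process.run() boil down to is spawning an operating-system process for the command and blocking until it exits. A standalone sketch of that pattern (my own simplification, not Azkaban's AzkabanProcess implementation):
import java.io.File;
import java.util.List;

public final class SimpleProcessRunner {
  // Spawn the command in the given working directory and block until it exits.
  public static void run(final List<String> command, final File workingDir)
      throws Exception {
    final ProcessBuilder pb = new ProcessBuilder(command)
        .directory(workingDir)
        .inheritIO(); // forward stdout/stderr to the parent process
    final Process process = pb.start();
    final int exitCode = process.waitFor();
    if (exitCode != 0) {
      throw new RuntimeException("Process exited with non-zero code " + exitCode);
    }
  }

  public static void main(final String[] args) throws Exception {
    run(List.of("echo", "hello azkaban"), new File("."));
  }
}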