YARN Job Submission Flow (Source Code Analysis)

Keywords: yarn, rm, mapreduce, submission

Based on Hadoop 2.7.1

JobSubmitter

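  • addMRFrameworkToDistributedCache(Configuration conf): reads mapreduce.application.framework.path, which specifies the HDFS path of an alternative framework; it can be ignored when the default YARN setup is used
  • Token-related methods: read the credentials (binary and JSON formats are supported) and add them to the corresponding FileSystem, so that the file system is accessed with the same permissions
  • copyAndConfigureFiles(Job job, Path jobSubmitDir): uploads the configuration, jar, files, libjars, archives, and so on
  • submitJobInternal: the method that actually submits the job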

Core submission call chain

  1. JobSubmitter ->
  2. ClientProtocol(YARNRunner) ->
  3. ResourceMgrDelegate ->
  4. YarnClient(YarnClientImpl).submitApplication( ApplicationSubmissionContext appContext) ->
  5. [RM] ApplicationClientProtocol(ClientRMService).submitApplication(SubmitApplicationRequest request) -> // fills the ApplicationSubmissionContext (ASC) with default values
  6. RMAppManager.submitApplication(ApplicationSubmissionContext submissionContext, long submitTime, String user)

  • ApplicationSubmissionContext: the submission context, which carries the application's metadata
  • SubmitApplicationRequest: the request object used for the submission

RMAppManager.submitApplication then fires a START event for the application:

// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler()
    .handle(new RMAppEvent(applicationId, RMAppEventType.START));

START -> APP_NEW_SAVED

// RMAppImpl: the START event moves the application from NEW to NEW_SAVING
stateMachineFactory.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
    RMAppEventType.START, new RMAppNewlySavingTransition())
// ...

private static final class RMAppNewlySavingTransition extends RMAppTransition {
  @Override
  public void transition(RMAppImpl app, RMAppEvent event) {
    // If recovery is enabled then store the application information in a
    // non-blocking call so make sure that RM has stored the information
    // needed to restart the AM after RM restart without further client
    // communication
    LOG.info("Storing application with id " + app.applicationId);
    app.rmContext.getStateStore().storeNewApplication(app);
  }
}

// RMStateStore: wraps the application data in an event for the store's own dispatcher
public synchronized void storeNewApplication(RMApp app) {
  ApplicationSubmissionContext context = app
      .getApplicationSubmissionContext();
  assert context instanceof ApplicationSubmissionContextPBImpl;
  ApplicationStateData appState =
      ApplicationStateData.newInstance(
          app.getSubmitTime(), app.getStartTime(), context, app.getUser());
  dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}

// RMAppImpl: runs on APP_NEW_SAVED, once the state store has saved the application
private static final class AddApplicationToSchedulerTransition extends
    RMAppTransition {
  @Override
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user,
        app.submissionContext.getReservationID()));
  }
}

AppAddedSchedulerEvent is handled by whichever Scheduler is configured.
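The transition table behind these snippets can be modelled in a few lines of plain Java. This is only a toy sketch, not the Hadoop StateMachineFactory: the class and enum names are hypothetical, and the SUBMITTED and ACCEPTED target states are my reading of RMAppImpl rather than something quoted in this post.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names; they only mirror RMAppState / RMAppEventType.
enum ToyAppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }

enum ToyAppEvent { START, APP_NEW_SAVED, APP_ACCEPTED }

class ToyAppStateMachine {
    // transitions.get(state).get(event) yields the next state
    private final Map<ToyAppState, Map<ToyAppEvent, ToyAppState>> transitions =
        new EnumMap<ToyAppState, Map<ToyAppEvent, ToyAppState>>(ToyAppState.class);
    private ToyAppState current = ToyAppState.NEW;

    ToyAppStateMachine() {
        // Same argument order as addTransition in the post: from, to, event
        addTransition(ToyAppState.NEW, ToyAppState.NEW_SAVING, ToyAppEvent.START);
        addTransition(ToyAppState.NEW_SAVING, ToyAppState.SUBMITTED, ToyAppEvent.APP_NEW_SAVED);
        addTransition(ToyAppState.SUBMITTED, ToyAppState.ACCEPTED, ToyAppEvent.APP_ACCEPTED);
    }

    private void addTransition(ToyAppState from, ToyAppState to, ToyAppEvent on) {
        Map<ToyAppEvent, ToyAppState> row = transitions.get(from);
        if (row == null) {
            row = new EnumMap<ToyAppEvent, ToyAppState>(ToyAppEvent.class);
            transitions.put(from, row);
        }
        row.put(on, to);
    }

    // Applies one event; an event with no entry for the current state is rejected.
    ToyAppState handle(ToyAppEvent event) {
        Map<ToyAppEvent, ToyAppState> row = transitions.get(current);
        ToyAppState next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        current = next;
        return current;
    }

    ToyAppState getState() {
        return current;
    }

    public static void main(String[] args) {
        ToyAppStateMachine app = new ToyAppStateMachine();
        for (ToyAppEvent e : ToyAppEvent.values()) {
            System.out.println(e + " -> " + app.handle(e));
        }
    }
}
```

The real transitions also carry a hook object (such as RMAppNewlySavingTransition) that runs side effects; the sketch leaves that out to keep the table itself visible.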

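P.S. A way to read the event-driven parts of the code:

  1. Pick a state or event type, for example APP_NEW_SAVED (an RMAppEventType),
  2. Find the class that handles it
  3. Find the concrete logic that processes the event (this is usually the most complicated part)
  4. Find the next event it fires
  5. Repeat.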
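To make the tracing steps concrete, here is a minimal single-threaded sketch of a dispatcher, assuming handlers are registered per event class. All names are hypothetical. The real YARN dispatcher processes its queue on a separate thread and looks handlers up by event type enum, so treat this as a simplified model only. It also shows why events enqueued before the dispatcher starts are the first ones processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical event classes used only for this illustration.
class ToyAppStart {
    final String appId;
    ToyAppStart(String appId) { this.appId = appId; }
}

class ToyStoreApp {
    final String appId;
    ToyStoreApp(String appId) { this.appId = appId; }
}

class ToyDispatcher {
    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();
    private final Queue<Object> queue = new ArrayDeque<>();
    private boolean started = false;
    private boolean draining = false;

    // Step 2 of the list above: which handler is registered for this event?
    <T> void register(Class<T> type, Consumer<T> handler) {
        handlers.put(type, e -> handler.accept(type.cast(e)));
    }

    // Events are always queued; they are only processed once start() was called.
    void handle(Object event) {
        queue.add(event);
        if (started) {
            drain();
        }
    }

    void start() {
        started = true;
        drain();
    }

    private void drain() {
        if (draining) {
            return; // a handler fired a follow-up event; the outer loop picks it up
        }
        draining = true;
        try {
            Object event;
            while ((event = queue.poll()) != null) {
                Consumer<Object> h = handlers.get(event.getClass());
                if (h == null) {
                    throw new IllegalStateException("No handler for " + event.getClass());
                }
                h.accept(event);
            }
        } finally {
            draining = false;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyDispatcher d = new ToyDispatcher();
        // Steps 3 and 4: the handler does its work, then fires the next event.
        d.register(ToyAppStart.class, e -> {
            log.add("START " + e.appId);
            d.handle(new ToyStoreApp(e.appId));
        });
        d.register(ToyStoreApp.class, e -> log.add("store " + e.appId));
        d.handle(new ToyAppStart("app_1")); // queued, dispatcher not started yet
        d.handle(new ToyAppStart("app_2"));
        log.add("dispatcher starting");
        d.start();
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Reading the log produced by demo() top to bottom is the same exercise as the five steps: each handler's output is followed by the event it enqueued.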

ApplicationMaster

START -> APP_NEW_SAVED -> APP_ACCEPTED ....

What follows is a repeating series of events, such as starting the attempt, so we skip straight to the AM part.

ResourceManager has createApplicationMasterLauncher() and createApplicationMasterService().

// AMLauncher
private void launch() throws IOException, YarnException {
  connect();
  ContainerId masterContainerID = masterContainer.getId();
  ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + masterContainer
      + " for AM " + application.getAppAttemptId());
  ContainerLaunchContext launchContext =
      createAMContainerLaunchContext(applicationContext, masterContainerID);

  StartContainerRequest scRequest =
      StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
  List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
  list.add(scRequest);
  StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);

  StartContainersResponse response =
      containerMgrProxy.startContainers(allRequests);
  if (response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(masterContainerID)) {
    Throwable t =
        response.getFailedRequests().get(masterContainerID).deSerialize();
    parseAndThrowException(t);
  } else {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
  }
}

private ContainerLaunchContext createAMContainerLaunchContext(
    ApplicationSubmissionContext applicationMasterContext,
    ContainerId containerID) throws IOException {

  // Construct the actual Container
  ContainerLaunchContext container =
      applicationMasterContext.getAMContainerSpec();
  LOG.info("Command to launch container "
      + containerID
      + " : "
      + StringUtils.arrayToString(container.getCommands().toArray(
          new String[0])));

  // Finalize the container
  setupTokens(container, containerID);

  return container;
}

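Note two of the lines above:

  • ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID); builds the launch context for the AM container, i.e. the AM launch request.
  • StartContainersResponse response = containerMgrProxy.startContainers(allRequests); starts the AM's container and launches the AM inside it.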
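One detail of the second line is easy to miss: the batch call reports per-container failures in the response instead of throwing, so the caller has to look up its own container ID. The following is a minimal sketch of that check with hypothetical stand-in classes (ToyStartResponse and ToyAmLauncher are not Hadoop types, and the real code deserializes the failure before rethrowing it).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StartContainersResponse: failures are keyed by container ID.
class ToyStartResponse {
    private final Map<String, RuntimeException> failedRequests;

    ToyStartResponse(Map<String, RuntimeException> failedRequests) {
        this.failedRequests = failedRequests;
    }

    Map<String, RuntimeException> getFailedRequests() {
        return failedRequests;
    }
}

class ToyAmLauncher {
    // Mirrors the if/else at the end of launch(): rethrow our own failure, otherwise report success.
    static String launch(String masterContainerId, ToyStartResponse response) {
        Map<String, RuntimeException> failed = response.getFailedRequests();
        if (failed != null && failed.containsKey(masterContainerId)) {
            throw failed.get(masterContainerId);
        }
        return "Done launching container " + masterContainerId;
    }

    public static void main(String[] args) {
        Map<String, RuntimeException> none = Collections.emptyMap();
        System.out.println(launch("container_1", new ToyStartResponse(none)));

        Map<String, RuntimeException> failed = new HashMap<>();
        failed.put("container_2", new IllegalStateException("request rejected"));
        try {
            launch("container_2", new ToyStartResponse(failed));
        } catch (IllegalStateException e) {
            System.out.println("Launch failed: " + e.getMessage());
        }
    }
}
```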

@Override
public ContainerLaunchContext getAMContainerSpec() {
  ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
  if (this.amContainer != null) {
    return amContainer;
  } // Else via proto
  if (!p.hasAmContainerSpec()) {
    return null;
  }
  amContainer = convertFromProtoFormat(p.getAmContainerSpec());
  return amContainer;
}

public class ApplicationSubmissionContextPBImpl
    extends ApplicationSubmissionContext {
  ApplicationSubmissionContextProto proto =
      ApplicationSubmissionContextProto.getDefaultInstance();
  ApplicationSubmissionContextProto.Builder builder = null;
  boolean viaProto = false;

  private ApplicationId applicationId = null;
  private Priority priority = null;
  private ContainerLaunchContext amContainer = null;
  private Resource resource = null;
  private Set<String> applicationTags = null;
  private ResourceRequest amResourceRequest = null;
  private LogAggregationContext logAggregationContext = null;
  private ReservationId reservationId = null;

  /// ...
}
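The getter above follows a lazy-conversion pattern: return the cached local object if there is one, otherwise convert the wire-format field once and cache the result. Below is a small sketch of that pattern alone. Every class here is a hypothetical stand-in (a String plays the role of the protobuf field), and splitting the command string merely stands in for the real protobuf-to-record conversion.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the protobuf message; null means "field not set".
class ToyContextProto {
    private final String amContainerSpec;

    ToyContextProto(String amContainerSpec) {
        this.amContainerSpec = amContainerSpec;
    }

    boolean hasAmContainerSpec() {
        return amContainerSpec != null;
    }

    String getAmContainerSpec() {
        return amContainerSpec;
    }
}

// Hypothetical stand-in for ContainerLaunchContext.
class ToyLaunchContext {
    final List<String> commands;

    ToyLaunchContext(List<String> commands) {
        this.commands = commands;
    }
}

class ToyContextPBImpl {
    private final ToyContextProto proto;
    private ToyLaunchContext amContainer = null; // cached converted object
    int conversions = 0;                         // counts conversions, for the demo only

    ToyContextPBImpl(ToyContextProto proto) {
        this.proto = proto;
    }

    ToyLaunchContext getAMContainerSpec() {
        if (amContainer != null) {
            return amContainer;   // already converted
        }
        if (!proto.hasAmContainerSpec()) {
            return null;          // nothing to convert
        }
        amContainer = convertFromProtoFormat(proto.getAmContainerSpec());
        return amContainer;
    }

    private ToyLaunchContext convertFromProtoFormat(String wire) {
        conversions++;
        return new ToyLaunchContext(Arrays.asList(wire.split(" ")));
    }

    public static void main(String[] args) {
        ToyContextPBImpl ctx =
            new ToyContextPBImpl(new ToyContextProto("java -Xmx1024m MRAppMaster"));
        System.out.println(ctx.getAMContainerSpec().commands);
        ctx.getAMContainerSpec(); // second call hits the cache
        System.out.println("conversions: " + ctx.conversions);
    }
}
```

Because the converted object is cached, every caller sees the same instance; that is presumably why a later step such as setupTokens can modify the launch context in place.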

From here on, the launched AppMaster creates the job and requests resources from the ResourceManager through AMRMClient.
