关键词:yarn rm mapreduce 提交
Based on Hadoop 2.7.1
JobSubmitter
- addMRFrameworkToDistributedCache(Configuration conf) : mapreduce.application.framework.path, 用于指定其他framework的hdfs 路径配置,默认yarn的可以不管
- Token相关的方法:读取认证信息(支持二进制、json),并将其添加至相应的fileSystem中,以便以同样权限访问文件系统
- copyAndConfigureFiles(Job job, Path jobSubmitDir): 上传配置、jar、files、libjars、archives等
- submitJobInternal: 真正的提交任务接口
核心代码提交链
- JobSubmitter ->
- ClientProtocol(YARNRunner) ->
- ResourceMgrDelegate ->
- YarnClient(YarnClientImpl).submitApplication( ApplicationSubmissionContext appContext) ->
- 【RM】ApplicationClientProtocol(ClientRMService).submitApplication( SubmitApplicationRequest request) -> // fill ASC with dft values
RMAppManager.submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, String user) ->
-
ApplicationSubmissionContext
提交上下文,包含application各种元信息 -
SubmitApplicationRequest
提交Request对象
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
START -> APP_NEW_SAVED
stateMachineFactory.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
//...
private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
// communication
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
}
}
public synchronized void storeNewApplication(RMApp app) {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateData appState =
ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(), context, app.getUser());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
private static final class AddApplicationToSchedulerTransition extends
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user,
app.submissionContext.getReservationID()));
}
}
AppAddedSchedulerEvent
会由配置的Scheduler来handle。
P.S. 看 event 部分代码的方法,
- 找出状态,比如 APP_NEW_SAVED,
- 找出handle这个状态的事件类
- 找出处理这个事件的具体逻辑 (这里可能逻辑最复杂)
- 找下一个事件
- 重复。。
ApplicationMaster
START -> APPNEWSAVED -> APP_ACCEPTED ....
后面是一些attempt的启动等各种事件的反复。直接跳到 AM 部分。
ResourceManager内有 createApplicationMasterLauncher() 和 createApplicationMasterService()
private void launch() throws IOException, YarnException {
connect();
ContainerId masterContainerID = masterContainer.getId();
ApplicationSubmissionContext applicationContext =
application.getSubmissionContext();
LOG.info("Setting up container " + masterContainer
+ " for AM " + application.getAppAttemptId());
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(launchContext,
masterContainer.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
StartContainersResponse response =
containerMgrProxy.startContainers(allRequests);
if (response.getFailedRequests() != null
&& response.getFailedRequests().containsKey(masterContainerID)) {
Throwable t =
response.getFailedRequests().get(masterContainerID).deSerialize();
parseAndThrowException(t);
} else {
LOG.info("Done launching container " + masterContainer + " for AM "
+ application.getAppAttemptId());
}
}
private ContainerLaunchContext createAMContainerLaunchContext(
ApplicationSubmissionContext applicationMasterContext,
ContainerId containerID) throws IOException {
// Construct the actual Container
ContainerLaunchContext container =
applicationMasterContext.getAMContainerSpec();
LOG.info("Command to launch container "
+ containerID
+ " : "
+ StringUtils.arrayToString(container.getCommands().toArray(
new String[0])));
// Finalize the container
setupTokens(container, containerID);
return container;
}
注意以上其中两行:
- ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID) 创建 AM 请求
- StartContainersResponse response = containerMgrProxy.startContainers(allRequests); 启动AM的容器并在容器内启动AM。
@Override
public ContainerLaunchContext getAMContainerSpec() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.amContainer != null) {
return amContainer;
} // Else via proto
if (!p.hasAmContainerSpec()) {
return null;
}
amContainer = convertFromProtoFormat(p.getAmContainerSpec());
return amContainer;
}
public class ApplicationSubmissionContextPBImpl
extends ApplicationSubmissionContext {
ApplicationSubmissionContextProto proto =
ApplicationSubmissionContextProto.getDefaultInstance();
ApplicationSubmissionContextProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId applicationId = null;
private Priority priority = null;
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
/// ...
}
接下来便是启动后的AppMaster 创建job,并通过AMRMClient向ResourceManager申请资源等。