一、YarnJobClusterEntrypoint
进入main方法
SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); Map<String, String> env = System.getenv(); final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key()); Preconditions.checkArgument( workingDirectory != null, "Working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key()); try { YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG); } catch (IOException e) { LOG.warn("Could not log YARN environment information.", e); } final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit( args, new DynamicParametersConfigurationParserFactory(), YarnJobClusterEntrypoint.class); final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env); YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration); //执行程序的入口 ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
clusterEntrypoint.startCluster();
securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration, pluginManager);
return null;
});
synchronized (lock) {
//初始化服务rpc相关
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
//创建ResourceManage,创建、启动Dispatcher,启动ResourceManage
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
创建ResourceMange、Dispatcher,并启动
clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this);
具体实现:网页的打开
webMonitorEndpoint = restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, blobServer, executor, metricFetcher, highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler); log.debug("Starting Dispatcher REST endpoint."); webMonitorEndpoint.start();
resourceManage的启动
resourceManager = resourceManagerFactory.createResourceManager( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname, ioExecutor);
创建Dispatcher
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
选举服务:每个组件都有选举服务,最终要调用这个
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
具体实现:
最终执行这个东西:lamda表达式
previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
start的实现
runIfStateIs( State.CREATED, this::startInternal);
final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), Collections.singleton(jobGraph), ThrowingJobGraphWriter.INSTANCE);
最终启动dispatcher,并启动
final Dispatcher dispatcher; try { dispatcher = dispatcherFactory.createDispatcher( rpcService, fencingToken, recoveredJobs, (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); } dispatcher.start();
最终rpc调用,akka组件通信onStart方法
rpcServer.start();