RM是个综合服务类,内部包含了多个服务,所有的服务被放在列表中,通过循环逐个启动,完整的服务列表见文末。
每个服务的启动都遵循一定的流程,服务的启动流程如下:
1、ResourceManager.java中的serviceStart调用父类的serviceStart
//ResourceManager.java protected void serviceStart() throws Exception { ...... super.serviceStart(); }2、父类CompositeService.serviceStart
protected void serviceStart() throws Exception { //获得服务列表 List<Service> services = getServices(); if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": starting services, size=" + services.size()); } //循环启动服务,每一次start调用最终都会进入服务本身的serviceStart函数 for (Service service : services) { // start the service. If this fails that service // will be stopped and an exception raised service.start(); } super.serviceStart(); }3、进入父类的父类AbstractService
public void start() { //服务是否已经启动? if (isInState(STATE.STARTED)) { return; } //enter the started state synchronized (stateChangeLock) { if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { try { //记录服务启动时间 startTime = System.currentTimeMillis(); //开始启动服务,此处会进入子类的函数+++ serviceStart(); //检测服务是否启动成功 if (isInState(STATE.STARTED)) { //if the service started (and isn‘t now in a later state), notify if (LOG.isDebugEnabled()) { LOG.debug("Service " + getName() + " is started"); } notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } }4、最终进入子类serviceStart函数中启动服务
由此可以看出,对服务的抽象为服务的统一管理带来了便利,如果后续再增加服务,只要遵循这个继承关系,就可以将服务纳入统一管理了。
Token管理器服务线程启动
@Override public void serviceStart() throws Exception { //Token管理器启动,具体作用以后分析,每个管理器由Timer驱动 amRmTokenSecretManager.start(); containerTokenSecretManager.start(); nmTokenSecretManager.start(); try { //过期Token移除线程 rmDTSecretManager.startThreads(); } catch(IOException ie) { throw new YarnRuntimeException("Failed to start secret manager threads", ie); } super.serviceStart(); }Ping Checker服务:AbstractLivelinessMonitor的内部类,循环遍历已记录的NodeManager列表,当发现某个节点超过一段时间未汇报,则认为他已经挂掉,在列表中删除。
private class PingChecker implements Runnable { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { synchronized (AbstractLivelinessMonitor.this) { //获得活动NM列表的迭代器 Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator(); //avoid calculating current time everytime in loop long currentTime = clock.getTime(); //迭代每个节点,若发现节点超过expireInterval(yarn.nm.liveness-monitor.expiry-interval-ms控制,默认10分钟) //则认为他已经挂掉,删除该节点 while (iterator.hasNext()) { Map.Entry<O, Long> entry = iterator.next(); if (currentTime > entry.getValue() + expireInterval) { iterator.remove(); expire(entry.getKey()); LOG.info("Expired:" + entry.getKey().toString() + " Timed out after " + expireInterval/1000 + " secs"); } } } try { //线程暂停monitorInterval( expireInterval/3) Thread.sleep(monitorInterval); } catch (InterruptedException e) { LOG.info(getName() + " thread interrupted"); break; } } } }ResourceManager Event Processor服务:
private final class EventProcessor implements Runnable { @Override public void run() { SchedulerEvent event; while (!stopped && !Thread.currentThread().isInterrupted()) { try { //取出事件 event = eventQueue.take(); } catch (InterruptedException e) { LOG.error("Returning, interrupted : " + e); return; // TODO: Kill RM. } try { //处理事件 scheduler.handle(event); } catch (Throwable t) { ..... } } } }ResourceTrackerService服务:RPC服务器,实现了ResourceTracker接口,提供NM的注册和心跳服务
//ResourceTrackerService.java @Override protected void serviceStart() throws Exception { super.serviceStart(); // ResourceTrackerServer authenticates NodeManager via Kerberos if // security is enabled, so no secretManager. //创建RPC服务器,该服务器实现ResourceTracker接口,handler数量由yarn.resourcemanager. resource-tracker.client.thread-count控制 Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); // Enable service authorization? if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { refreshServiceAcls(conf, new RMPolicyProvider()); } //服务启动 this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, server.getListenerAddress()); }RPC服务器组件启动,主要包括responder、listener、handler
public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }服务于客户端的RPC server:ClientRMService,类似ResourceTrackerService,该服务器实现了ApplicationClientProtocol接口,RPC server的启动都一样,只是实现的协议不同
@Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); //handler数量由yarn.resourcemanager.client.thread-count控制 this.server = rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, conf, this.rmDTSecretManager, conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); // Enable service authorization? if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { refreshServiceAcls(conf, new RMPolicyProvider()); } this.server.start(); clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, server.getListenerAddress()); super.serviceStart(); }服务于管理员的RPC server:AdminService ,handler数量由yarn.resourcemanager.admin.client.thread-count控制,该服务实现ResourceManagerAdministrationProtocol接口
protected void startServer() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); this.server = (Server) rpc.getServer( ResourceManagerAdministrationProtocol.class, this, masterServiceAddress, conf, null, conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); ....... this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS, server.getListenerAddress()); }AsyncDispatcher event handler服务的启动:
调用层次比较深,只关注关键部分,调用栈的顶层:
AsyncDispatcher类直接继承自AbstractService,服务启动时会先调用父类的同名函数
@Override protected void serviceStart() throws Exception { //调用父类同名函数,实际啥都木有做,以后全局初始化之类的操作可能会放进去 super.serviceStart(); //创建一个新的线程,并启动,主要的业务关系包含在createThread函数中 eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName("AsyncDispatcher event handler"); eventHandlingThread.start(); }下面看AsyncDispatcher的线程执行体,由上面的createThread创建,该线程会进入主循环,并一直等待事件队列,一旦有新的事件到达,便执行dispatch(event),将事件分发出去
Runnable createThread() { return new Runnable() { @Override public void run() { //查看服务标识和线程状态 while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. //加入该检测是防止事件过多导致该线程压力过大 if (blockNewEvents) { synchronized (waitForDrained) { if (drained) { waitForDrained.notify(); } } } Event event; try { //在队列中取出事件 event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } if (event != null) { //分发事件 dispatch(event); } } } }; }RM的服务类型还是比较多的,而且好多服务都是多线程的,比如RPC server,默认的handler就有50个,而且有多个RPC server,RM中整体的服务列表如下:
Service org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService Service org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED Service org.apache.hadoop.yarn.server.resourcemanager.NodesListManager Service org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher Service NMLivelinessMonitor in state NMLivelinessMonitor: INITED Service org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService Service org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService Service org.apache.hadoop.yarn.server.resourcemanager.ClientRMService Service org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher Service Dispatcher in state Dispatcher: INITED Service org.apache.hadoop.yarn.server.resourcemanager.AdminService