HADOOP源码分析之RPC(1)

源码位于 hadoop-common 模块的 org.apache.hadoop.ipc 包下

abstract class Server

构造Server

/**
 * Builds the RPC server: stores the basic settings, sizes the call queue,
 * configures authentication, and creates the listener, the connection
 * manager, the metrics objects and the responder.
 *
 * @param bindAddress         address the server socket is bound to
 * @param port                requested listen port; after the listener is
 *                            created, the stored port is replaced with the
 *                            port reported by the listener's address
 * @param rpcRequestClass     class used for RPC requests
 * @param handlerCount        number of handlers
 * @param numReaders          number of reader threads, or -1 to take the
 *                            value from the configuration
 * @param queueSizePerHandler call-queue slots per handler, or -1 to take
 *                            the value from the configuration
 * @param conf                configuration all tunables are read from
 * @param serverName          not referenced in this constructor body
 * @param secretManager       secret manager for token authentication; may
 *                            be null
 * @param portRangeConfig     stored in the field of the same name
 * @throws IOException declared by this constructor
 */
protected Server(String bindAddress, int port,
    Class<? extends Writable> rpcRequestClass, int handlerCount,
    int numReaders, int queueSizePerHandler, Configuration conf,
    String serverName, SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig)
    throws IOException {
  // Listen address and listen port.
  this.bindAddress = bindAddress;
  this.port = port;
  this.conf = conf;
  this.portRangeConfig = portRangeConfig;
  this.rpcRequestClass = rpcRequestClass;
  // Number of handlers.
  this.handlerCount = handlerCount;
  this.socketSendBufferSize = 0;
  this.maxDataLength = conf.getInt(
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);

  // Total call-queue capacity is handlerCount times the per-handler size;
  // -1 means "use the configured per-handler size".
  final int perHandlerQueueSize = (queueSizePerHandler != -1)
      ? queueSizePerHandler
      : conf.getInt(
          CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
          CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
  this.maxQueueSize = handlerCount * perHandlerQueueSize;

  this.maxRespSize = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);

  // Reader thread count; -1 means "use the configured value".
  this.readThreads = (numReaders != -1)
      ? numReaders
      : conf.getInt(
          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);

  this.readerPendingConnectionQueue = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

  // Set up the appropriate call queue.
  final String prefix = getQueueClassPrefix();
  this.callQueue = new CallQueueManager<Call>(
      getQueueClass(prefix, conf),
      getSchedulerClass(prefix, conf),
      getClientBackoffEnable(prefix, conf),
      maxQueueSize, prefix, conf);

  this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  this.authorize = conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);

  // Configure the supported authentication methods.
  this.enabledAuthMethods = getAuthMethods(secretManager, conf);
  this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);

  // Start the listener here and let it bind to the port; the stored port
  // is then replaced with the one the listener reports.
  listener = new Listener();
  this.port = listener.getAddress().getPort();
  connectionManager = new ConnectionManager();
  this.rpcMetrics = RpcMetrics.create(this, conf);
  this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
  this.tcpNoDelay = conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);

  final boolean logSlowRpc = conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
      CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT);
  this.setLogSlowRPC(logSlowRpc);

  // Create the responder here.
  responder = new Responder();

  // SASL is initialised when a secret manager was supplied or security
  // is enabled.
  if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
    SaslRpcServer.init(conf);
    saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
  }

  this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
}

Server 的主要组成即 Listener、Reader、Responder 等组件。

它们均是单独的线程,底层利用 Java NIO 实现(Reactor 设计模式)。参考 NIO 系列文章:http://ifeve.com/overview/

如下是创建Listener的源码:

    //创建一个ServerSocketChannel
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
} // Register accepts on the server socket with the selector.
//注册channel到selector
    acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);

Listener 线程通过 Selector 不断监听 OP_ACCEPT 事件,对每个请求建立连接的 Socket 调用 doAccept 进行处理:

/**
 * Accept loop of the listener.
 *
 * <p>While {@code running} is true, blocks on the selector and calls
 * {@code doAccept} for every selected key that is valid and acceptable.
 * When the loop ends, the accept channel and the selector are closed and
 * all connections are closed through the connection manager.
 */
public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  SERVER.set(Server.this);
  connectionManager.startIdleScan();
  while (running) {
    SelectionKey key = null;
    try {
      getSelector().select();
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        // Selected keys must be removed explicitly once taken.
        iter.remove();
        try {
          if (key.isValid() && key.isAcceptable()) {
            doAccept(key);
          }
        } catch (IOException e) {
          // A failed accept must not stop the listener; keep it
          // best-effort but leave a trace instead of dropping it silently.
          LOG.debug(Thread.currentThread().getName()
              + ": ignoring IOException while accepting a connection", e);
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // We can run out of memory if we have too many threads: log the
      // event, sleep for a minute and give some thread(s) a chance to
      // finish.
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      connectionManager.closeIdle(true);
      try {
        Thread.sleep(60000);
      } catch (Exception ignored) {
        // Deliberately ignored: the loop condition re-checks `running`.
      }
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  LOG.info("Stopping " + Thread.currentThread().getName());

  synchronized (this) {
    // Close the two resources independently so that a failure closing
    // the channel cannot skip closing the selector.
    try {
      acceptChannel.close();
    } catch (IOException e) {
      LOG.debug(Thread.currentThread().getName()
          + ": ignoring IOException while closing the accept channel", e);
    }
    try {
      selector.close();
    } catch (IOException e) {
      LOG.debug(Thread.currentThread().getName()
          + ": ignoring IOException while closing the selector", e);
    }
    selector = null;
    acceptChannel = null;

    // Close all connections.
    connectionManager.stopIdleScan();
    connectionManager.closeAll();
  }
}
上一篇:AHB/APB简介


下一篇:Elasticsearch 默认配置 IK 及 Java AnalyzeRequestBuilder 使用