提示:阅读本文前最好先阅读:
- 《Spark2.1.0之内置RPC框架》
- 《spark2.1.0之源码分析——RPC配置TransportConf》
- 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
- 《spark2.1.0之源码分析——RPC服务器TransportServer》
- 《spark2.1.0之源码分析——RPC管道初始化》
- 《spark2.1.0之源码分析——RPC传输管道处理器详解》
- 《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解》
通过《spark2.1.0之源码分析——RPC服务器TransportServer》一文的介绍,我们知道TransportServer的构造器中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。TransportServerBootstrap的定义见代码清单1。
代码清单1 TransportServerBootstrap的定义
public interface TransportServerBootstrap {
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
TransportServerBootstrap的doBootstrap方法将对服务端的RpcHandler进行代理,接收客户端的请求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap两个实现类。为了更清楚的说明TransportServerBootstrap的意义,我们以SaslServerBootstrap为例,来讲解其实现(见代码清单2)。
代码清单2 SaslServerBootstrap的doBootstrap实现
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
}
根据代码清单2,我们知道SaslServerBootstrap的doBootstrap方法实际创建了SaslRpcHandler,SaslRpcHandler负责对管道进行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler本身也继承了RpcHandler,所以我们重点来看其receive方法的实现,见代码清单3。
代码清单3 SaslRpcHandler的receive方法
@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
if (isComplete) {
// 将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回
delegate.receive(client, message, callback);
return;
}
ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
SaslMessage saslMessage;
try {
saslMessage = SaslMessage.decode(nettyBuf);// 对客户端发送的消息进行SASL解密
} finally {
nettyBuf.release();
}
if (saslServer == null) {
// 如果saslServer还未创建,则需要创建SparkSaslServer
client.setClientId(saslMessage.appId);
saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
conf.saslServerAlwaysEncrypt());
}
byte[] response;
try {
response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer处理已解密的消息
saslMessage.body().nioByteBuffer()));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
callback.onSuccess(ByteBuffer.wrap(response));
if (saslServer.isComplete()) {
logger.debug("SASL authentication successful for channel {}", client);
isComplete = true;// SASL认证交换已经完成
if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
logger.debug("Enabling encryption for channel {}", client);
// 对管道进行SASL加密
SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
saslServer = null;
} else {
saslServer.dispose();
saslServer = null;
}
}
}
根据代码清单3,SaslRpcHandler处理客户端消息的步骤如下:
- 如果SASL认证交换已经完成(isComplete等于true),则将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回。
- 如果SASL认证交换未完成(isComplete等于false),则对客户端发送的消息进行SASL解密。
- 如果saslServer还未创建,则需要创建SparkSaslServer。当SaslRpcHandler接收到客户端的第一条消息时会做此操作。
- 使用saslServer处理已解密的消息,并将处理结果通过RpcResponseCallback的回调方法返回给客户端。
- 如果SASL认证交换已经完成,则将isComplete置为true。
- 对管道进行SASL加密。
SaslServerBootstrap是通过SaslRpcHandler对下游RpcHandler进行代理的一种TransportServerBootstrap。EncryptionCheckerBootstrap是另一种TransportServerBootstrap的实现,它通过将自身加入Netty的管道中实现引导,EncryptionCheckerBootstrap的doBootstrap方法的实现见代码清单4。
代码清单4 EncryptionCheckerBootstrap的doBootstrap实现
@Override
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
channel.pipeline().addFirst("encryptionChecker", this);
return rpcHandler;
}
在详细介绍了TransportChannelHandler之后我们就可以对《spark2.1.0之源码分析——RPC管道初始化》文中的图1进行扩展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的处理流程增加进来,如下图所示。
RPC框架服务端处理请求、响应流程图
有读者可能会问,上图中并未见TransportServerBootstrap的身影。根据对TransportServerBootstrap的两种实现的举例,我们知道TransportServerBootstrap将可能存在于图中任何两个组件的箭头连线中间,起到引导、包装、代理的作用。
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下: