提示:阅读本文前最好先阅读:
- 《Spark2.1.0之内置RPC框架》
- 《spark2.1.0之源码分析——RPC配置TransportConf》
- 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
- 《spark2.1.0之源码分析——RPC服务器TransportServer》
- 《spark2.1.0之源码分析——RPC管道初始化》
- 《spark2.1.0之源码分析——RPC传输管道处理器详解》
在《spark2.1.0之源码分析——RPC传输管道处理器详解》一文中详细介绍了TransportRequestHandler。
由于TransportRequestHandler实际是把请求消息交给RpcHandler进一步处理的,所以这里对RpcHandler首先做个介绍。RpcHandler是一个抽象类,定义了一些RPC处理器的规范,其主要实现见代码清单1。
代码清单1 RpcHandler的实现
public abstract class RpcHandler {
private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
public abstract void receive(
TransportClient client,
ByteBuffer message,
RpcResponseCallback callback);
public abstract StreamManager getStreamManager();
public void receive(TransportClient client, ByteBuffer message) {
receive(client, message, ONE_WAY_CALLBACK);
}
public void channelActive(TransportClient client) { }
public void channelInactive(TransportClient client) { }
public void exceptionCaught(Throwable cause, TransportClient client) { }
private static class OneWayRpcCallback implements RpcResponseCallback {
private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
@Override
public void onSuccess(ByteBuffer response) {
logger.warn("Response provided for one-way RPC.");
}
@Override
public void onFailure(Throwable e) {
logger.error("Error response provided for one-way RPC.", e);
}
}
}
代码清单1中RpcHandler的各个方法的作用如下:
- receive:这是一个抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现。receive接收三个参数,分别是TransportClient、ByteBuffer和RpcResponseCallback。RpcResponseCallback用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback都会被调用一次。RpcResponseCallback的接口定义如下:
public interface RpcResponseCallback {
void onSuccess(ByteBuffer response);
void onFailure(Throwable e);
}
- 重载的receive:只接收TransportClient和ByteBuffer两个参数,RpcResponseCallback为默认的ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,从代码清单3-27中OneWayRpcCallback的实现可以看出其onSuccess和onFailure只是打印日志,并没有针对客户端做回复处理。
- channelActive:当与给定客户端相关联的channel处于活动状态时调用。
- channelInactive:当与给定客户端相关联的channel处于非活动状态时调用。
- exceptionCaught:当channel产生异常时调用。
- getStreamManager:获取StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态。、
介绍完RpcHandler,现在回到TransportRequestHandler的处理过程。TransportRequestHandler处理以上四种RequestMessage的实现见代码清单2。
代码清单2 TransportRequestHandler的handle方法
@Override
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
结合代码清单2,下面逐一详细分析这四种类型请求的处理过程。
处理块获取请求
processFetchRequest方法用于处理ChunkFetchRequest类型的消息,其实现见代码清单3。
代码清单3 processFetchRequest的实现
private void processFetchRequest(final ChunkFetchRequest req) {
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
req.streamChunkId);
}
ManagedBuffer buf;
try {
streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
streamManager.registerChannel(channel, req.streamChunkId.streamId);
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format("Error opening block %s for request from %s",
req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
respond(new ChunkFetchSuccess(req.streamChunkId, buf));
}
代码清单3中的streamManager是通过调用RpcHandler的getStreamManager方法获取的StreamManager。processFetchRequest的处理都依托于RpcHandler的StreamManager,其处理步骤如下:
- 调用StreamManager的checkAuthorization方法,校验客户端是否有权限从给定的流中读取;
- 调用StreamManager的registerChannel方法,将一个流和一条(只能是一条)客户端的TCP连接关联起来,这可以保证对于单个的流只会有一个客户端读取。流关闭之后就永远不能够重用了;
- 调用StreamManager的getChunk方法,获取单个的块(块被封装为ManagedBuffer)。由于单个的流只能与单个的TCP连接相关联,因此getChunk方法不能为了某个特殊的流而并行调用;
- 将ManagedBuffer和流的块Id封装为ChunkFetchSuccess后,调用respond方法返回给客户端。
有关StreamManager的具体实现,读者可以参考《Spark内核设计的艺术 架构设计与实现》一书5.3.5节介绍的NettyStreamManager和6.9.2节介绍的NettyBlockRpcServer中的OneForOneStreamManager。
处理RPC请求
processRpcRequest方法用于处理RpcRequest类型的消息,其实现见代码清单4。
代码清单4 processRpcRequest的实现
private void processRpcRequest(final RpcRequest req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
}
@Override
public void onFailure(Throwable e) {
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
});
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
} finally {
req.body().release();
}
}
代码清单4中将RpcRequest消息的内容体、发送消息的客户端以及一个RpcResponseCallback类型的匿名内部类作为参数传递给了RpcHandler的receive方法。这就是说真正用于处理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。由于RpcHandler是抽象类(见代码清单1),其receive方法也是抽象方法,所以具体的操作将由RpcHandler的实现了receive方法的子类来完成。所有继承RpcHandler的子类都需要在其receive方法的具体实现中回调RpcResponseCallback的onSuccess(处理成功时)或者onFailure(处理失败时)方法。从RpcResponseCallback的实现来看,无论处理结果成功还是失败,都将调用respond方法对客户端进行响应。
处理流请求
processStreamRequest方法用于处理StreamRequest类型的消息,其实现见代码清单5。
代码清单5 processStreamRequest的实现
private void processStreamRequest(final StreamRequest req) {
ManagedBuffer buf;
try {
buf = streamManager.openStream(req.streamId);// 将获取到的流数据封装为ManagedBuffer
} catch (Exception e) {
logger.error(String.format(
"Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
if (buf != null) {
respond(new StreamResponse(req.streamId, buf.size(), buf));
} else {
respond(new StreamFailure(req.streamId, String.format(
"Stream '%s' was not found.", req.streamId)));
}
}
代码清单5中也使用了RpcHandler的StreamManager,其处理步骤如下:
- 调用StreamManager的openStream方法将获取到的流数据封装为ManagedBuffer;
- 当成功或失败时调用respond方法向客户端响应。
处理无需回复的RPC请求
processOneWayMessage方法用于处理StreamRequest类型的消息,其实现见代码清单6。
代码清单6 processOneWayMessage的实现
private void processOneWayMessage(OneWayMessage req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
req.body().release();
}
}
processOneWayMessage方法的实现processRpcRequest非常相似,区别在于processOneWayMessage调用了代码清单1中ONE_WAY_CALLBACK的receive方法,因而processOneWayMessage在处理完RPC请求后不会对客户端作出响应。
从以上四种处理的分析可以看出最终的处理都由RpcHandler及其内部组件完成。除了OneWayMessage的消息外,其余三种消息都是最终调用respond方法响应客户端,其实现见代码清单7。
代码清单7 respond的实现
private void respond(final Encodable result) {
final SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
}
}
);
}
可以看到respond方法中实际调用了Channel的writeAndFlush方法[1]来响应客户端。
[1] Channel的writeAndFlush方法涉及Netty的实现细节及原理,这并不是本书所要阐述的内容,有兴趣的读者可以访问Netty官网:http://netty.io获取更多信息。
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下: