spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81624875

 

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》
  6. spark2.1.0之源码分析——RPC传输管道处理器详解

在《spark2.1.0之源码分析——RPC传输管道处理器详解》一文中详细介绍了TransportRequestHandler。

由于TransportRequestHandler实际是把请求消息交给RpcHandler进一步处理的,所以这里对RpcHandler首先做个介绍。RpcHandler是一个抽象类,定义了一些RPC处理器的规范,其主要实现见代码清单1。

代码清单1         RpcHandler的实现

public abstract class RpcHandler {

  private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();

  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  public abstract StreamManager getStreamManager();

  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

  public void channelActive(TransportClient client) { }

  public void channelInactive(TransportClient client) { }

  public void exceptionCaught(Throwable cause, TransportClient client) { }

  private static class OneWayRpcCallback implements RpcResponseCallback {

    private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);

    @Override
    public void onSuccess(ByteBuffer response) {
      logger.warn("Response provided for one-way RPC.");
    }

    @Override
    public void onFailure(Throwable e) {
      logger.error("Error response provided for one-way RPC.", e);
    }

  }

}

代码清单1中RpcHandler的各个方法的作用如下:

  • receive:这是一个抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现。receive接收三个参数,分别是TransportClientByteBufferRpcResponseCallbackRpcResponseCallback用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback都会被调用一次。RpcResponseCallback的接口定义如下:
public interface RpcResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}
  • 重载的receive:只接收TransportClientByteBuffer两个参数,RpcResponseCallback为默认的ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,从代码清单3-27中OneWayRpcCallback的实现可以看出其onSuccessonFailure只是打印日志,并没有针对客户端做回复处理。
  • channelActive:当与给定客户端相关联的channel处于活动状态时调用。
  • channelInactive:当与给定客户端相关联的channel处于非活动状态时调用。
  • exceptionCaught:当channel产生异常时调用。
  • getStreamManager:获取StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态。、

介绍完RpcHandler,现在回到TransportRequestHandler的处理过程。TransportRequestHandler处理以上四种RequestMessage的实现见代码清单2。

代码清单2         TransportRequestHandler的handle方法

  @Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

结合代码清单2,下面逐一详细分析这四种类型请求的处理过程。

处理块获取请求

         processFetchRequest方法用于处理ChunkFetchRequest类型的消息,其实现见代码清单3。

代码清单3         processFetchRequest的实现

  private void processFetchRequest(final ChunkFetchRequest req) {
    if (logger.isTraceEnabled()) {
      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
        req.streamChunkId);
    }

    ManagedBuffer buf;
    try {
      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
      streamManager.registerChannel(channel, req.streamChunkId.streamId);
      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
    } catch (Exception e) {
      logger.error(String.format("Error opening block %s for request from %s",
        req.streamChunkId, getRemoteAddress(channel)), e);
      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
      return;
    }

    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
  }

代码清单3中的streamManager是通过调用RpcHandler的getStreamManager方法获取的StreamManager。processFetchRequest的处理都依托于RpcHandler的StreamManager,其处理步骤如下:

  1. 调用StreamManager的checkAuthorization方法,校验客户端是否有权限从给定的流中读取;
  2. 调用StreamManager的registerChannel方法,将一个流和一条(只能是一条)客户端的TCP连接关联起来,这可以保证对于单个的流只会有一个客户端读取。流关闭之后就永远不能够重用了;
  3. 调用StreamManager的getChunk方法,获取单个的块(块被封装为ManagedBuffer)。由于单个的流只能与单个的TCP连接相关联,因此getChunk方法不能为了某个特殊的流而并行调用;
  4. 将ManagedBuffer和流的块Id封装为ChunkFetchSuccess后,调用respond方法返回给客户端。

有关StreamManager的具体实现,读者可以参考《Spark内核设计的艺术 架构设计与实现》一书5.3.5节介绍的NettyStreamManager和6.9.2节介绍的NettyBlockRpcServer中的OneForOneStreamManager。

处理RPC请求

         processRpcRequest方法用于处理RpcRequest类型的消息,其实现见代码清单4。

代码清单4         processRpcRequest的实现

  private void processRpcRequest(final RpcRequest req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }

        @Override
        public void onFailure(Throwable e) {
          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
      });
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    } finally {
      req.body().release();
    }
  }

代码清单4中将RpcRequest消息的内容体、发送消息的客户端以及一个RpcResponseCallback类型的匿名内部类作为参数传递给了RpcHandler的receive方法。这就是说真正用于处理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。由于RpcHandler是抽象类(见代码清单1),其receive方法也是抽象方法,所以具体的操作将由RpcHandler的实现了receive方法的子类来完成。所有继承RpcHandler的子类都需要在其receive方法的具体实现中回调RpcResponseCallback的onSuccess(处理成功时)或者onFailure(处理失败时)方法。从RpcResponseCallback的实现来看,无论处理结果成功还是失败,都将调用respond方法对客户端进行响应。

处理流请求

         processStreamRequest方法用于处理StreamRequest类型的消息,其实现见代码清单5。

代码清单5         processStreamRequest的实现

  private void processStreamRequest(final StreamRequest req) {
    ManagedBuffer buf;
    try {
      buf = streamManager.openStream(req.streamId);// 将获取到的流数据封装为ManagedBuffer
    } catch (Exception e) {
      logger.error(String.format(
        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
      return;
    }

    if (buf != null) {
      respond(new StreamResponse(req.streamId, buf.size(), buf));
    } else {
      respond(new StreamFailure(req.streamId, String.format(
        "Stream '%s' was not found.", req.streamId)));
    }
  }

代码清单5中也使用了RpcHandler的StreamManager,其处理步骤如下:

  1. 调用StreamManager的openStream方法将获取到的流数据封装为ManagedBuffer;
  2. 当成功或失败时调用respond方法向客户端响应。

处理无需回复的RPC请求

         processOneWayMessage方法用于处理StreamRequest类型的消息,其实现见代码清单6。

代码清单6         processOneWayMessage的实现

  private void processOneWayMessage(OneWayMessage req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
      req.body().release();
    }
  }

processOneWayMessage方法的实现processRpcRequest非常相似,区别在于processOneWayMessage调用了代码清单1中ONE_WAY_CALLBACK的receive方法,因而processOneWayMessage在处理完RPC请求后不会对客户端作出响应。

         从以上四种处理的分析可以看出最终的处理都由RpcHandler及其内部组件完成。除了OneWayMessage的消息外,其余三种消息都是最终调用respond方法响应客户端,其实现见代码清单7。

代码清单7         respond的实现

  private void respond(final Encodable result) {
    final SocketAddress remoteAddress = channel.remoteAddress();
    channel.writeAndFlush(result).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            logger.trace("Sent result {} to client {}", result, remoteAddress);
          } else {
            logger.error(String.format("Error sending result %s to %s; closing connection",
              result, remoteAddress), future.cause());
            channel.close();
          }
        }
      }
    );
  }

可以看到respond方法中实际调用了Channel的writeAndFlush方法[1]来响应客户端。

 


[1] Channel的writeAndFlush方法涉及Netty的实现细节及原理,这并不是本书所要阐述的内容,有兴趣的读者可以访问Netty官网:http://netty.io获取更多信息。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

上一篇:spark2.1.0之源码分析——RPC传输管道处理器详解


下一篇:Java泛型的逆变