徒手写一个RPC框架 - 远程调用

前言

微服务已经是每个互联网开发者必须掌握的一项技术。而RPC框架,是构成微服务最重要的组成部分之一。在五一假期,尝试去看了看dubbo的源代码,这里做一个整理。
广义的来讲一个完整的RPC包含了很多组件,包括服务发现,服务治理,远程调用,调用链分析,网关等等。这篇文章主要先讲解的是 RPC 的基石,远程调用 的实现。

RPC调用过程

下图就很直观的表明了一次RPC调用的过程
徒手写一个RPC框架 - 远程调用

  1. client 会调用本地动态代理 proxy
  2. 这个代理会将调用通过协议转序列化字节流
  3. 通过网络框架,将字节流发送到服务端
  4. 服务端在受到这个字节流后,会根据协议,反序列化为原始的调用,利用反射原理调用服务方提供的方法
  5. 如果请求有返回值,又需要把结果根据协议序列化后,再通过网络框架返回给调用方

技术选型:proxy的实现使用cglib,网络框架使用的是广为使用的netty,序列化的方式使用的是json(方便起见)。

具体实现

基础代码

首先我门需要考虑的是如何实现RPC框架的基础,也就是协议以及网络的部分。
针对一次RPC的调用过程肯定会存在请求和相应那么我们需要定义出来这两个实体。
RpcRequest:

@Data
public class RpcRequest implements Serializable {

    private static final long serialVersionUID = -2662364013035730034L;
    
    /**
     * 调用id
     */
    private String requestId;

    /**
     * rpc请求类名
     */
    private String className;

    /**
     * rpc请求方法名
     */
    private String methodName;

    /**
     * rpc请求的参数类型
     */
    private Class<?>[] parameterTypes;

    /**
     * rpc请求的参数
     */
    private Object[] parameters;

    public RpcRequest() {
    }

    public RpcRequest(String requestId, String className, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        this.requestId = requestId;
        this.className = className;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.parameters = parameters;
    }
}

RpcResponse:

@Data
public class RpcResponse implements Serializable {

    private static final long serialVersionUID = -3757187413601154175L;

    /**
     * rpc请求id
     */
    private String requestId;

    /**
     * 出现的异常
     */
    private Throwable throwable;

    /**
     * 返回的结果
     */
    private Object result;

}

定义对象之后,我们需要思索的意见事就是对象在网络中传输那一定是需要进行序列化的,可以选择的序列化协议有很多:

  • jdk 的序列化方法。(不推荐,不利于之后的跨语言调用)
  • json 可读性强,但是序列化速度慢,体积大。
  • protobuf,kyro,Hessian 等都是优秀的序列化框架,也可按需选择。

为了简单和便于调试,我们就选择 json 作为序列化协议,使用fastjson作为 json 解析框架。具体的序列化代码就不展开了。

序列化搞定了之后,就可以考虑一下服务端和客户端进行通信的问题了。

server

server 是负责处理客户端请求的组件。在互联网高并发的环境下,使用nio非阻塞的方式可以相对轻松的应付高并发的场景。netty是一个优秀的 Nio 处理框架。Server就基于netty 进行开发。关键代码如下:

  • netty是基于Reacotr模型的。所以需要初始化两组线程boss和worker。boss负责分发请求,worker负责执行相应的handler:
@Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(serverInitializer);
        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        for (ChannelOption option : tcpChannelOptions.keySet()) {
            serverBootstrap.option(option, tcpChannelOptions.get(option));
        }
        return serverBootstrap;
    }
  • netty的操作是基于pipeline的。所以我们需要把在protocol实现的几个coder注册到netty的pipeline中。
@Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 处理tcp请求中粘包的 coder,
        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
        // 实现的序列化和反序列化 coder
        pipeline.addLast(new RpcEncoder(RpcResponse.class, new JSONSerializer()));
        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));
        // 具体处理请求的handler
        pipeline.addLast(serverHandler);
    }
  • 实现具体的ServerHandler用于处理真正的调用。

ServerHandler继承SimpleChannelInboundHandler。简单来说这个 InboundHandler会在数据被接受时或者对于的Channel的状态发生变化的时候被调用。当这个 handler读取数据的时候方法channelRead0()会被用,所以我们就重写这个方法就够了。

@Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest request) throws Exception {

        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(request.getRequestId());

        try {
            // 收到请求后开始处理请求
            Object res = handler(request);
            rpcResponse.setResult(res);
        } catch (Throwable throwable) {
            rpcResponse.setThrowable(throwable);
            log.error("ServerHandler.handle failed, request : {}", request, throwable);
        }
        // 操作完以后写入netty的上下文中。netty自己处理返回值。
        channelHandlerContext.writeAndFlush(rpcResponse);
    }

handler(msg) 实际上使用的是 cglib 的 Fastclass 实现的,其实根本原理,还是反射。

private Object handler(RpcRequest request) throws Throwable {

        Class<?> clz = Class.forName(request.getClassName());

        Object bean = ctx.getBean(clz);
        Class<?> serviceClass = bean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterType = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        // 根本思路还是获取类名和方法名,利用反射实现调用
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod method = fastClass.getMethod(methodName, parameterType);

        // 实际调用发生的地方
        return method.invoke(bean, parameters);

    }

总体上来看,server 的实现不是很困难。核心的知识点是 netty 的 channel 的使用和 cglib 的反射机制。

client

其实,对于我来说,client的实现难度要大于server的实现。netty 是一个异步框架,所有的返回都是基于future和callback的机制。
我们可以基于wait和notify实现一个简易的future类

public class DefaultFuture {

    private RpcResponse rpcResponse;

    private volatile boolean isSucc = false;

    private final byte[] lock = new byte[0];

    public RpcResponse getResponse(int timeout) {
        synchronized (lock) {
            while (!isSucc) {
                try {
                    lock.wait(timeout);
                } catch (InterruptedException e) {
                    log.error("getRpcResponse.Interrupted");
                }
            }
            return rpcResponse;
        }
    }

    public void setResponse(RpcResponse response) {
        if (isSucc) {
            return;
        }
        synchronized (lock) {
            this.rpcResponse = response;
            this.isSucc = true;
            lock.notify();
        }
    }

}

为了能够提升 client 的吞吐量,可提供的思路有以下几种:

  • 使用对象池:建立多个client以后保存在对象池中。但是代码的复杂度和维护client的成本会很高。
  • 尽可能的复用netty中的channel。之前你可能注意到,为什么要在RpcRequest和RpcResponse中增加一个id。因为netty中的channel是会被多个线程使用的。当一个结果异步的返回后,你并不知道是哪个线程返回的。这个时候就可以考虑利用一个Map,建立一个id和future 映射。这样请求的线程只要使用对应的id就能获取,相应的返回结果。
public class ClientHandler extends ChannelDuplexHandler {

    private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof RpcRequest) {
            RpcRequest request = (RpcRequest) msg;
            futureMap.putIfAbsent(request.getRequestId(), new DefaultFuture());
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof RpcResponse) {
            RpcResponse response = (RpcResponse) msg;
            DefaultFuture future = futureMap.get(response.getRequestId());
            future.setResponse(response);
        }
        super.channelRead(ctx, msg);
    }


    public RpcResponse getRpcResponse(String requestId){
        try{
            DefaultFuture future = futureMap.get(requestId);
            return future.getResponse(10);
        } finally {
            futureMap.remove(requestId);
        }
    }

}

这里没有继承 server 中的 InboundHandler 而使用了 ChannelDuplexHandler。顾名思义就是在写入和读取数据的时候,都会触发相应的方法。写入的时候在Map中保存id和 future。读到数据的时候从Map中取出future并将结果放入Future中。获取结果的时候需要对应的id。
更多详细的代码可以参考底部给出的gitlab链接。

使用

参考工程中带有test的module。

总结

通过以上的代码,我们实现了一个简单的RPC框架,这只是RPC框架的最最基础的一部分,再其上我们还需要做的是服务发现,服务治理,容灾等一系列的事情。优秀的框架总是需要时间去沉淀。

上一篇:ROS问题及解决方案——依赖包安装以及无法修正错误


下一篇:Linux 系统手动部署 MySQL 数据库