百行实现一个简单的RPC调用

调用流程

百行实现一个简单的RPC调用

简单描述下过程,客户端和服务端都可以访问到通用的接口,但是只有服务端有这个接口的实现类,客户端如果想要调用这个接口的话,必须要通过网络传输这种来调,告诉你服务端我要调用这个接口,服务端它收到之后找到这个接口的实现类,并且执行这个实现类,将执行的结果返回给客户端,作为客户端调用接口方法的返回值,就是这个过程。

这篇答应同志们要写起走,所以咱们就用百行小代码做个简单的RPC调用。但是客户端不知道服务端地址?你调个蹬,所以咱们要假设一下,那就是客户端已经知道了服务端的地址,这部分是后面的事情会由后续的服务发现机制完善这段代码。

通用接口

先把通用的接口写好,然后再来看怎么实现客户端和服务端。

就写完了啊


//通用接口
public interface CommonService {
    String common(HelloObject object);//定义了返回值为字符的common方法
}

common方法 需要传一个对象,HelloObject对象,定义如下:


//先安lombook插件才行
@Data//自动添加提供读写功能,你从而不写get、set方法。它还会为类提供 equals()、hashCode()、toString()方法 
@AllArgsConstructor//这个也是lombook插件的,意思添加一个构造函数,它含有所有已声明字段属性参数
public class HelloObject implements Serializable {

      private Integer id;//私有的id属性
      private String message;//私有的message属性
}

注意这个对象需要去实现 Serializable接口,因为它需要在调用过程中从客户端传递给服务端,那必须序列化奥。

接着我们在服务端对这个接口进行实现,实现的方式也很简单,就是写个实现类嘛,然后返回一个字符串就可以了。


public class CommonServiceImpl implements CommonService {

//之前俺一直在用System.out.println()来调试.但是用这种方式开发项目部署到环境中,会因为众多的控制台
//输出降低了应用的性能.这时候Log4J就成为可平衡开发和部署应用的利器了.在项目中使用Log4J并不是一
//件困难的事情,简单粗暴的方式就是在每个类中声明一个Logger私有属性
    private static final Logger logger = LoggerFactory.getLogger(CommonServiceImpl.class);
    
    @Override
    public String common(CommonObject object) {//对common方法进行重写
        logger.info("接收到:{}", object.getMessage());//logger属性调接收方法
        return "这是调的返回值,id=" + object.getId();//用getId()得到String类型的返回值
    }
}

传输协议

严格来说,这不能算是协议……但也大致算一个传输格式而已。

我们来想一下,服务端需要哪些信息,才能唯一确定服务端需要调的接口的方法呢?

首先,是接口的名字,和方法的名字,但是因为方法重载的缘故,所以我们还需要这个方法的所有参数的类型,最后,客户端在调用方法的时侯,还需要传参数的实际值,那么服务端知道上面四个条件,就可以找到这个方法并且调用完成了。我们把这四个条件写到一个对象里,到时候传输时传输这个对象就行了。即 RpcRequest对象。


@Data//上段代码有
@Builder//提供在设计数据实体时,对外保持private setter,而对属性的赋值采用Builder的方式,
//不对外公开属性的写操作
 class RpcRequest implements Serializable {
    /**
     * 这写待调用接口名称
     */
    private String interfaceName;
    
    /**
     * 这写待调用方法名称
     */
    private String methodName;
    
    /**
     * 这写调用方法的参数
     */
    private Object[] parameters;
    
    /**
     * 这写调用方法的参数类型
     */
    private Class<?>[] paramTypes;
}

参数类型我是直接使用 Class对象,其实用字符串也是可以的,可以自己下去整。

好了调用是完了

那服务器调完这个方法后,需要给客户端返回哪些信息呢?成功还是失败,如果调用成功的话,显然需要返回值,如果调用失败了,那得返回失败的信息,这里又封装一个 RpcResponse对象:


@Data//上面代码有
public class RpcResponse<T> implements Serializable {
    /**
     * 响应状态码
     */
    private Integer statusCode;
   
   /**
     * 响应状态补充信息
     */
    private String message;
   
   /**
     * 响应数据
     */
    private T data;
  
    public static <T> RpcResponse<T> success(T data) {
          RpcResponse<T> response = new RpcResponse<>();//用构造器创个对象出来
          response.setStatusCode(ResponseCode.SUCCESS.getCode());//方法的嵌套调用,成功返回并设置状态码
            response.setData(data);//设置饷应数据
        
        return response;//注意返回值类型
    }
    
    //跟上面相反
    public static <T> RpcResponse<T> fail(ResponseCode code) {
          RpcResponse<T> response = new RpcResponse<>();//用构造器创个对象出来
          response.setStatusCode(code.getCode());//失败获取并设置状态码
          response.setMessage(code.getMessage());//获取并设置状态补充信息
      
      return response;//注意返回值类型
    }
}

这里还多写了两个静态方法,用于快速生成成功与失败的响应对象。其中,statusCode属性可以自行定义,客户端服务端一致就可以了。

服务端的实现用动态代理

服务端方面,因为在服务端这一侧我们并没得接口的具体实现类,所以就没有办法直接生成实例对象了。那直接点,我们可以通过动态代理生成实例,并且调用方法时生成所需要的 RpcRequest对象 并且发送给客户端,能够让这代理对象可以拦截对原对象方法的调用并增强就行了。

这里我们暂时采用 JDK 动态代理,代理类是需要实现 InvocationHandler接口 的。


public class RpcServerProxy implements InvocationHandler {
    private String host;//私有的字符属性
    private int port;//私有的int属性

    public RpcServerProxy(String host, int port) {//两个参数的构造方法
        this.host = host;
        this.port = port;
    }

    @SuppressWarnings("unchecked")//该注解意思是给编译器一条指令,告诉它对被批注的代码元素内部的某些警告保持静默。
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }
}

我们需要传 host 和 port 来指明服务端的位置。并且使用 getProxy()方法 来生成代理对象。

InvocationHandler接口 需去实现 invoke()方法,来指明代理对象的方法被调用时的动作。在这里,我们显然就需要生成一个 RpcRequest对象,发出去,然后返回从服务端接收到的结果即可。


    @Override//重写了
    //三个参数,有异常及时抛出
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
        RpcRequest rpcRequest = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
                
        RpcClient rpcClient = new RpcClient();
        return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
    }

生成 RpcRequest 很简单,我使用 Builder模式 来生成这个对象,你可能要问了 Builder模式是个啥,小婊贝你不要忘了lombook的@Builder注解。发送的逻辑俺使用了一个 RpcClient对象 来做,这个对象的作用只有一个,就是往一个对象发,并且接收返回的对象。


public class RpcServer {

//再来一遍,之前俺一直在用System.out.println()来调试.但是用这种方式开发项目部署到环境中,会因为众多的控制台
//输出降低了应用的性能.这时候Log4J就成为可平衡开发和部署应用的利器了.在项目中使用Log4J并不是一
//件困难的事情,简单粗暴的方式就是在每个类中声明一个Logger私有属性
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    //发送请求方法
    public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
    
        try (Socket socket = new Socket(host, port)) {
            //对象输出流
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            //对象输入流
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            
            objectOutputStream.writeObject(rpcRequest);
            //清空内存的缓冲区
            objectOutputStream.flush();
            
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用时有错误发生:", e);//用log4j的注解进行打印,不浪费控制台,性能也好
            return null;
        }
    }
}

我的实现很简单,直接使用Java的序列化方式,通过Socket传输。创建一个Socket,获取 ObjectOutputStream对象,然后把需要发送的对象传进去即可,接收时获取 ObjectInputStream对象,readObject()方法 就可以获得一个返回的对象。

客户端的实现反射调用

客户端的实现就简单多了,使用一个 ServerSocket 监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程序中处理调用。这里创建线程俺采用线程池。


public class RpcClient {

    private final ExecutorService threadPool;
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

      public RpcServer() {
      //定义线程池的属性值
          int corePoolSize = 5;
          int maximumPoolSize = 50;
          long keepAliveTime = 60;
          
          //用阻塞队列创建对象
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
          //创建线程工厂对象
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
    }
  
}

这里俺简化了一下,RpcServer 暂时只注册一个接口,即对外提供一个接口的调用服务,添加register方法,在注册完一个服务后立马开始监听。


    //这段很简单,没得注释好写的
    public void register(Object service, int port) {
    
        try (ServerSocket serverSocket = new ServerSocket(port)) {
        
            logger.info("服务器正在启动...");
            Socket socket;
            
              while((socket = serverSocket.accept()) != null) {
                  logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                  //线程池处理工作线程方法的调用
                  threadPool.execute(new WorkerThread(socket, service));
              }
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
        
    }

这里向工作线程WorkerThread类 传入了 socket 和 用于服务端实例service。

WorkerThread工作线程类 实现了 Runnable接口,用于接收RpcRequest对象,解析并且调用,生成 RpcResponse对象 并传回去。run方法如下。


    @Override
    public void run() {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
        
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
             
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
          
          Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
        
              Object returnObject = method.invoke(service, rpcRequest.getParameters());
              objectOutputStream.writeObject(RpcResponse.success(returnObject));
              
              objectOutputStream.flush();
              
        } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            logger.error("调用或发送时有错误发生:", e);
        }
    }

解释下其中的代码,通过 class.getMethod方法,传入方法名和方法参数类型即可获得Method对象。如果你上面RpcRequest中使用String数组来存储方法参数类型的话,这里你就需要通过反射生成对应的Class数组了。通过method.invoke方法,传入对象实例和参数,即可调用并且获得返回值。

测试下

客户端侧,我们已经在上面实现了一个 HelloService 的 实现类HelloServiceImpl 的实现类了,我们只需要创建一个 RpcServer 并且把这个实现类注册进去就行了。


public class TestClient {
    public static void main(String[] args) {
    
        CommonService commonService = new CommonServiceImpl();
        RpcServer rpcServer = new RpcServer();
        rpcServer.register(CommonService, 9000);//注意是9000端口哈,浏览器不要搞错了
    }
}

服务端方面,我们需要通过动态代理,生成代理对象,并且调用它,动态代理会自动帮我们向服务端发送请求的信息。


public class TestServer {
    public static void main(String[] args) {
        
        //通过动态代理生成代理对象
        RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
        CommonService commonService = proxy.getProxy(CommonService.class);
        
        //动态代理帮我们发送请求信息最后打印出来
        HelloObject object = new HelloObject(16, "This is a message");
        String res = CommonService.hello(object);
        System.out.println(res);
    }
}

服务端开放在9000端口。

我们这里生成了一个HelloObject对象作为方法的参数。

首先启动服务端,再启动客户端,服务端输出:

服务器正在启动...
客户端连接!Ip为:127.0.0.1
接收到:This is a message

客户端输出:

这是调用的返回值,id=16

总结

这个例子以不到百行的代码,已经实现了客户端与服务端的一个远程过程调用,非常适合上手,当然它是及其不完善的,甚至连消息格式都没有统一,我们将在接下来的版本更新中逐渐完善它。

搜索公粽号:龙哥手记 「加群进阶」, 进龙歌唯一的读者群。
如果有错误或者不严谨的地方,请务必得出指正,非常感谢。如果喜欢或者 有所启发,欢迎star, 对作者也是一种鼓励。
跟着龙哥一起掉亿点点秀发吧~, `别忘给我点赞???? 哥哥姐姐们再走啊!!!
上一篇:Github actions完成仓库同步与自动部署


下一篇:你的企业是否有自动补丁管理工具的潜在需求?