微服务时代,RPC远程调用成为了一个重要的角色。因此本篇文章将会讨论一下RPC的实现原理以及模拟一个RPC实现。
RPC原理我们还是按常规归纳为三要素:动态代理、网络传输、序列化
下面是RPC原理图:
下面我们模仿下RPC的实现过程,其中动态代理使用JDK自带的动态代理。网络传输使用最原始的Socket的BIO模式。序列化也采用JDK的ObjectInputStream和ObjectOutputStream.。
先上用例再上原理
服务端用例(包括主函数和rpc接口实现):
@Slf4j
public class ServerMain {
public static void main(String[] args) {
/*
* 创建RPC服务端
*/
try {
RpcServer rpcServer = new RpcServer();
rpcServer.start();
} catch (IOException e) {
log.error("Rpc服务端异常, 异常原因: {}", e.getMessage(), e);
}
}
}
服务端接口实现类:
@Slf4j
public class StudentServiceImpl implements StudentService {
@Override
public int save(StudentVo student) {
// 保存学生信息, 本例用打印来代替
log.info("服务端获取学生信息: [姓名: {} 学号: {} 地址: {}] ", student.getName(), student.getCardNo(), student.getAddress());
// 返回成功的code
return 200;
}
}
客户端用例:
@Slf4j
public class ClientMain {
public static void main(String[] args) {
/*
* 1、从Factory创建一个代理对象, 如果是Spring框架中可以处理为单例, 然后注入即可使用
*/
StudentService studentService = RpcFactory.newInstance(StudentService.class);
// 2、构造请求参数
StudentVo studentVo = new StudentVo();
studentVo.setId(1L);
studentVo.setAddress("广东省深圳市");
studentVo.setCardNo("5201314");
studentVo.setName("Martin");
studentVo.setSex(1);
// 3、远程调用
int result = studentService.save(studentVo);
// 4、根据RPC返回结果进行处理
log.info("服务端响应Code: {}", result);
}
}
运行结果:
客户端:
[INFO ] 2021-07-06 16:55:31.573 method:cn.gaaidou.rpc.client.ClientMain.main(ClientMain.java:32): 服务端响应Code: 200
服务端:
[INFO ] 2021-07-06 16:55:31.558 method:cn.gaaidou.rpc.server.service.impl.StudentServiceImpl.save(StudentServiceImpl.java:17): 服务端获取学生信息: [姓名: Martin 学号: 5201314 地址: 广东省深圳市]
下面我们来看看具体实现过程:
1、动态代理
RPC言必称动态代理,实际上:
如果一个interface一直都看不到实现,那么大概率就是动态代理模式
实际上我们常用的Mybatis框架,底层也是使用了动态代理模式,所以我们使用Mybatis框架时,往往只需要定义Mapper接口,但从来没看到Mapper接口的实现。
public class RpcFactory {
/**
* 创建代理对象
*
* @param clazz
* @param <T>
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T newInstance(Class<T> clazz) {
return (T) Proxy.newProxyInstance(RpcFactory.class.getClassLoader(), new Class<?>[]{clazz}, new RpcClientProxy());
}
}
改写InvocationHandler实现动态代理,在本类中同时也是客户端Socket的试下过程。
public class RpcClientProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/*
* 1、Socket客户端连接服务端
* 如果是市面RPC框架, 可以通过注册中心和客户端负载均衡来获取服务提供者的IP + Port
*/
Socket socket = new Socket("127.0.0.1", 8080);
// 2、创建请求参数, 请求服务端
// 方法参数类型和值的映射关系, 主要用于服务端寻找具体的实现方法
List<RpcClassMapperVo> argsClazzMapper = Arrays
.stream(args)
.map(v -> new RpcClassMapperVo(v.getClass(), JdkSerializationUtil.serialize(v)))
.collect(Collectors.toList());
// 请求参数封装类
RpcReqVo rpcReqVo = new RpcReqVo(method.getDeclaringClass().getName(), method.getName(), argsClazzMapper);
socket.getOutputStream().write(JdkSerializationUtil.serialize(rpcReqVo));
// 3、反序列化服务端返回的结果
return JdkSerializationUtil.deserialize(IOUtil.readSocket(socket.getInputStream()));
}
}
2、网络传输
不同的RPC框架可能会采用不同的网络传输协议,比如SpringCloud默认使用Http协议、Dubbo框架可以使用Dubbo协议、Hessian协议等。为了简单,本文直接采取最底层的TCP协议,使用Socket编程实现整个交互过程(BIO模式)。
由于全面动态代理部分已经讲了客户端部分的Socket编程,因此本段只写服务端Socket编程:
@Slf4j
public class RpcServer {
private volatile boolean stop = false;
private ServerSocket serverSocket;
/**
* 创建一个ServerSocket, 默认端口为8080
*
* @param port
* @throws IOException
*/
public RpcServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
public RpcServer() throws IOException {
this(8080);
}
public void start() {
// 1、注册关闭回勾
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop = true));
while (!stop) {
try {
// 2、监听客户端Socket
Socket socket = serverSocket.accept();
// 3、读取请求 -> 执行请求 -> 响应结果
socket.getOutputStream().write(RpcInvoker.invoke(IOUtil.readSocket(socket.getInputStream())));
// 4、关闭Socket
socket.close();
} catch (Exception e) {
log.error("Socket监听出现异常, 异常原因: {}", e.getMessage(), e);
}
}
}
}
3、序列化
RPC框架中使用不同的协议,会有不同的序列化和反序列化方式。比如有Json、Hessian、Dubbo、Protobuf、Jdk自带的序列化模式等。具体选择哪种序列化方式根据实际的项目需求。本文为了简单,使用Jdk自带的的序列化模式:
@Slf4j
public class JdkSerializationUtil {
/**
* JDK序列化对象
*
* @param o
* @return
*/
public static byte[] serialize(Object o) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(o);
return baos.toByteArray();
} catch (IOException e) {
log.error("JDK序列化对象出现异常, 异常原因: {}", e.getMessage(), e);
throw new RuntimeException("JDK序列化对象出现异常");
}
}
/**
* JDK反序列化对象
*
* @param data
* @return
*/
public static Object deserialize(byte[] data) {
ByteArrayInputStream bis = new ByteArrayInputStream(data);
try (ObjectInputStream ois = new ObjectInputStream(bis)) {
return ois.readObject();
} catch (IOException | ClassNotFoundException e) {
log.error("JDK反序列化对象出现异常, 异常原因: {}", e.getMessage(), e);
throw new RuntimeException("JDK反序列化对象出现异常");
}
}
}
完整代码在Gitee仓库上 维护(见Module: gaaidou-rpc)。