RPC的概念
背景知识
- RPC的基本概念,核心功能
常见的RPC框架
Duboo基本功能
- 远程通讯
- 基于接口方法的透明远程过程调用
- 负载均衡
- 服务注册中心
RPC过程
client 调用远程方法-> request序列化 -> 协议编码 -> 网络传输-> 服务端 -> 反序列化request -> 调用本地方法得到response -> 序列化 ->编码->……
版本迭代过程
目录
从0开始的RPC的迭代过程:
- version0版本:以不到百行的代码完成一个RPC例子
- version1版本:完善通用消息格式(request,response),客户端的动态代理完成对request消息格式的封装
- version2版本:支持服务端暴露多个服务接口, 服务端程序抽象化,规范化
- version3版本:使用高性能网络框架netty的实现网络通信,以及客户端代码的重构
- version4版本:自定义消息格式,支持多种序列化方式(java原生, json…)
- version5版本: 服务器注册与发现的实现,zookeeper作为注册中心
- version6版本: 负载均衡的策略的实现
3.MyRPC版本3
背景知识
- netty高性能网络框架的使用
本节问题
如何提升这个rpc的性能? 可以从两个方面入手,网络传输从BIO到NIO,序列化要减少字节流长度,提高序列化反序列化效率
知名的rpc框架:dubbo, grpc都是使用netty底层进行通信的
升级过程:
前提: maven pom.xml文件引入netty
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
升级1: 重构客户端代码
客户端的代码太乱了, 我们先进行代码重构,才有利于后面使用netty的方式实现客户端,使之不同方式网络连接的客户端有着同样的结构,同样的api
假如我们现在已经有了两个客户端:SimpleRPCClient(使用java BIO的方式), NettyRPCClient(使用netty进行网络传输),那么它们两者的共性是啥?发送请求与得到response是共性, 而建立连接与发送请求的方式是不同点。
// 共性抽取出来
public interface RPCClient {
RPCResponse sendRequest(RPCRequest response);
}
// SimpleRPCClient实现这个接口,不同的网络方式有着不同的实现
@AllArgsConstructor
public class SimpleRPCClient implements RPCClient{
private String host;
private int port;
// 客户端发起一次请求调用,Socket建立连接,发起请求Request,得到响应Response
// 这里的request是封装好的,不同的service需要进行不同的封装, 客户端只知道Service接口,需要一层动态代理根据反射封装不同的Service
public RPCResponse sendRequest(RPCRequest request) {
try {
// 发起一次Socket连接请求
Socket socket = new Socket(host, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
System.out.println(request);
objectOutputStream.writeObject(request);
objectOutputStream.flush();
RPCResponse response = (RPCResponse) objectInputStream.readObject();
//System.out.println(response.getData());
return response;
} catch (IOException | ClassNotFoundException e) {
System.out.println();
return null;
}
}
}
而RPCClientProxy类中需要加入一个RPCClient类变量即可, 传入不同的client(simple,netty), 即可调用公共的接口sendRequest发送请求,所以客户端代码结构很清晰了:
-
RPCClient: 不同的网络连接,网络传输方式的客户端分别实现这个接口
-
XXXRPCClient: 具体实现类
-
RPCClientProxy: 动态代理Service类,封装不同的Service请求为Request对象,并且持有一个RPCClient对象,负责与服务端的通信,
由此,客户端代码重构完毕,结构更为清晰,一个使用用例为:
// 构建一个使用java Socket传输的客户端
SimpleRPCClient simpleRPCClient = new SimpleRPCClient("127.0.0.1", 8899);
// 把这个客户端传入代理客户端
RPCClientProxy rpcClientProxy = new RPCClientProxy(simpleRPCClient);
// 代理客户端根据不同的服务,获得一个代理类, 并且这个代理类的方法以或者增强(封装数据,发送请求)
UserService userService = rpcClientProxy.getProxy(UserService.class);
// 调用方法
User userByUserId = userService.getUserByUserId(10);
升级2: 使用netty方式传输数据:实现NettyRPCServer, NettyRPCCLient,这里建议先学习下netty的启动代码
netty 服务端的实现
/**
* 实现RPCServer接口,负责监听与发送数据
*/
@AllArgsConstructor
public class NettyRPCServer implements RPCServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接, work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.printf("Netty服务端启动了...");
try {
// 启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
// 同步阻塞
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 死循环监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}
netty server初始化类
/**
* 初始化,主要负责序列化的编码解码, 需要解决netty的粘包问题
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 消息格式 [长度][消息体], 解决粘包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
// 计算当前待发送消息的长度,写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
// 这里使用的还是java 序列化方式, netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
}
}
netty server具体的handler
/**
* 因为是服务器端,我们知道接受到请求格式是RPCRequest
* Object类型也行,强制转型就行
*/
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception {
//System.out.println(msg);
RPCResponse response = getResponse(msg);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
RPCResponse getResponse(RPCRequest request) {
// 得到服务名
String interfaceName = request.getInterfaceName();
// 得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
// 反射调用方法
Method method = null;
try {
method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
Object invoke = method.invoke(service, request.getParams());
return RPCResponse.success(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RPCResponse.fail();
}
}
}
客户端netty的实现
/**
* 实现RPCClient接口
*/
public class NettyRPCClient implements RPCClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private String host;
private int port;
public NettyRPCClient(String host, int port) {
this.host = host;
this.port = port;
}
// netty客户端初始化,重复使用
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
/**
* 这里需要操作一下,因为netty的传输都是异步的,你发送request,会立刻返回, 而不是想要的相应的response
*/
@Override
public RPCResponse sendRequest(RPCRequest request) {
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
channel.closeFuture().sync();
// 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
// AttributeKey是,线程隔离的,不会由线程安全问题。
// 实际上不应通过阻塞,可通过回调函数
AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
RPCResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
初始化: 与服务端一致,就不贴代码了
ClientHandler设计:
public class NettyClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception {
// 接收到response, 给channel设计别名,让sendRequest里读取response
AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
ctx.channel().attr(key).set(msg);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
结果:
总结
此版本我们完成了客户端的重构,使之能够支持多种版本客户端的扩展(实现RPCClient接口)
并且使用netty实现了客户端与服务端的通信
此RPC最大痛点
java自带序列化方式(Java序列化写入不仅是完整的类名,也包含整个类的定义,包含所有被引用的类),不够通用,不够高效