一、RPC是什么
remote procedure call:远程过程调用
过程就是程序,像调用本地方法一样调用远程的过程
RPC采用Client-Server结构,通过request-response消息模式实现
RMI(remote method invocation)远程方法调用时oop领域中RPC的一种具体实现
webservice、restfull接口调用都是RPC,仅消息组织方式及消息协议不同
与本地调用相比,速度相对较慢、可靠性减弱
为什么用RPC
- 服务化
- 可重用
- 系统间交互调用
术语
二、RPC的流程环节
1.客户端处理过程中调用Client stub,传递参数
2.Client stub将参数编组为消息,然后通过系统调用向服务端发送消息
3.客户端本地操作系统将消息从客户端机器发送到服务端机器
4.服务端操作系统接收到数据包传递给Server stub
5.Server stub解组消息为参数
6.Server stub再调用服务端的过程,过程执行结果以反方向的相同的步骤响应给客户端
需要处理的问题
1.Client stub、Server stub的开发
2.参数如何编组为消息,以及解组消息
3.消息如何发送
4.过程结果如何表示、异常情况如何处理
5.如何实现安全的访问控制
三、RPC协议
RPC调用过程中需要将参数编组为消息进行发送,接受方需要解组消息为参数,过程处理结果同样需要编组、解组。消息由哪些部分构成及消息的表示形式就构成了消息协议。
RPC调用过程中采用协议成为RPC协议。
常见RPC协议
四、手写RPC框架
封装好参数编组、消息解码、底层网络通信的RPC程序开发框架,带来的便捷是可以直接在其基础上只需专注于过程代码的编写
从使用者角度开始
2.1 客户端
2.1.1 客户端设计
客户端生成过程接口的代理对象
设计客户端代理工厂,用JDK动态代理即可生成接口的代理对象
ServiceInfoDiscoverer接口得到服务信息,返回服务信息的列表,大并发的支持,某个服务提供者可能有多个提供者,并发量很大需要用到集群
ServiceInfo,服务的名称,服务协议
根据需要提供服务信息发现者,动态可以使用zookeeper
消息协议独立为一层,客户端、服务端均需要
客户端完整类图
不同颜色代表不同层,入口是ClientStubProxyFactory
2.1.2 实现客户端
package com.study.mike.rpc.client; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import com.study.mike.rpc.client.net.NetClient; import com.study.mike.rpc.common.protocol.MessageProtocol; import com.study.mike.rpc.common.protocol.Request; import com.study.mike.rpc.common.protocol.Response; import com.study.mike.rpc.discovery.ServiceInfo; import com.study.mike.rpc.discovery.ServiceInfoDiscoverer; public class ClientStubProxyFactory { private ServiceInfoDiscoverer sid; private Map<String, MessageProtocol> supportMessageProtocols; private NetClient netClient; private Map<Class<?>, Object> objectCache = new HashMap<>(); public <T> T getProxy(Class<T> interf) { T obj = (T) this.objectCache.get(interf); if (obj == null) { obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf }, new ClientStubInvocationHandler(interf)); this.objectCache.put(interf, obj); } return obj; } public ServiceInfoDiscoverer getSid() { return sid; } public void setSid(ServiceInfoDiscoverer sid) { this.sid = sid; } public Map<String, MessageProtocol> getSupportMessageProtocols() { return supportMessageProtocols; } public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) { this.supportMessageProtocols = supportMessageProtocols; } public NetClient getNetClient() { return netClient; } public void setNetClient(NetClient netClient) { this.netClient = netClient; } private class ClientStubInvocationHandler implements InvocationHandler { private Class<?> interf; private Random random = new Random(); public ClientStubInvocationHandler(Class<?> interf) { super(); this.interf = interf; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("toString")) { return proxy.getClass().toString(); } if (method.getName().equals("hashCode")) { return 0; } // 1、获得服务信息 String serviceName = this.interf.getName(); List<ServiceInfo> sinfos = sid.getServiceInfo(serviceName); if (sinfos == null || sinfos.size() == 0) { throw new Exception("远程服务不存在!"); } // 随机选择一个服务提供者(软负载均衡) ServiceInfo sinfo = sinfos.get(random.nextInt(sinfos.size())); // 2、构造request对象 Request req = new Request(); req.setServiceName(sinfo.getName()); req.setMethod(method.getName()); req.setPrameterTypes(method.getParameterTypes()); req.setParameters(args); // 3、协议层编组 // 获得该方法对应的协议 MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol()); // 编组请求 byte[] data = protocol.marshallingRequest(req); // 4、调用网络层发送请求 byte[] repData = netClient.sendRequest(data, sinfo); // 5解组响应消息 Response rsp = protocol.unmarshallingResponse(repData); // 6、结果处理 if (rsp.getException() != null) { throw rsp.getException(); } return rsp.getReturnValue(); } } }
package com.study.mike.rpc.client.net; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.study.mike.rpc.discovery.ServiceInfo; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyNetClient implements NetClient { private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class); @Override public byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable { String[] addInfoArray = sinfo.getAddress().split(":"); SendHandler sendHandler = new SendHandler(data); byte[] respData = null; // 配置客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(sendHandler); } }); // 启动客户端连接 b.connect(addInfoArray[0], Integer.valueOf(addInfoArray[1])).sync(); respData = (byte[]) sendHandler.rspData(); logger.info("sendRequest get reply: " + respData); } finally { // 释放线程组资源 group.shutdownGracefully(); } return respData; } private class SendHandler extends ChannelInboundHandlerAdapter { private CountDownLatch cdl = null; private Object readMsg = null; private byte[] data; public SendHandler(byte[] data) { cdl = new CountDownLatch(1); this.data = data; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("连接服务端成功:" + ctx); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); logger.info("客户端发送消息:" + reqBuf); ctx.writeAndFlush(reqBuf); } public Object rspData() { try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } return readMsg; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("client read msg: " + msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(resp); readMsg = resp; cdl.countDown(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("发生异常:" + cause.getMessage()); ctx.close(); } } }
package com.study.mike.rpc.client.net; import com.study.mike.rpc.discovery.ServiceInfo; public interface NetClient { byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable; }
package com.study.mike.rpc.discovery; public class ServiceInfo { private String name; private String protocol; private String address; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } }
package com.study.mike.rpc.discovery; import java.util.List; public interface ServiceInfoDiscoverer { List<ServiceInfo> getServiceInfo(String name); }
package com.study.mike.rpc.discovery; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.util.ArrayList; import java.util.List; import org.I0Itec.zkclient.ZkClient; import com.alibaba.fastjson.JSON; import com.study.mike.rpc.server.register.MyZkSerializer; import com.study.mike.rpc.util.PropertiesUtils; public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer { ZkClient client; private String centerRootPath = "/Rpc-framework"; public ZookeeperServiceInfoDiscoverer() { String addr = PropertiesUtils.getProperties("zk.address"); client = new ZkClient(addr); client.setZkSerializer(new MyZkSerializer()); } @Override public List<ServiceInfo> getServiceInfo(String name) { String servicePath = centerRootPath + "/" + name + "/service"; List<String> children = client.getChildren(servicePath); List<ServiceInfo> resources = new ArrayList<ServiceInfo>(); for (String ch : children) { try { String deCh = URLDecoder.decode(ch, "UTF-8"); ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class); resources.add(r); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return resources; } }
package com.study.mike.rpc.common.protocol; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class JavaSerializeMessageProtocol implements MessageProtocol { private byte[] serialize(Object obj) throws Exception { ByteArrayOutputStream bout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bout); out.writeObject(obj); return bout.toByteArray(); } @Override public byte[] marshallingRequest(Request req) throws Exception { return this.serialize(req); } @Override public Request unmarshallingRequest(byte[] data) throws Exception { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)); return (Request) in.readObject(); } @Override public byte[] marshallingResponse(Response rsp) throws Exception { return this.serialize(rsp); } @Override public Response unmarshallingResponse(byte[] data) throws Exception { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)); return (Response) in.readObject(); } }
package com.study.mike.rpc.demo.consumer; import java.awt.Point; import java.util.HashMap; import java.util.Map; import com.study.mike.rpc.client.ClientStubProxyFactory; import com.study.mike.rpc.client.net.NettyNetClient; import com.study.mike.rpc.common.protocol.JavaSerializeMessageProtocol; import com.study.mike.rpc.common.protocol.MessageProtocol; import com.study.mike.rpc.demo.DemoService; import com.study.mike.rpc.discovery.ZookeeperServiceInfoDiscoverer; public class Consumer { public static void main(String[] args) throws Exception { ClientStubProxyFactory cspf = new ClientStubProxyFactory(); // 设置服务发现者 cspf.setSid(new ZookeeperServiceInfoDiscoverer()); // 设置支持的协议 Map<String, MessageProtocol> supportMessageProtocols = new HashMap<>(); supportMessageProtocols.put("javas", new JavaSerializeMessageProtocol()); cspf.setSupportMessageProtocols(supportMessageProtocols); // 设置网络层实现 cspf.setNetClient(new NettyNetClient()); DemoService demoService = cspf.getProxy(DemoService.class); // 获取远程服务代理 String hello = demoService.sayHello("world"); // 执行远程方法 System.out.println(hello); // 显示调用结果 System.out.println(demoService.multiPoint(new Point(5, 10), 2)); } }
2.2 服务端
2.2.1 设计服务端
2.2.2 实现服务端
package com.study.mike.rpc.demo.provider; import com.study.mike.rpc.common.protocol.JavaSerializeMessageProtocol; import com.study.mike.rpc.demo.DemoService; import com.study.mike.rpc.server.NettyRpcServer; import com.study.mike.rpc.server.RequestHandler; import com.study.mike.rpc.server.RpcServer; import com.study.mike.rpc.server.register.ServiceObject; import com.study.mike.rpc.server.register.ServiceRegister; import com.study.mike.rpc.server.register.ZookeeperExportServiceRegister; import com.study.mike.rpc.util.PropertiesUtils; public class Provider { public static void main(String[] args) throws Exception { int port = Integer.parseInt(PropertiesUtils.getProperties("rpc.port")); String protocol = PropertiesUtils.getProperties("rpc.protocol"); // 服务注册 ServiceRegister sr = new ZookeeperExportServiceRegister(); DemoService ds = new DemoServiceImpl(); ServiceObject so = new ServiceObject(DemoService.class.getName(), DemoService.class, ds); sr.register(so, protocol, port); RequestHandler reqHandler = new RequestHandler(new JavaSerializeMessageProtocol(), sr); RpcServer server = new NettyRpcServer(port, protocol, reqHandler); server.start(); System.in.read(); // 按任意键退出 server.stop(); } }
配置端口
app.properties
zk.address=127.0.0.1:2181 rpc.port=19000 rpc.protocol=javas
package com.study.mike.rpc.server.register; import java.util.HashMap; import java.util.Map; public class DefaultServiceRegister implements ServiceRegister { private Map<String, ServiceObject> serviceMap = new HashMap<>(); @Override public void register(ServiceObject so, String protocolName, int port) throws Exception { if (so == null) { throw new IllegalArgumentException("参数不能为空"); } this.serviceMap.put(so.getName(), so); } @Override public ServiceObject getServiceObject(String name) { return this.serviceMap.get(name); } }
package com.study.mike.rpc.server.register; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.URLEncoder; import org.I0Itec.zkclient.ZkClient; import com.alibaba.fastjson.JSON; import com.study.mike.rpc.discovery.ServiceInfo; import com.study.mike.rpc.util.PropertiesUtils; /** * Zookeeper方式获取远程服务信息类。 * * ZookeeperServiceInfoDiscoverer */ public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister { private ZkClient client; private String centerRootPath = "/Rpc-framework"; public ZookeeperExportServiceRegister() { String addr = PropertiesUtils.getProperties("zk.address"); client = new ZkClient(addr); client.setZkSerializer(new MyZkSerializer()); } @Override public void register(ServiceObject so, String protocolName, int port) throws Exception { super.register(so, protocolName, port); ServiceInfo soInf = new ServiceInfo(); String host = InetAddress.getLocalHost().getHostAddress(); String address = host + ":" + port; soInf.setAddress(address); soInf.setName(so.getInterf().getName()); soInf.setProtocol(protocolName); this.exportService(soInf); } private void exportService(ServiceInfo serviceResource) { String serviceName = serviceResource.getName(); String uri = JSON.toJSONString(serviceResource); try { uri = URLEncoder.encode(uri, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } String servicePath = centerRootPath + "/" + serviceName + "/service"; if (!client.exists(servicePath)) { client.createPersistent(servicePath, true); } String uriPath = servicePath + "/" + uri; if (client.exists(uriPath)) { client.delete(uriPath); } client.createEphemeral(uriPath); } }
package com.study.mike.rpc.server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class NettyRpcServer extends RpcServer { private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class); private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler) { super(port, protocol, handler); } @Override public void start() { // 配置服务器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ChannelRequestHandler()); } }); // 启动服务 ChannelFuture f = b.bind(port).sync(); logger.info("完成服务端端口绑定与启动"); channel = f.channel(); // 等待服务通道关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 释放线程组资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void stop() { this.channel.close(); } private class ChannelRequestHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("激活"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("服务端收到消息:" + msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); logger.info("发送响应:" + msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("发生异常:" + cause.getMessage()); ctx.close(); } } }
package com.study.mike.rpc.server; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.study.mike.rpc.common.protocol.MessageProtocol; import com.study.mike.rpc.common.protocol.Request; import com.study.mike.rpc.common.protocol.Response; import com.study.mike.rpc.common.protocol.Status; import com.study.mike.rpc.server.register.ServiceObject; import com.study.mike.rpc.server.register.ServiceRegister; public class RequestHandler { private MessageProtocol protocol; private ServiceRegister serviceRegister; public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) { super(); this.protocol = protocol; this.serviceRegister = serviceRegister; } public byte[] handleRequest(byte[] data) throws Exception { // 1、解组消息 Request req = this.protocol.unmarshallingRequest(data); // 2、查找服务对象 ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName()); Response rsp = null; if (so == null) { rsp = new Response(Status.NOT_FOUND); } else { // 3、反射调用对应的过程方法 try { Method m = so.getInterf().getMethod(req.getMethod(), req.getPrameterTypes()); Object returnValue = m.invoke(so.getObj(), req.getParameters()); rsp = new Response(Status.SUCCESS); rsp.setReturnValue(returnValue); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { rsp = new Response(Status.ERROR); rsp.setException(e); } } // 4、编组响应消息 return this.protocol.marshallingResponse(rsp); } public MessageProtocol getProtocol() { return protocol; } public void setProtocol(MessageProtocol protocol) { this.protocol = protocol; } public ServiceRegister getServiceRegister() { return serviceRegister; } public void setServiceRegister(ServiceRegister serviceRegister) { this.serviceRegister = serviceRegister; } }