案例介绍
结合上面两章节,本章将实现rpc的基础功能;提供一个rpc中间件jar给生产端和服务端。
技术点:
1、注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。
2、客户端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者方法发布到注册中心{接口+别名}。
3、服务端配置生产者的信息后,在加载xml时候由中间件生成动态代理类,当发生方法调用时实际则调用了我们代理类的方法,代理里会通过netty的future通信方式进行数据交互。
环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64
3、windows redis
代码示例
| 源码获取,关注公众号:bugstack虫洞栈 | 回复RPC案例
1itstack-demo-rpc-03 2└── src 3 └── main 4 │ ├── java 5 │ │ └── org.itstack.demo.rpc 6 │ │ ├── config 7 │ │ ├── domain 8 │ │ ├── network 9 │ │ │ ├── client 10 │ │ │ │ ├── ClientSocket.java 11 │ │ │ │ └── MyClientHandler.java 12 │ │ │ ├── codec 13 │ │ │ │ ├── RpcDecoder.java 14 │ │ │ │ └── RpcEncoder.java 15 │ │ │ ├── future 16 │ │ │ │ ├── SyncWrite.java 17 │ │ │ │ ├── SyncWriteFuture.java 18 │ │ │ │ ├── SyncWriteMap.java 19 │ │ │ │ └── WriteFuture.java 20 │ │ │ ├── msg 21 │ │ │ │ ├── Request.java 22 │ │ │ │ └── Response.java 23 │ │ │ ├── server 24 │ │ │ │ ├── MyServerHandler.java 25 │ │ │ │ └── ServerSocket.java 26 │ │ │ └── util 27 │ │ │ └── SerializationUtil.java 28 │ │ ├── reflect 29 │ │ │ ├── JDKInvocationHandler.java 30 │ │ │ └── JDKProxy.java 31 │ │ ├── registry 32 │ │ │ └── RedisRegistryCenter.java 33 │ │ └── util 34 │ └── resource 35 │ └── META-INF 36 │ ├── rpc.xsd 37 │ ├── spring.handlers 38 │ └── spring.schemas 39 40 └── test 41 ├── java 42 │ └── org.itstack.demo.test 43 │ ├── service 44 │ │ ├── impl 45 │ │ │ └── HelloServiceImpl.java 46 │ │ └── HelloService.java 47 │ └── ApiTest.java 48 └── resource 49 ├── itstack-rpc-center.xml 50 ├── itstack-rpc-consumer.xml 51 ├── itstack-rpc-provider.xml 52 └── log4j.xml
ConsumerBean.java
1package org.itstack.demo.rpc.config.spring.bean; 2 3import com.alibaba.fastjson.JSON; 4import io.netty.channel.ChannelFuture; 5import org.itstack.demo.rpc.config.ConsumerConfig; 6import org.itstack.demo.rpc.domain.RpcProviderConfig; 7import org.itstack.demo.rpc.network.client.ClientSocket; 8import org.itstack.demo.rpc.network.msg.Request; 9import org.itstack.demo.rpc.reflect.JDKProxy; 10import org.itstack.demo.rpc.registry.RedisRegistryCenter; 11import org.itstack.demo.rpc.util.ClassLoaderUtils; 12import org.springframework.beans.factory.FactoryBean; 13import org.springframework.util.Assert; 14 15/** 16 * http://www.itstack.org 17 * create by fuzhengwei on 2019/5/6 18 */ 19public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean { 20 21 private ChannelFuture channelFuture; 22 23 private RpcProviderConfig rpcProviderConfig; 24 25 @Override 26 public Object getObject() throws Exception { 27 28 //从redis获取链接 29 if (null == rpcProviderConfig) { 30 String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias); 31 rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class); 32 } 33 Assert.isTrue(null != rpcProviderConfig); 34 35 //获取通信channel 36 if (null == channelFuture) { 37 ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort()); 38 new Thread(clientSocket).start(); 39 for (int i = 0; i < 100; i++) { 40 if (null != channelFuture) break; 41 Thread.sleep(500); 42 channelFuture = clientSocket.getFuture(); 43 } 44 } 45 Assert.isTrue(null != channelFuture); 46 47 Request request = new Request(); 48 request.setChannel(channelFuture.channel()); 49 request.setNozzle(nozzle); 50 request.setRef(rpcProviderConfig.getRef()); 51 request.setAlias(alias); 52 return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request); 53 } 54 55 @Override 56 public Class<?> getObjectType() { 57 try { 58 return ClassLoaderUtils.forName(nozzle); 59 } catch (ClassNotFoundException e) { 60 return null; 61 } 
62 } 63 64 @Override 65 public boolean isSingleton() { 66 return true; 67 } 68 69 70}
ProviderBean.java
1package org.itstack.demo.rpc.config.spring.bean; 2 3import com.alibaba.fastjson.JSON; 4import org.itstack.demo.rpc.config.ProviderConfig; 5import org.itstack.demo.rpc.domain.LocalServerInfo; 6import org.itstack.demo.rpc.domain.RpcProviderConfig; 7import org.itstack.demo.rpc.registry.RedisRegistryCenter; 8import org.slf4j.Logger; 9import org.slf4j.LoggerFactory; 10import org.springframework.beans.BeansException; 11import org.springframework.context.ApplicationContext; 12import org.springframework.context.ApplicationContextAware; 13 14/** 15 * http://www.itstack.org 16 * create by fuzhengwei on 2019/5/6 17 */ 18public class ProviderBean extends ProviderConfig implements ApplicationContextAware { 19 20 private Logger logger = LoggerFactory.getLogger(ProviderBean.class); 21 22 @Override 23 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 24 25 RpcProviderConfig rpcProviderConfig = new RpcProviderConfig(); 26 rpcProviderConfig.setNozzle(nozzle); 27 rpcProviderConfig.setRef(ref); 28 rpcProviderConfig.setAlias(alias); 29 rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST); 30 rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT); 31 32 //注册生产者 33 long count = RedisRegistryCenter.registryProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig)); 34 35 logger.info("注册生产者:{} {} {}", nozzle, alias, count); 36 } 37 38}
ServerBean.java
1package org.itstack.demo.rpc.config.spring.bean; 2 3import org.itstack.demo.rpc.config.ServerConfig; 4import org.itstack.demo.rpc.domain.LocalServerInfo; 5import org.itstack.demo.rpc.network.server.ServerSocket; 6import org.itstack.demo.rpc.registry.RedisRegistryCenter; 7import org.slf4j.Logger; 8import org.slf4j.LoggerFactory; 9import org.springframework.beans.BeansException; 10import org.springframework.beans.factory.InitializingBean; 11import org.springframework.context.ApplicationContext; 12import org.springframework.context.ApplicationContextAware; 13 14/** 15 * http://www.itstack.org 16 * create by fuzhengwei on 2019/5/6 17 */ 18public class ServerBean extends ServerConfig implements ApplicationContextAware { 19 20 private Logger logger = LoggerFactory.getLogger(ServerBean.class); 21 22 @Override 23 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 24 //启动注册中心 25 logger.info("启动注册中心 ..."); 26 RedisRegistryCenter.init(host, port); 27 logger.info("启动注册中心完成 {} {}", host, port); 28 29 //初始化服务端 30 logger.info("初始化生产端服务 ..."); 31 ServerSocket serverSocket = new ServerSocket(applicationContext); 32 Thread thread = new Thread(serverSocket); 33 thread.start(); 34 while (!serverSocket.isActiveSocketServer()) { 35 try { 36 Thread.sleep(500); 37 } catch (InterruptedException ignore) { 38 } 39 } 40 41 logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT); 42 } 43 44 45}
MyClientHandler.java
1package org.itstack.demo.rpc.network.client; 2 3import io.netty.channel.ChannelHandlerContext; 4import io.netty.channel.ChannelInboundHandlerAdapter; 5import org.itstack.demo.rpc.network.future.SyncWriteFuture; 6import org.itstack.demo.rpc.network.future.SyncWriteMap; 7import org.itstack.demo.rpc.network.msg.Response; 8 9/** 10 * http://www.itstack.org 11 * create by fuzhengwei on 2019/5/6 12 */ 13public class MyClientHandler extends ChannelInboundHandlerAdapter { 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { 17 Response msg = (Response) obj; 18 String requestId = msg.getRequestId(); 19 SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId); 20 if (future != null) { 21 future.setResponse(msg); 22 } 23 } 24 25 @Override 26 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 27 cause.printStackTrace(); 28 ctx.close(); 29 } 30 31}
MyServerHandler.java
1package org.itstack.demo.rpc.network.server; 2 3import io.netty.channel.ChannelHandlerContext; 4import io.netty.channel.ChannelInboundHandlerAdapter; 5import io.netty.util.ReferenceCountUtil; 6import org.itstack.demo.rpc.network.msg.Request; 7import org.itstack.demo.rpc.network.msg.Response; 8import org.itstack.demo.rpc.util.ClassLoaderUtils; 9import org.springframework.context.ApplicationContext; 10 11import java.lang.reflect.Method; 12 13/** 14 * http://www.itstack.org 15 * create by fuzhengwei on 2019/5/6 16 */ 17public class MyServerHandler extends ChannelInboundHandlerAdapter { 18 19 private ApplicationContext applicationContext; 20 21 MyServerHandler(ApplicationContext applicationContext) { 22 this.applicationContext = applicationContext; 23 } 24 25 @Override 26 public void channelRead(ChannelHandlerContext ctx, Object obj) { 27 try { 28 Request msg = (Request) obj; 29 //调用 30 Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle()); 31 Method addMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes()); 32 Object objectBean = applicationContext.getBean(msg.getRef()); 33 Object result = addMethod.invoke(objectBean, msg.getArgs()); 34 //反馈 35 Response request = new Response(); 36 request.setRequestId(msg.getRequestId()); 37 request.setResult(result); 38 ctx.writeAndFlush(request); 39 //释放 40 ReferenceCountUtil.release(msg); 41 } catch (Exception e) { 42 e.printStackTrace(); 43 } 44 } 45 46 @Override 47 public void channelReadComplete(ChannelHandlerContext ctx) { 48 ctx.flush(); 49 } 50 51}
JDKInvocationHandler.java
1package org.itstack.demo.rpc.reflect; 2 3 4import org.itstack.demo.rpc.network.future.SyncWrite; 5import org.itstack.demo.rpc.network.msg.Request; 6import org.itstack.demo.rpc.network.msg.Response; 7 8import java.lang.reflect.InvocationHandler; 9import java.lang.reflect.Method; 10 11public class JDKInvocationHandler implements InvocationHandler { 12 13 private Request request; 14 15 public JDKInvocationHandler(Request request) { 16 this.request = request; 17 } 18 19 @Override 20 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 21 String methodName = method.getName(); 22 Class[] paramTypes = method.getParameterTypes(); 23 if ("toString".equals(methodName) && paramTypes.length == 0) { 24 return request.toString(); 25 } else if ("hashCode".equals(methodName) && paramTypes.length == 0) { 26 return request.hashCode(); 27 } else if ("equals".equals(methodName) && paramTypes.length == 1) { 28 return request.equals(args[0]); 29 } 30 //设置参数 31 request.setMethodName(methodName); 32 request.setParamTypes(paramTypes); 33 request.setArgs(args); 34 request.setRef(request.getRef()); 35 Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000); 36 //异步调用 37 return response.getResult(); 38 39 } 40 41}
JDKProxy.java
1package org.itstack.demo.rpc.reflect; 2 3 4import org.itstack.demo.rpc.network.msg.Request; 5import org.itstack.demo.rpc.util.ClassLoaderUtils; 6 7import java.lang.reflect.InvocationHandler; 8import java.lang.reflect.Proxy; 9 10public class JDKProxy { 11 12 public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception { 13 InvocationHandler handler = new JDKInvocationHandler(request); 14 ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader(); 15 T result = (T) Proxy.newProxyInstance(classLoader, new Class[]{interfaceClass}, handler); 16 return result; 17 } 18 19}
RedisRegistryCenter.java
1package org.itstack.demo.rpc.registry; 2 3import redis.clients.jedis.Jedis; 4import redis.clients.jedis.JedisPool; 5import redis.clients.jedis.JedisPoolConfig; 6 7/** 8 * http://www.itstack.org 9 * create by fuzhengwei on 2019/5/7 10 * redis 模拟RPC注册中心 11 */ 12public class RedisRegistryCenter { 13 14 private static Jedis jedis; //非切片额客户端连接 15 16 //初始化redis 17 public static void init(String host, int port) { 18 // 池基本配置 19 JedisPoolConfig config = new JedisPoolConfig(); 20 config.setMaxIdle(5); 21 config.setTestOnBorrow(false); 22 JedisPool jedisPool = new JedisPool(config, host, port); 23 jedis = jedisPool.getResource(); 24 } 25 26 /** 27 * 注册生产者 28 * 29 * @param nozzle 接口 30 * @param alias 别名 31 * @param info 信息 32 * @return 注册结果 33 */ 34 public static Long registryProvider(String nozzle, String alias, String info) { 35 return jedis.sadd(nozzle + "_" + alias, info); 36 } 37 38 /** 39 * 获取生产者 40 * 模拟权重,随机获取 41 * @param nozzle 接口名称 42 */ 43 public static String obtainProvider(String nozzle, String alias) { 44 return jedis.srandmember(nozzle + "_" + alias); 45 } 46 47 public static Jedis jedis() { 48 return jedis; 49 } 50 51}
ApiTest.java
1public class ApiTest { 2 3 public static void main(String[] args) { 4 String[] configs = {"itstack-rpc-center.xml", "itstack-rpc-provider.xml", "itstack-rpc-consumer.xml"}; 5 new ClassPathXmlApplicationContext(configs); 6 } 7 8}
框架,测试结果
12019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy 22019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml] 32019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml] 42019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml] 52019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy 62019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ... 72019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 6379 82019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ... 92019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 22201 102019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0
框架应用
为了测试我们写两个测试工程;itstack-demo-rpc-provider、itstack-demo-rpc-consumer
itstack-demo-rpc-provider 提供生产者接口
1itstack-demo-rpc-provider 2├── itstack-demo-rpc-provider-export 3│ └── src 4│ └── main 5│ └── java 6│ └── org.itstack.demo.rpc.provider.export 7│ ├── domain 8│ │ └── Hi.java 9│ └── HelloService.java 10│ 11└── itstack-demo-rpc-provider-web 12 └── src 13 └── main 14 ├── java 15 │ └── org.itstack.demo.rpc.provider.web 16 │ └── HelloServiceImpl.java 17 └── resources 18 └── spring 19 └── spring-itstack-rpc-provider.xml
HelloService.java
1public interface HelloService { 2 3 String hi(); 4 5 String say(String str); 6 7 String sayHi(Hi hi); 8 9}
HelloServiceImpl.java
1@Controller("helloService") 2public class HelloServiceImpl implements HelloService { 3 4 @Override 5 public String hi() { 6 return "hi itstack rpc"; 7 } 8 9 @Override 10 public String say(String str) { 11 return str; 12 } 13 14 @Override 15 public String sayHi(Hi hi) { 16 return hi.getUserName() + " say:" + hi.getSayMsg(); 17 } 18 19}
spring-itstack-rpc-provider.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rpc="http://rpc.itstack.org/schema/rpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">

    <!-- Registry -->
    <rpc:server id="rpcServer"
                host="127.0.0.1"
                port="6379"/>

    <!-- Exported provider: interface, bean reference and alias -->
    <rpc:provider id="helloServiceRpc"
                  nozzle="org.itstack.demo.rpc.provider.export.HelloService"
                  ref="helloService"
                  alias="itstackRpc"/>

</beans>
itstack-demo-rpc-consumer 提供消费者调用
1itstack-demo-rpc-consumer 2└── src 3 ├── main 4 │ ├── java 5 │ └── resources 6 │ └── spring 7 │ └── spring-itstack-rpc-consumer.xml 8 │ 9 └── test 10 └── java 11 └── org.itstack.demo.test 12 └── ConsumerTest.java
spring-itstack-rpc-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rpc="http://rpc.itstack.org/schema/rpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">

    <!-- Registry -->
    <rpc:server id="consumer_itstack"
                host="127.0.0.1"
                port="6379"/>

    <!-- Consumed service: interface and alias -->
    <rpc:consumer id="helloService"
                  nozzle="org.itstack.demo.rpc.provider.export.HelloService"
                  alias="itstackRpc"/>

</beans>
ConsumerTest.java
1@RunWith(SpringJUnit4ClassRunner.class) 2@ContextConfiguration("/spring-config.xml") 3public class ConsumerTest { 4 5 @Resource(name = "helloService") 6 private HelloService helloService; 7 8 @Test 9 public void test() { 10 11 String hi = helloService.hi(); 12 System.out.println("测试结果:" + hi); 13 14 String say = helloService.say("hello world"); 15 System.out.println("测试结果:" + say); 16 17 Hi hiReq = new Hi(); 18 hiReq.setUserName("付栈"); 19 hiReq.setSayMsg("付可敌国,栈无不胜"); 20 String hiRes = helloService.sayHi(hiReq); 21 22 System.out.println("测试结果:" + hiRes); 23 } 24 25}
应用,测试结果
测试时启动redis
启动ProviderTest,Redis中的注册数据:
1redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc 2"{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}" 3redis 127.0.0.1:6379>
执行ConsumerTest中的单元测试方法
1log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner). 2log4j:WARN Please initialize the log4j system properly. 3log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 4测试结果:hi itstack rpc 5测试结果:hello world 6测试结果:付栈 say:付可敌国,栈无不胜 7 8Process finished with exit code 0 9