案例介绍
结合上面两章节,本章将实现rpc的基础功能;提供一个rpc中间件jar给生产端和消费端。
技术点;
1、注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。
2、服务端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者方法发布到注册中心{接口+别名}。
3、消费端配置生产者的信息后,在加载xml的时候由中间件生成动态代理类,当发生方法调用时实际调用的是代理类的方法,代理类会通过netty的future通信方式进行数据交互。
环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64
3、windows redis
代码示例
ConsumerBean.java
package org.itstack.demo.rpc.config.spring.bean; import com.alibaba.fastjson.JSON; import io.netty.channel.ChannelFuture; import org.itstack.demo.rpc.config.ConsumerConfig; import org.itstack.demo.rpc.domain.RpcProviderConfig; import org.itstack.demo.rpc.network.client.ClientSocket; import org.itstack.demo.rpc.network.msg.Request; import org.itstack.demo.rpc.reflect.JDKProxy; import org.itstack.demo.rpc.registry.RedisRegistryCenter; import org.itstack.demo.rpc.util.ClassLoaderUtils; import org.springframework.beans.factory.FactoryBean; import org.springframework.util.Assert; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean { private ChannelFuture channelFuture; private RpcProviderConfig rpcProviderConfig; @Override public Object getObject() throws Exception { //从redis获取链接 if (null == rpcProviderConfig) { String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias); rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class); } Assert.isTrue(null != rpcProviderConfig); //获取通信channel if (null == channelFuture) { ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort()); new Thread(clientSocket).start(); for (int i = 0; i < 100; i++) { if (null != channelFuture) break; Thread.sleep(500); channelFuture = clientSocket.getFuture(); } } Assert.isTrue(null != channelFuture); Request request = new Request(); request.setChannel(channelFuture.channel()); request.setNozzle(nozzle); request.setRef(rpcProviderConfig.getRef()); request.setAlias(alias); return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request); } @Override public Class<?> getObjectType() { try { return ClassLoaderUtils.forName(nozzle); } catch (ClassNotFoundException e) { return null; } } @Override public boolean isSingleton() { return true; } }
ProviderBean.java
package org.itstack.demo.rpc.config.spring.bean; import com.alibaba.fastjson.JSON; import org.itstack.demo.rpc.config.ProviderConfig; import org.itstack.demo.rpc.domain.LocalServerInfo; import org.itstack.demo.rpc.domain.RpcProviderConfig; import org.itstack.demo.rpc.registry.RedisRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class ProviderBean extends ProviderConfig implements ApplicationContextAware { private Logger logger = LoggerFactory.getLogger(ProviderBean.class); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RpcProviderConfig rpcProviderConfig = new RpcProviderConfig(); rpcProviderConfig.setNozzle(nozzle); rpcProviderConfig.setRef(ref); rpcProviderConfig.setAlias(alias); rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST); rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT); //注册生产者 long count = RedisRegistryCenter.registryProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig)); logger.info("注册生产者:{} {} {}", nozzle, alias, count); } }
ServerBean.java
package org.itstack.demo.rpc.config.spring.bean; import org.itstack.demo.rpc.config.ServerConfig; import org.itstack.demo.rpc.domain.LocalServerInfo; import org.itstack.demo.rpc.network.server.ServerSocket; import org.itstack.demo.rpc.registry.RedisRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class ServerBean extends ServerConfig implements ApplicationContextAware { private Logger logger = LoggerFactory.getLogger(ServerBean.class); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //启动注册中心 logger.info("启动注册中心 ..."); RedisRegistryCenter.init(host, port); logger.info("启动注册中心完成 {} {}", host, port); //初始化服务端 logger.info("初始化生产端服务 ..."); ServerSocket serverSocket = new ServerSocket(applicationContext); Thread thread = new Thread(serverSocket); thread.start(); while (!serverSocket.isActiveSocketServer()) { try { Thread.sleep(500); } catch (InterruptedException ignore) { } } logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT); } }
MyClientHandler.java
package org.itstack.demo.rpc.network.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.itstack.demo.rpc.network.future.SyncWriteFuture; import org.itstack.demo.rpc.network.future.SyncWriteMap; import org.itstack.demo.rpc.network.msg.Response; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class MyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { Response msg = (Response) obj; String requestId = msg.getRequestId(); SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId); if (future != null) { future.setResponse(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
MyServerHandler.java
package org.itstack.demo.rpc.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
import org.itstack.demo.rpc.util.ClassLoaderUtils;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 *
 * <p>Server-side inbound handler: resolves the requested interface method by reflection,
 * invokes it on the Spring bean named by the request, and writes the result back tagged
 * with the original request id.
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    private final ApplicationContext applicationContext;

    /**
     * @param applicationContext context used to look up the target bean of each request
     */
    MyServerHandler(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Executes one RPC request and replies with its result.
     *
     * <p>When anything fails the stack trace is printed and no response is written.
     * NOTE(review): the caller then presumably only learns of the failure through its own
     * wait timeout — confirm, and consider sending an error response.
     *
     * @param ctx channel context used to write the response
     * @param obj inbound message; cast to {@link Request}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        try {
            Request msg = (Request) obj;
            // Invoke the target method
            Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle());
            Method targetMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes());
            Object objectBean = applicationContext.getBean(msg.getRef());
            Object result = targetMethod.invoke(objectBean, msg.getArgs());
            // Send the result back
            Response response = new Response();
            response.setRequestId(msg.getRequestId());
            response.setResult(result);
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release on every path, not only after a successful invocation.
            ReferenceCountUtil.release(obj);
        }
    }

    /** Flushes any pending outbound data once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}
JDKInvocationHandler.java
package org.itstack.demo.rpc.reflect;

import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * Invocation handler behind the consumer-side proxy: turns every interface call into a
 * {@link Request} written to the channel held by that request, then returns the result
 * carried by the {@link Response}.
 *
 * <p>NOTE(review): the single {@link Request} instance is mutated on every call, so
 * concurrent calls through the same proxy would overwrite each other's method name and
 * arguments — confirm whether proxies are ever shared between threads.
 */
public class JDKInvocationHandler implements InvocationHandler {

    private final Request request;

    /**
     * @param request request template holding the channel, interface, bean reference and alias
     */
    public JDKInvocationHandler(Request request) {
        this.request = request;
    }

    /**
     * Handles {@code toString}, {@code hashCode} and {@code equals} locally by delegating to
     * the request template; every other method is sent to the provider.
     *
     * @param proxy  the proxy instance (unused)
     * @param method the interface method being called
     * @param args   the call arguments
     * @return the result carried by the provider's response
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] paramTypes = method.getParameterTypes();
        if ("toString".equals(methodName) && paramTypes.length == 0) {
            return request.toString();
        } else if ("hashCode".equals(methodName) && paramTypes.length == 0) {
            return request.hashCode();
        } else if ("equals".equals(methodName) && paramTypes.length == 1) {
            return request.equals(args[0]);
        }
        // Fill in the per-call details; channel, interface, ref and alias are already set
        // on the template.
        request.setMethodName(methodName);
        request.setParamTypes(paramTypes);
        request.setArgs(args);
        // Write the request and wait for the matching response; 5000 is the timeout handed to
        // writeAndSync (presumably milliseconds — confirm against SyncWrite).
        Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000);
        return response.getResult();
    }

}
JDKProxy.java
package org.itstack.demo.rpc.reflect;

import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.util.ClassLoaderUtils;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/**
 * Factory for JDK dynamic proxies whose calls are routed through a
 * {@link JDKInvocationHandler}.
 */
public class JDKProxy {

    /**
     * Creates a proxy implementing {@code interfaceClass}.
     *
     * @param interfaceClass the interface the proxy must implement
     * @param request        request template passed to the invocation handler
     * @param <T>            the interface type
     * @return a new proxy instance typed as {@code T}
     */
    @SuppressWarnings("unchecked") // the proxy is created for exactly this interface
    public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception {
        InvocationHandler dispatcher = new JDKInvocationHandler(request);
        ClassLoader loader = ClassLoaderUtils.getCurrentClassLoader();
        Class<?>[] exposedInterfaces = new Class[]{interfaceClass};
        return (T) Proxy.newProxyInstance(loader, exposedInterfaces, dispatcher);
    }

}
RedisRegistryCenter.java
package org.itstack.demo.rpc.registry; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/7 * redis 模拟RPC注册中心 */ public class RedisRegistryCenter { private static Jedis jedis; //非切片额客户端连接 //初始化redis public static void init(String host, int port) { // 池基本配置 JedisPoolConfig config = new JedisPoolConfig(); config.setMaxIdle(5); config.setTestOnBorrow(false); JedisPool jedisPool = new JedisPool(config, host, port); jedis = jedisPool.getResource(); } /** * 注册生产者 * * @param nozzle 接口 * @param alias 别名 * @param info 信息 * @return 注册结果 */ public static Long registryProvider(String nozzle, String alias, String info) { return jedis.sadd(nozzle + "_" + alias, info); } /** * 获取生产者 * 模拟权重,随机获取 * @param nozzle 接口名称 */ public static String obtainProvider(String nozzle, String alias) { return jedis.srandmember(nozzle + "_" + alias); } public static Jedis jedis() { return jedis; } }
ApiTest.java
/**
 * Manual smoke test: loads the registry, provider and consumer XML configurations into a
 * single Spring application context.
 */
public class ApiTest {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext(
                "itstack-rpc-center.xml",
                "itstack-rpc-provider.xml",
                "itstack-rpc-consumer.xml");
    }

}
框架,测试结果
2019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy 2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml] 2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml] 2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml] 2019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy 2019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ... 2019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 6379 2019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ... 2019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 22201 2019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0
框架应用
为了测试我们写两个测试工程;itstack-demo-rpc-provider、itstack-demo-rpc-consumer
itstack-demo-rpc-provider 提供生产者接口
itstack-demo-rpc-provider ├── itstack-demo-rpc-provider-export │ └── src │ └── main │ └── java │ └── org.itstack.demo.rpc.provider.export │ ├── domain │ │ └── Hi.java │ └── HelloService.java │ └── itstack-demo-rpc-provider-web └── src └── main ├── java │ └── org.itstack.demo.rpc.provider.web │ └── HelloServiceImpl.java └── resources └── spring └── spring-itstack-rpc-provider.xml
HelloService.java
/**
 * Sample service interface exported by the provider module and called remotely by the
 * consumer.
 */
public interface HelloService {

    /**
     * @return a greeting text
     */
    String hi();

    /**
     * @param str text supplied by the caller
     * @return the provider's reply to {@code str}
     */
    String say(String str);

    /**
     * @param hi message object supplied by the caller
     * @return the provider's reply built from {@code hi}
     */
    String sayHi(Hi hi);

}
HelloServiceImpl.java
/**
 * Provider-side implementation of {@link HelloService}, registered in Spring under the
 * bean name {@code helloService}.
 */
@Controller("helloService")
public class HelloServiceImpl implements HelloService {

    /** Returns a fixed greeting. */
    @Override
    public String hi() {
        return "hi itstack rpc";
    }

    /** Echoes the argument back unchanged. */
    @Override
    public String say(String str) {
        return str;
    }

    /** Combines the user name and the message of {@code hi} into one sentence. */
    @Override
    public String sayHi(Hi hi) {
        String speaker = hi.getUserName();
        String words = hi.getSayMsg();
        return speaker + " say:" + words;
    }

}
spring-itstack-rpc-provider.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd"> <!-- Registry center: host and port of the Redis instance used as the RPC registry --> <rpc:server id="rpcServer" host="127.0.0.1" port="6379"/> <!-- Provider: exports the helloService bean under the HelloService interface and the given alias --> <rpc:provider id="helloServiceRpc" nozzle="org.itstack.demo.rpc.provider.export.HelloService" ref="helloService" alias="itstackRpc"/> </beans>
itstack-demo-rpc-consumer 提供消费者调用
itstack-demo-rpc-consumer └── src ├── main │ ├── java │ └── resources │ └── spring │ └── spring-itstack-rpc-consumer.xml │ └── test └── java └── org.itstack.demo.test └── ConsumerTest.java
spring-itstack-rpc-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd"> <!-- Registry center: host and port of the Redis instance used as the RPC registry --> <rpc:server id="consumer_itstack" host="127.0.0.1" port="6379"/> <!-- Consumer: declares the helloService bean for the HelloService interface and the given alias --> <rpc:consumer id="helloService" nozzle="org.itstack.demo.rpc.provider.export.HelloService" alias="itstackRpc"/> </beans>
ConsumerTest.java
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("/spring-config.xml") public class ConsumerTest { @Resource(name = "helloService") private HelloService helloService; @Test public void test() { String hi = helloService.hi(); System.out.println("测试结果:" + hi); String say = helloService.say("hello world"); System.out.println("测试结果:" + say); Hi hiReq = new Hi(); hiReq.setUserName("付栈"); hiReq.setSayMsg("付可敌国,栈无不胜"); String hiRes = helloService.sayHi(hiReq); System.out.println("测试结果:" + hiRes); } }
应用,测试结果 测试时启动redis
启动ProviderTest Redis中的注册数据
redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc "{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}" redis 127.0.0.1:6379>
执行ConsumerTest中的单元测试方法
log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 测试结果:hi itstack rpc 测试结果:hello world 测试结果:付栈 say:付可敌国,栈无不胜 Process finished with exit code 0