手写RPC框架第三章《RPC中间件》

案例介绍

结合上面两章节,本章将实现rpc的基础功能;提供一个rpc中间件jar给生产端和服务端。

技术点;

1、注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。

2、客户端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者方法发布到注册中心{接口+别名}。

3、服务端配置生产者的信息后,在加载xml时候由中间件生成动态代理类,当发生发放调用时实际则调用了我们代理类的方法,代理里会通过netty的futuer通信方式进行数据交互。

环境准备

1、jdk 1.8.0

2、IntelliJ IDEA Community Edition 2018.3.1 x64

3、windows redis

代码示例

| 源码获取,关注公众号:bugstack虫洞栈 | 回复RPC案例

1itstack-demo-rpc-03
 2└── src
 3    └── main
 4    │    ├── java
 5    │    │    └── org.itstack.demo.rpc
 6    │    │        ├── config
 7    │    │        ├── domain
 8    │    │        ├── network    
 9    │    │        │   ├── client
10    │    │        │   │   ├── ClientSocket.java
11    │    │        │   │   └── MyClientHandler.java  
12    │    │        │   ├── codec
13    │    │        │   │   ├── RpcDecoder.java
14    │    │        │   │   └── RpcEncoder.java  
15    │    │        │   ├── future
16    │    │        │   │   ├── SyncWrite.java     
17    │    │        │   │   ├── SyncWriteFuture.java    
18    │    │        │   │   ├── SyncWriteMap.java    
19    │    │        │   │   └── WriteFuture.java    
20    │    │        │   ├── msg
21    │    │        │   │   ├── Request.java
22    │    │        │   │   └── Response.java 
23    │    │        │   ├── server
24    │    │        │   │   ├── MyServerHandler.java
25    │    │        │   │   └── ServerSocket.java     
26    │    │        │   └── util
27    │    │        │       └── SerializationUtil.java    
28    │    │        ├── reflect
29    │    │        │   ├── JDKInvocationHandler.java    
30    │    │        │   └── JDKProxy.java
31    │    │        ├── registry
32    │    │        │   └── RedisRegistryCenter.java    
33    │    │        └── util    
34    │      └── resource
35    │        └── META-INF
36    │            ├── rpc.xsd
37    │            ├── spring.handlers
38    │            └── spring.schemas    
39
40    └── test
41         ├── java
42         │   └── org.itstack.demo.test
43         │       ├── service
44         │       │   ├── impl
45         │        │   │   └── HelloServiceImpl.java  
46         │        │   └── HelloService.java
47         │       └── ApiTest.java                 
48         └── resource  
49             ├── itstack-rpc-center.xml
50             ├── itstack-rpc-consumer.xml         
51             ├── itstack-rpc-provider.xml
52             └── log4j.xml

ConsumerBean.java

1package org.itstack.demo.rpc.config.spring.bean;
 2
 3import com.alibaba.fastjson.JSON;
 4import io.netty.channel.ChannelFuture;
 5import org.itstack.demo.rpc.config.ConsumerConfig;
 6import org.itstack.demo.rpc.domain.RpcProviderConfig;
 7import org.itstack.demo.rpc.network.client.ClientSocket;
 8import org.itstack.demo.rpc.network.msg.Request;
 9import org.itstack.demo.rpc.reflect.JDKProxy;
10import org.itstack.demo.rpc.registry.RedisRegistryCenter;
11import org.itstack.demo.rpc.util.ClassLoaderUtils;
12import org.springframework.beans.factory.FactoryBean;
13import org.springframework.util.Assert;
14
15/**
16 * http://www.itstack.org
17 * create by fuzhengwei on 2019/5/6
18 */
19public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean {
20
21    private ChannelFuture channelFuture;
22
23    private RpcProviderConfig rpcProviderConfig;
24
25    @Override
26    public Object getObject() throws Exception {
27
28        //从redis获取链接
29        if (null == rpcProviderConfig) {
30            String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias);
31            rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class);
32        }
33        Assert.isTrue(null != rpcProviderConfig);
34
35        //获取通信channel
36        if (null == channelFuture) {
37            ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort());
38            new Thread(clientSocket).start();
39            for (int i = 0; i < 100; i++) {
40                if (null != channelFuture) break;
41                Thread.sleep(500);
42                channelFuture = clientSocket.getFuture();
43            }
44        }
45        Assert.isTrue(null != channelFuture);
46
47        Request request = new Request();
48        request.setChannel(channelFuture.channel());
49        request.setNozzle(nozzle);
50        request.setRef(rpcProviderConfig.getRef());
51        request.setAlias(alias);
52        return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request);
53    }
54
55    @Override
56    public Class<?> getObjectType() {
57        try {
58            return ClassLoaderUtils.forName(nozzle);
59        } catch (ClassNotFoundException e) {
60            return null;
61        }
62    }
63
64    @Override
65    public boolean isSingleton() {
66        return true;
67    }
68
69
70}

ProviderBean.java

1package org.itstack.demo.rpc.config.spring.bean;
 2
 3import com.alibaba.fastjson.JSON;
 4import org.itstack.demo.rpc.config.ProviderConfig;
 5import org.itstack.demo.rpc.domain.LocalServerInfo;
 6import org.itstack.demo.rpc.domain.RpcProviderConfig;
 7import org.itstack.demo.rpc.registry.RedisRegistryCenter;
 8import org.slf4j.Logger;
 9import org.slf4j.LoggerFactory;
10import org.springframework.beans.BeansException;
11import org.springframework.context.ApplicationContext;
12import org.springframework.context.ApplicationContextAware;
13
14/**
15 * http://www.itstack.org
16 * create by fuzhengwei on 2019/5/6
17 */
18public class ProviderBean extends ProviderConfig implements ApplicationContextAware {
19
20    private Logger logger = LoggerFactory.getLogger(ProviderBean.class);
21
22    @Override
23    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
24
25        RpcProviderConfig rpcProviderConfig = new RpcProviderConfig();
26        rpcProviderConfig.setNozzle(nozzle);
27        rpcProviderConfig.setRef(ref);
28        rpcProviderConfig.setAlias(alias);
29        rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST);
30        rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT);
31
32        //注册生产者
33        long count = RedisRegistryCenter.registryProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig));
34
35        logger.info("注册生产者:{} {} {}", nozzle, alias, count);
36    }
37
38}

ServerBean.java

1package org.itstack.demo.rpc.config.spring.bean;
 2
 3import org.itstack.demo.rpc.config.ServerConfig;
 4import org.itstack.demo.rpc.domain.LocalServerInfo;
 5import org.itstack.demo.rpc.network.server.ServerSocket;
 6import org.itstack.demo.rpc.registry.RedisRegistryCenter;
 7import org.slf4j.Logger;
 8import org.slf4j.LoggerFactory;
 9import org.springframework.beans.BeansException;
10import org.springframework.beans.factory.InitializingBean;
11import org.springframework.context.ApplicationContext;
12import org.springframework.context.ApplicationContextAware;
13
14/**
15 * http://www.itstack.org
16 * create by fuzhengwei on 2019/5/6
17 */
18public class ServerBean extends ServerConfig implements ApplicationContextAware {
19
20    private Logger logger = LoggerFactory.getLogger(ServerBean.class);
21
22    @Override
23    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
24        //启动注册中心
25        logger.info("启动注册中心 ...");
26        RedisRegistryCenter.init(host, port);
27        logger.info("启动注册中心完成 {} {}", host, port);
28
29        //初始化服务端
30        logger.info("初始化生产端服务 ...");
31        ServerSocket serverSocket = new ServerSocket(applicationContext);
32        Thread thread = new Thread(serverSocket);
33        thread.start();
34        while (!serverSocket.isActiveSocketServer()) {
35            try {
36                Thread.sleep(500);
37            } catch (InterruptedException ignore) {
38            }
39        }
40
41        logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT);
42    }
43
44
45}

MyClientHandler.java

1package org.itstack.demo.rpc.network.client;
 2
 3import io.netty.channel.ChannelHandlerContext;
 4import io.netty.channel.ChannelInboundHandlerAdapter;
 5import org.itstack.demo.rpc.network.future.SyncWriteFuture;
 6import org.itstack.demo.rpc.network.future.SyncWriteMap;
 7import org.itstack.demo.rpc.network.msg.Response;
 8
 9/**
10 * http://www.itstack.org
11 * create by fuzhengwei on 2019/5/6
12 */
13public class MyClientHandler extends ChannelInboundHandlerAdapter {
14
15    @Override
16    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
17        Response msg = (Response) obj;
18        String requestId = msg.getRequestId();
19        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
20        if (future != null) {
21            future.setResponse(msg);
22        }
23    }
24
25    @Override
26    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
27        cause.printStackTrace();
28        ctx.close();
29    }
30
31}

MyServerHandler.java

1package org.itstack.demo.rpc.network.server;
 2
 3import io.netty.channel.ChannelHandlerContext;
 4import io.netty.channel.ChannelInboundHandlerAdapter;
 5import io.netty.util.ReferenceCountUtil;
 6import org.itstack.demo.rpc.network.msg.Request;
 7import org.itstack.demo.rpc.network.msg.Response;
 8import org.itstack.demo.rpc.util.ClassLoaderUtils;
 9import org.springframework.context.ApplicationContext;
10
11import java.lang.reflect.Method;
12
13/**
14 * http://www.itstack.org
15 * create by fuzhengwei on 2019/5/6
16 */
17public class MyServerHandler extends ChannelInboundHandlerAdapter {
18
19    private ApplicationContext applicationContext;
20
21    MyServerHandler(ApplicationContext applicationContext) {
22        this.applicationContext = applicationContext;
23    }
24
25    @Override
26    public void channelRead(ChannelHandlerContext ctx, Object obj) {
27        try {
28            Request msg = (Request) obj;
29            //调用
30            Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle());
31            Method addMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes());
32            Object objectBean = applicationContext.getBean(msg.getRef());
33            Object result = addMethod.invoke(objectBean, msg.getArgs());
34            //反馈
35            Response request = new Response();
36            request.setRequestId(msg.getRequestId());
37            request.setResult(result);
38            ctx.writeAndFlush(request);
39            //释放
40            ReferenceCountUtil.release(msg);
41        } catch (Exception e) {
42            e.printStackTrace();
43        }
44    }
45
46    @Override
47    public void channelReadComplete(ChannelHandlerContext ctx) {
48        ctx.flush();
49    }
50
51}

JDKInvocationHandler.java

1package org.itstack.demo.rpc.reflect;
 2
 3
 4import org.itstack.demo.rpc.network.future.SyncWrite;
 5import org.itstack.demo.rpc.network.msg.Request;
 6import org.itstack.demo.rpc.network.msg.Response;
 7
 8import java.lang.reflect.InvocationHandler;
 9import java.lang.reflect.Method;
10
11public class JDKInvocationHandler implements InvocationHandler {
12
13    private Request request;
14
15    public JDKInvocationHandler(Request request) {
16        this.request = request;
17    }
18
19    @Override
20    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
21        String methodName = method.getName();
22        Class[] paramTypes = method.getParameterTypes();
23        if ("toString".equals(methodName) && paramTypes.length == 0) {
24            return request.toString();
25        } else if ("hashCode".equals(methodName) && paramTypes.length == 0) {
26            return request.hashCode();
27        } else if ("equals".equals(methodName) && paramTypes.length == 1) {
28            return request.equals(args[0]);
29        }
30        //设置参数
31        request.setMethodName(methodName);
32        request.setParamTypes(paramTypes);
33        request.setArgs(args);
34        request.setRef(request.getRef());
35        Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000);
36        //异步调用
37        return response.getResult();
38
39    }
40
41}

JDKProxy.java

1package org.itstack.demo.rpc.reflect;
 2
 3
 4import org.itstack.demo.rpc.network.msg.Request;
 5import org.itstack.demo.rpc.util.ClassLoaderUtils;
 6
 7import java.lang.reflect.InvocationHandler;
 8import java.lang.reflect.Proxy;
 9
10public class JDKProxy {
11
12    public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception {
13        InvocationHandler handler = new JDKInvocationHandler(request);
14        ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader();
15        T result = (T) Proxy.newProxyInstance(classLoader, new Class[]{interfaceClass}, handler);
16        return result;
17    }
18
19}

RedisRegistryCenter.java

1package org.itstack.demo.rpc.registry;
 2
 3import redis.clients.jedis.Jedis;
 4import redis.clients.jedis.JedisPool;
 5import redis.clients.jedis.JedisPoolConfig;
 6
 7/**
 8 * http://www.itstack.org
 9 * create by fuzhengwei on 2019/5/7
10 * redis 模拟RPC注册中心
11 */
12public class RedisRegistryCenter {
13
14    private static Jedis jedis;   //非切片额客户端连接
15
16    //初始化redis
17    public static void init(String host, int port) {
18        // 池基本配置
19        JedisPoolConfig config = new JedisPoolConfig();
20        config.setMaxIdle(5);
21        config.setTestOnBorrow(false);
22        JedisPool jedisPool = new JedisPool(config, host, port);
23        jedis = jedisPool.getResource();
24    }
25
26    /**
27     * 注册生产者
28     *
29     * @param nozzle 接口
30     * @param alias  别名
31     * @param info   信息
32     * @return 注册结果
33     */
34    public static Long registryProvider(String nozzle, String alias, String info) {
35        return jedis.sadd(nozzle + "_" + alias, info);
36    }
37
38    /**
39     * 获取生产者
40     * 模拟权重,随机获取
41     * @param nozzle 接口名称
42     */
43    public static String obtainProvider(String nozzle, String alias) {
44        return jedis.srandmember(nozzle + "_" + alias);
45    }
46
47    public static Jedis jedis() {
48        return jedis;
49    }
50
51}

ApiTest.java

1public class ApiTest {
2
3    public static void main(String[] args) {
4        String[] configs = {"itstack-rpc-center.xml", "itstack-rpc-provider.xml", "itstack-rpc-consumer.xml"};
5        new ClassPathXmlApplicationContext(configs);
6    }
7
8}

框架,测试结果

12019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy
 22019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml]
 32019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml]
 42019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml]
 52019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy
 62019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ...
 72019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 6379
 82019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ...
 92019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 22201
102019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0

框架应用

为了测试我们写两个测试工程;itstack-demo-rpc-provider、itstack-demo-rpc-consumer

itstack-demo-rpc-provider 提供生产者接口

1itstack-demo-rpc-provider
 2├── itstack-demo-rpc-provider-export
 3│   └── src
 4│        └── main
 5│            └── java
 6│                 └── org.itstack.demo.rpc.provider.export
 7│                     ├── domain 
 8│                     │   └── Hi.java
 9│                     └── HelloService.java
10│   
11└── itstack-demo-rpc-provider-web
12    └── src
13         └── main
14             ├── java
15             │    └── org.itstack.demo.rpc.provider.web
16             │        └── HelloServiceImpl.java
17             └── resources
18                  └── spring
19                      └── spring-itstack-rpc-provider.xml

HelloService.java

1public interface HelloService {
2
3    String hi();
4
5    String say(String str);
6
7    String sayHi(Hi hi);
8
9}

HelloServiceImpl.java

1@Controller("helloService")
 2public class HelloServiceImpl implements HelloService {
 3
 4    @Override
 5    public String hi() {
 6        return "hi itstack rpc";
 7    }
 8
 9    @Override
10    public String say(String str) {
11        return str;
12    }
13
14    @Override
15    public String sayHi(Hi hi) {
16        return hi.getUserName() + " say:" + hi.getSayMsg();
17    }
18
19}

spring-itstack-rpc-provider.xml

1<?xml version="1.0" encoding="UTF-8"?>
 2<beans xmlns="http://www.springframework.org/schema/beans"
 3       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
 4       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 5      http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">
 6
 7    <!-- 注册中心 -->
 8    <rpc:server id="rpcServer" host="127.0.0.1" port="6379"/>
 9
10    <rpc:provider id="helloServiceRpc" nozzle="org.itstack.demo.rpc.provider.export.HelloService"
11                  ref="helloService" alias="itstackRpc"/>
12
13</beans>

itstack-demo-rpc-consumer 提供消费者调用

1itstack-demo-rpc-consumer
 2└── src
 3     ├── main
 4     │   ├── java    
 5     │   └── resources
 6     │       └── spring
 7     │           └── spring-itstack-rpc-consumer.xml
 8     │   
 9     └── test
10         └── java
11             └── org.itstack.demo.test
12                 └── ConsumerTest.java

spring-itstack-rpc-consumer.xml

1<?xml version="1.0" encoding="UTF-8"?>
 2<beans xmlns="http://www.springframework.org/schema/beans"
 3       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc"
 4       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 5      http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd">
 6
 7    <!-- 注册中心 -->
 8    <rpc:server id="consumer_itstack" host="127.0.0.1" port="6379"/>
 9
10    <rpc:consumer id="helloService" nozzle="org.itstack.demo.rpc.provider.export.HelloService" alias="itstackRpc"/>
11
12</beans>

ConsumerTest.java

1@RunWith(SpringJUnit4ClassRunner.class)
 2@ContextConfiguration("/spring-config.xml")
 3public class ConsumerTest {
 4
 5    @Resource(name = "helloService")
 6    private HelloService helloService;
 7
 8    @Test
 9    public void test() {
10
11        String hi = helloService.hi();
12        System.out.println("测试结果:" + hi);
13
14        String say = helloService.say("hello world");
15        System.out.println("测试结果:" + say);
16
17        Hi hiReq = new Hi();
18        hiReq.setUserName("付栈");
19        hiReq.setSayMsg("付可敌国,栈无不胜");
20        String hiRes = helloService.sayHi(hiReq);
21
22        System.out.println("测试结果:" + hiRes);
23    }
24
25}

应用,测试结果

测试时启动redis

启动ProviderTest Redis中的注册数据

1redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc
2"{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}"
3redis 127.0.0.1:6379>

执行ConsumerTest中的单元测试方法

1log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner).
2log4j:WARN Please initialize the log4j system properly.
3log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4测试结果:hi itstack rpc
5测试结果:hello world
6测试结果:付栈 say:付可敌国,栈无不胜
7
8Process finished with exit code 0
9


上一篇:netty案例,netty4.1基础入门篇一《嗨!NettyServer》


下一篇:netty案例,netty4.1基础入门篇四《NettyServer收发数据》