手把手教你用Netty实现一个RPC框架

写在开头

本文使用Netty简单实现一个RPC框架,包括服务端,客户端,注册中心等,暂时不考虑监控,并且因为使用的Netty,所以使用到了Netty的封装API,所以不熟悉NettyAPI的小伙伴可以先熟悉一下API,我在代码中也注释了相关步骤和逻辑,因为Neety其实就是对网络通信的封装框架,所以底层还是IO那一套。建议大家熟悉一下NIO的三件套,buffer,selector,channel。

项目结构

手把手教你用Netty实现一个RPC框架

  • api  服务接口
  • consumer 消费者
  • protocol 策略对象
  • provider 生产者-api接口服务实现
  • registry 注册中心 

api

/**
 * @author : Ls
 * @ClassName : IRpcService
 * @date : 2021-01-04 15:46
 * @Version : 1.0
 * @Description : RPC基础服务接口
 **/
public interface IRpcDoSomethingService {

    int add(int a,int b);
    int sub(int a,int b);
    int mul(int a,int b);
    int div(int a,int b);

}
/**
 * @author : Ls
 * @ClassName : IRpcHelloService
 * @date : 2021-01-04 15:45
 * @Version : 1.0
 * @Description : Rpc基础服务接口
 **/
public interface IRpcSayHelloService {

    String sayHello(String name);

}

provider

/**
 * @author : Ls
 * @ClassName : IRpcDoSomethingServiceImpl
 * @date : 2021-01-04 15:50
 * @Version : 1.0
 * @Description :
 **/
public class IRpcDoSomethingServiceImpl implements IRpcDoSomethingService {
    public int add(int a, int b) {
        return a + b;
    }

    public int sub(int a, int b) {
        return a - b;
    }

    public int mul(int a, int b) {
        return a * b;
    }

    public int div(int a, int b) {
        return a / b;
    }
}
/**
 * @author : Ls
 * @ClassName : IRpcHelloServiceImpl
 * @date : 2021-01-04 15:47
 * @Version : 1.0
 * @Description :
 **/
public class IRpcSayHelloServiceImpl implements IRpcSayHelloService {

    public String sayHello(String name) {
        return  "Hello "+name+" !!" ;
    }

}

procotol

/**
 * 自定义传输协议
 */
@Data
public class MyInvokerProtocol implements Serializable {

    private String className;//类名
    private String methodName;//函数名称 
    private Class<?>[] parames;//形参列表
    private Object[] values;//实参列表

}

registry

/**
 * @author : Ls
 * @ClassName : IRpcRegistry
 * @date : 2021-01-04 15:52
 * @Version : 1.0
 * @Description : 注册中心
 **/
public class IRpcRegistry {

    // 暴露端口
    private  int port;

    public IRpcRegistry(int port){
        this.port = port;
    }

    /**
     * 开启服务接口监听
     * 调用 netty 相关API实现接口监听
     */
    public void listen(){
        // Boss线程 (Selector核心)
        NioEventLoopGroup boss = new NioEventLoopGroup();
        // Work线程 (工作线程)
        NioEventLoopGroup work = new NioEventLoopGroup();

        // 1. 建立服务
        ServerBootstrap server = new ServerBootstrap();
        // 2. 注入 Boos/Worker
        server.group(boss,work)
                .channel(NioServerSocketChannel.class) // 3. 管道执行 keys 轮询的核心
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel channel) throws Exception {
                        // 5. 对流数据进行解析
                        ChannelPipeline pipeline = channel.pipeline();
                        // 6. 自定义协议解码器 (取决于自己定义的规则对象)
                        /** 入参有5个,分别解释如下
                         *  maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                         *  lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                         *  lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                         *  lengthAdjustment:要添加到长度字段值的补偿值
                         *  initialBytesToStrip:从解码帧中去除的第一个字节数
                         */
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        //自定义协议编码器
                        pipeline.addLast(new LengthFieldPrepender(4));
                        // 7. 参数解析
                        //对象参数类型编码器
                        pipeline.addLast("encoder",new ObjectEncoder());
                        //对象参数类型解码器
                        pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        // 8. 执行业务逻辑
                        pipeline.addLast(new IRegistryHandler());
                    }
                }) // 4. 子线程 执行对应的业务逻辑
                .option(ChannelOption.SO_BACKLOG,128) // 主线程最大连接数
                .childOption(ChannelOption.SO_KEEPALIVE,true); // 子线程持续

        try {
            // 服务绑定端口
            ChannelFuture future = server.bind(port).sync();
            System.out.println("RPC start success, listen port is :" +  port + " !!");
            future.channel().closeFuture().sync(); // 回调
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    
    public static void main(String[] args) {
        // 监听启动
        new IRpcRegistry(8080).listen();
    }


}
/**
 * @author : Ls
 * @ClassName : IRegistryHandler
 * @date : 2021-01-04 16:19
 * @Version : 1.0
 * @Description :  业务执行
 **/
public class IRegistryHandler extends ChannelInboundHandlerAdapter {

    // 注册中心 (容器)
    public static ConcurrentHashMap<String,Object> context = new ConcurrentHashMap<String,Object>();
    // 类信息集合
    private List<String> classNames = new ArrayList<String>();

    // 构造器初始化
    public IRegistryHandler(){
        // 包扫描信息
        scannerClass("com.lishuo.netty.rpc.provider");
        // 注册容器
        doRegistry();
    }


    // 递归包扫描
    private void scannerClass(String packageName) {
        // 获取类加载器
        URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
        File dir = new File(url.getFile());
        for (File file : dir.listFiles()) {
            //如果是一个文件夹,继续递归
            if(file.isDirectory()){
                scannerClass(packageName + "." + file.getName());
            }else{
                classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
            }
        }
    }

    // 注册
    private void doRegistry() {
        if(classNames.isEmpty()){return;}
        for (String className : classNames) {
            try {
                Class<?> clazz = Class.forName(className);
                Class<?> anInterface = clazz.getInterfaces()[0]; // 因为此demo默认实现一个接口  所以此处写死获取当前第一个接口信息
                context.put(anInterface.getName(),clazz.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 请求到达  执行执行的业务逻辑
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result = new Object();
        MyInvokerProtocol request = (MyInvokerProtocol) msg; // netty 会按照我们自定义的策略进行转换
        // 判断当前调用服务在容器中是否真正存在
        if(context.containsKey(request.getClassName())){
            // 确实存在执行对应的业务逻辑
            Object clazz = context.get(request.getClassName());
            // 获取真正执行的
            Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
            result = method.invoke(clazz, request.getValues());
        }
        if(result != null){
            ctx.write(result);
        }
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


}

consumer

/**
 * @author : Ls
 * @ClassName : IRpcConsumer
 * @date : 2021-01-04 16:58
 * @Version : 1.0
 * @Description :
 **/
public class IRpcConsumer {

    public static void main(String[] args) {
        IRpcSayHelloService invoke = IRpcProxy.invoke(IRpcSayHelloService.class);
        System.out.println(invoke.sayHello("netty"));

        IRpcDoSomethingService invoke1 = IRpcProxy.invoke(IRpcDoSomethingService.class);
        System.out.println(invoke1.add(2,4));
        System.out.println(invoke1.mul(2,4));
        System.out.println(invoke1.sub(2,4));
        System.out.println(invoke1.div(2,4));

    }

}
/**
 * @author : Ls
 * @ClassName : IRpcProxy
 * @date : 2021-01-04 16:59
 * @Version : 1.0
 * @Description :
 **/
public class IRpcProxy {

    public static <T> T invoke(Class<T> clazz){
        Class<?> [] interfaces = clazz.isInterface() ?
                new Class[]{clazz} :
                clazz.getInterfaces();
       T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,new ConsumerProxyHandler(clazz));
       return result;
    }

}
/**
 * @author : Ls
 * @ClassName : ConsumerProxyHandler
 * @date : 2021-01-04 17:09
 * @Version : 1.0
 * @Description :
 **/
public class ConsumerProxyHandler implements InvocationHandler {

    private Class<?> clazz;
    public ConsumerProxyHandler(Class<?> clazz){
        this.clazz = clazz;
    }

    // 动态代理执行相应业务逻辑
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if(Object.class.equals(method.getDeclaringClass())){
            // 如果当前就是一个实现类
            return method.invoke(proxy,args);
        }else{
            return rpcInvoke(method,args);
        }
    }

    private Object rpcInvoke(Method method, Object[] args) {
        MyInvokerProtocol request = new MyInvokerProtocol();
        request.setClassName(this.clazz.getName()); // 类名称
        request.setMethodName(method.getName()); // 方法名称
        request.setParames(method.getParameterTypes()); // 入参列表
        request.setValues(args); // 实参列表

        // TCP 远程调用
        final IRpcProxyHandler consumerHandler = new IRpcProxyHandler();

        NioEventLoopGroup work = new NioEventLoopGroup();

        Bootstrap server = new Bootstrap();
        server.group(work)
                .channel(NioSocketChannel.class)// 客户端管道
                .option(ChannelOption.TCP_NODELAY, true) // 开启
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        //自定义协议编码器
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                        //对象参数类型编码器
                        pipeline.addLast("encoder", new ObjectEncoder());
                        //对象参数类型解码器
                        pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        pipeline.addLast("handler",consumerHandler);
                    }
                });
        ChannelFuture future = null;
        try {
            future = server.connect("localhost", 8080).sync();
            future.channel().writeAndFlush(request).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            work.shutdownGracefully();
        }
        return consumerHandler.getResponse();
    }

}
/**
 * @author : Ls
 * @ClassName : IRpcProxyHandler
 * @date : 2021-01-04 17:18
 * @Version : 1.0
 * @Description :
 **/
public class IRpcProxyHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    public Object getResponse() {
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

 

上一篇:Java开发之路—Java反射机制


下一篇:Spring工具类