Netty实现RPC

Netty实现RPC

1.服务接口定义

/**
 * Contract of the service that is exposed over RPC.
 */
public interface CityService {

    /**
     * Returns the name of a city.
     *
     * @return the city name supplied by the implementation
     */
    String getCityName();
}

2.服务提供者

2.1服务接口实现

/**
 * Provider-side implementation of {@link CityService} that always answers
 * with the same fixed city name.
 */
public class CityServiceImpl implements CityService {

    /** The fixed value handed back by {@link #getCityName()}. */
    private static final String CITY_NAME = "厦门";

    /**
     * Returns the fixed city name.
     *
     * @return the constant city name
     */
    @Override
    public String getCityName() {
        return CITY_NAME;
    }
}

2.2 服务 Netty Server 实现

/**
 * RPC provider: registers the service implementations and serves them over a
 * Netty TCP server listening on port 8888.
 *
 * <p>Requests and responses are exchanged as Java-serialized objects
 * ({@code ObjectEncoder} / {@code ObjectDecoder}).
 */
public class NettyServer
{
    /**
     * Upper bound, in bytes, for a single serialized request object. Frames
     * announcing a larger size are rejected by the decoder instead of being
     * buffered, so a peer cannot make the server reserve up to 2 GiB per
     * connection.
     */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    /** Registry of service implementations, keyed by the interface's fully qualified name. */
    private static final Map<String, CityService> serviceMap = new HashMap<>();

    public static void main(String[] args) throws Exception
    {
        new NettyServer().start();
    }

    /**
     * Registers the services and then runs the server; blocks until the
     * server channel is closed.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start() throws Exception
    {
        initService();
        initNettyServer();
    }

    /** Registers every service implementation under its interface name. */
    private void initService()
    {
        serviceMap.put("com.yalong.NIO.rpc.CityService", new CityServiceImpl());
    }

    /**
     * Boots the Netty server, binds port 8888 and blocks until the server
     * channel closes. Both event loop groups are shut down on exit, whether
     * the method returns normally or throws.
     *
     * @throws InterruptedException if interrupted while binding or waiting
     */
    private void initNettyServer() throws InterruptedException
    {
        // Only one port is bound, so a single acceptor thread is sufficient.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try
        {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.
                    group(bossGroup, workGroup).
                    channel(NioServerSocketChannel.class).
                    option(ChannelOption.SO_BACKLOG, 1024).
                    childOption(ChannelOption.SO_KEEPALIVE, true).
                    childHandler(new ChannelInitializer<SocketChannel>()
                    {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception
                        {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            // SECURITY NOTE(review): ObjectDecoder performs native Java
                            // deserialization of bytes received from the network. Only
                            // expose this port to trusted peers, or switch to a
                            // non-native wire format (JSON, protobuf, ...).
                            pipeline.addLast(new ObjectDecoder(MAX_OBJECT_SIZE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new NettyServerHandler(serviceMap));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(8888).sync();
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();

        } finally
        {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

2.3接口对应的处理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter
{
    private Map<String, CityService> serviceMap;

    public NettyServerHandler(Map<String, CityService> serviceMap)
    {
        this.serviceMap = serviceMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        if (msg instanceof Msg)
        {
            String className = ((Msg) msg).getClassName();
            CityService cityService = serviceMap.get(className);
            if (cityService != null)
            {
                String cityName = cityService.getCityName();
                ctx.writeAndFlush(cityName);
            }
        }else
        {
            System.out.println("++++================++++");
        }
        ctx.close();

    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

3.服务消费者

3.1服务Client端实现

/**
 * RPC consumer: connects to the server on localhost:8888 and invokes
 * {@link CityService} through a JDK dynamic proxy that sends a {@code Msg}
 * request and returns the object received in reply.
 */
public class NettyClient
{
    /**
     * Connects, performs a single remote {@code getCityName()} call and prints
     * the result.
     *
     * @throws Exception if connecting fails or the calling thread is interrupted
     */
    public static void main(String[] args) throws Exception
    {
        EventLoopGroup group = new NioEventLoopGroup();
        // The group is released here rather than inside the proxy, so it is
        // also shut down when connecting fails or the remote call throws.
        try
        {
            Bootstrap bootstrap = new Bootstrap();

            NettyClientHandler nettyClientHandler = new NettyClientHandler();
            // Start the Netty client.
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>()
                    {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception
                        {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(nettyClientHandler);
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();

            // Send the request through a dynamic proxy of the service interface.
            CityService cityService = (CityService) Proxy.newProxyInstance(CityService.class.getClassLoader(),
                    new Class[]{CityService.class}, new InvocationHandler()
                    {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
                        {
                            // Issue the remote call.
                            System.out.println("=========开始发送远程调用===========");
                            String clazzName = CityService.class.getName();
                            Msg msg = new Msg();
                            msg.setClassName(clazzName);

                            // Wait for the write so that a failed send surfaces as an
                            // exception instead of blocking forever on closeFuture().
                            future.channel().writeAndFlush(msg).sync();
                            // The server closes the connection after replying; wait for that.
                            future.channel().closeFuture().sync();
                            return nettyClientHandler.getResult();
                        }
                    });
            String cityName = cityService.getCityName();
            System.out.println("=====远程调用结果====="+cityName);
        } finally
        {
            group.shutdownGracefully();
        }
    }

}

3.2服务Client端处理器

/**
 * Client-side inbound handler that remembers the most recent inbound message
 * so the caller can fetch it through {@link #getResult()}.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Last inbound message seen by {@link #channelRead}; {@code null} until one arrives. */
    private Object result;

    /**
     * @return the last message stored by {@link #channelRead}, or {@code null} if none was read
     */
    public Object getResult() {
        return this.result;
    }

    /** Forwards the channel-active event unchanged to the next handler. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    /** Stores the inbound message, then forwards it to the next handler. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg;
        ctx.fireChannelRead(msg);
    }
}
上一篇:字符流介绍


下一篇:APM - Javassist 入门 生成一个简单类