surging Chat: Separating Netty's Asynchronous Non-blocking IO Threads from Business Threads

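Foreword

surging has been quiet publicly for a while, but I have not been idle. I have added quite a few features to meet customer needs, which has also brought me a fair amount of side income. These include protocol conversion, the consul watcher mechanism, a Java version, the skywalking upgrade to support 8.0, and the upgrade to .NET 6.0. The customer has also extended surging on their own with a service orchestration workflow engine, and later they need custom extensions for CoAP, XMPP and other protocols. The purpose of this article is to describe a change to how the Netty-based asynchronous non-blocking business logic is handled.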

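Problem description

Before the New Year holiday the customer tested the Java version and ran into quite a few problems, which left them somewhat at a loss: there was a memory leak, and under JMeter stress testing the concurrency simply would not go up. After half a month of effort the problem was finally solved, and the Java version is now estimated to reach a concurrency of around 20,000. The customer's JMeter stress test settings are shown below.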

[Screenshots of the JMeter stress test settings are not available.]

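Solution

When the customer handed me the problem, my first reaction was that the IO threads were being blocked, which placed the problem in the Netty handling. The server-side code is NettyServerMessageListener, and the channelRead method of its ServerHandler is where the business logic is processed. There I was executing the work asynchronously through a ThreadPoolExecutor. Here is the NettyServerMessageListener code: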

public class NettyServerMessageListener implements IMessageListener {
    private Thread thread;
    private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class);
    private ChannelFuture channel;
    private final ITransportMessageDecoder transportMessageDecoder;
    private final ITransportMessageEncoder transportMessageEncoder;
    ReceivedDelegate Received = new ReceivedDelegate();
    @Inject
    public NettyServerMessageListener(  ITransportMessageCodecFactory codecFactory)
    {
        this.transportMessageEncoder = codecFactory.GetEncoder();
        this.transportMessageDecoder = codecFactory.GetDecoder();
    }

    public void StartAsync(final String serverAddress) {
        thread = new Thread(new Runnable() {
            int parallel = Runtime.getRuntime().availableProcessors();
            final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(parallel);
            ThreadFactory threadFactory = new DefaultThreadFactory("rpc-netty", true);
            public void run() {
                String[] array = serverAddress.split(":");
                logger.debug("Preparing to start the service host, listening on " + array[0] + ":" + array[1] + ".");
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup(parallel,threadFactory);
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup).option(ChannelOption.SO_BACKLOG,128)
                        .childOption(ChannelOption.SO_KEEPALIVE,true).childOption(ChannelOption.TCP_NODELAY, true).channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline()
                                        .addLast(new LengthFieldPrepender(4))
                                        .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                        .addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() {
                                                    @Override
                                                    public void run() {
                                                        IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
                                                        onReceived(sender, this.parameter1);
                                                    }
                                                },transportMessageDecoder)
                                        );
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                try {

                    String host = array[0];
                    int port = Integer.parseInt(array[1]);
                    channel = bootstrap.bind(host, port).sync();
                    logger.debug("Service host started successfully, listening on " + serverAddress + ".");
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        logger.info("Rpc server remoting server stop");
                    } else {
                        logger.error("Rpc server remoting server error", e);
                    }

                }
            }
        });
        thread.start();
    }


    @Override
    public ReceivedDelegate getReceived() {
        return Received;
    }

    public void onReceived(IMessageSender sender, TransportMessage message) {
        if (Received == null)
            return;
        Received.notifyX(sender,message);
    }

    private class ReadAction<T,T1> implements  Runnable
    {
        public  T parameter;
        public T1 parameter1;
        public void setParameter( T tParameter,T1 tParameter1) {
            parameter = tParameter;
            parameter1 = tParameter1;
        }

        @Override
        public void run() {

        }
    }

    private class ServerHandler extends ChannelInboundHandlerAdapter {
        private final DefaultEventLoopGroup serverHandlerPool;
        private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable;
        private final ITransportMessageDecoder transportMessageDecoder;

        public ServerHandler(final DefaultEventLoopGroup threadPoolExecutor, ReadAction<ChannelHandlerContext, TransportMessage> runnable,
                             ITransportMessageDecoder transportMessageDecoder) {
            this.serverHandlerPool = threadPoolExecutor;
            this.serverRunnable = runnable;
            this.transportMessageDecoder = transportMessageDecoder;
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.warn("An error occurred while communicating with the server " + ctx.channel().remoteAddress() + ".");
            ctx.close();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext context) {
            context.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception {
            ByteBuf buffer = (ByteBuf) message;
            try {
                byte[] data = new byte[buffer.readableBytes()];
                buffer.readBytes(data);
                serverHandlerPool.execute(() -> {
                    TransportMessage transportMessage = null;
                    try {
                        transportMessage = transportMessageDecoder.Decode(data);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    serverRunnable.setParameter(channelHandlerContext, transportMessage);
                    serverRunnable.run();
                });
            }
            finally {
                ReferenceCountUtil.release(message);
            }
        }
    }
}
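
One detail in the listing above may be worth a second look. Each channel has a single ReadAction instance, and channelRead stores the current message in it through setParameter before the task runs on the pool. If two messages from the same channel are handled by different pool threads at the same time, they could overwrite each other's parameters. The following is a minimal sketch, using only the JDK, of the alternative where each task captures its own context and message. The names PerMessageTaskSketch, dispatch and processAll are hypothetical and are not part of surging.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class PerMessageTaskSketch {

    // Hypothetical helper: every call creates its own task, so no mutable
    // holder object is shared between messages.
    static <C, M> void dispatch(ExecutorService pool, C context, M message,
                                BiConsumer<C, M> onReceived) {
        // context and message are captured per call and never reassigned
        pool.execute(() -> onReceived.accept(context, message));
    }

    // Sends `messages` distinct messages through a pool of `threads` threads
    // and returns the set of (context, message) pairs the callback saw.
    static Set<String> processAll(int messages, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> seen = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < messages; i++) {
            dispatch(pool, "channel-1", "msg-" + i, (ctx, msg) -> seen.add(ctx + ":" + msg));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return seen;
    }
}
```

Because nothing is written to a shared field, every message reaches the callback exactly as it was dispatched, regardless of which pool thread picks it up.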

The ThreadPoolExecutor code:

    public static ThreadPoolExecutor makeServerThreadPool(final String serviceName, int corePoolSize, int maxPoolSize) {
        ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(  10000));
/*
        new LinkedBlockingQueue<Runnable>(10000),
                r -> new Thread(r, "netty-rpc-" + serviceName + "-" + r.hashCode()),
                new ThreadPoolExecutor.AbortPolicy());*/

        return serverHandlerPool;
    }
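
Since the custom thread factory and rejection policy are commented out, makeServerThreadPool falls back to the JDK defaults, and the default rejection policy of ThreadPoolExecutor is AbortPolicy. A pool built this way accepts at most maxPoolSize running tasks plus the queue capacity, and throws RejectedExecutionException beyond that. The sketch below illustrates this with deliberately tiny numbers. BoundedPoolSketch and countRejections are hypothetical names chosen for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    // Submits `tasks` tasks that all block until released, and counts how
    // many submissions the pool rejects.
    static int countRejections(int core, int max, int queueCapacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity)); // default handler: AbortPolicy
        CountDownLatch gate = new CountDownLatch(1);      // keeps every task busy
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            gate.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // queue is full and all threads are busy
                }
            }
        } finally {
            gate.countDown();  // release the blocked tasks
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }
}
```

With core 1, max 2 and a queue of 2, six blocked tasks lead to two rejections: one task runs on the core thread, two wait in the queue, one runs on the extra thread, and the remaining two are refused.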

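Later, after going through the official documentation, I found that the following addLast results in a blocking call on the IO thread: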

 .addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() {
                                                    @Override
                                                    public void run() {
                                                        IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
                                                        onReceived(sender, this.parameter1);
                                                    }
                                                },transportMessageDecoder)

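I then used an EventExecutorGroup to separate the IO threads from the business threads, moving the time-consuming business processing onto the EventExecutorGroup. First, the EventExecutorGroup code is as follows: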

    public static final EventExecutorGroup execThreadPool = new DefaultEventExecutorGroup( Runtime.getRuntime().availableProcessors()*2,
            (ThreadFactory) r -> {
                Thread thread = new Thread(r);
                thread.setName("custom-tcp-exec-"+r.hashCode());
                return thread;
            },
            100000,
            RejectedExecutionHandlers.reject()
    );
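
The effect of this separation can be illustrated without Netty. In the sketch below, a single-thread executor stands in for one IO event loop and a fixed pool stands in for the business thread pool. The business work is kept artificially slow with a latch. When the work runs inline, the IO thread is stuck on the first event. When it is handed off, the IO thread goes on reading. ThreadSeparationSketch and ioThreadStaysResponsive are hypothetical names, and the 500 ms wait is an arbitrary choice for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadSeparationSketch {

    // Returns true if the "IO" thread reads every event while the business
    // work is still blocked, false if it gets stuck behind that work.
    static boolean ioThreadStaysResponsive(int events, boolean handOff)
            throws InterruptedException {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();  // one event loop
        ExecutorService business = Executors.newFixedThreadPool(events); // business pool
        CountDownLatch gate = new CountDownLatch(1);      // keeps business work slow
        CountDownLatch read = new CountDownLatch(events); // events read by the IO thread
        Runnable slowWork = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < events; i++) {
                ioThread.execute(() -> {
                    read.countDown();               // the read itself is cheap
                    if (handOff) {
                        business.execute(slowWork); // separated: return to the loop at once
                    } else {
                        slowWork.run();             // inline: the loop is blocked here
                    }
                });
            }
            return read.await(500, TimeUnit.MILLISECONDS);
        } finally {
            gate.countDown(); // let the slow work finish
            ioThread.shutdown();
            business.shutdown();
        }
    }
}
```

Calling ioThreadStaysResponsive(3, true) models the separated design, while ioThreadStaysResponsive(3, false) models business logic running directly on the IO thread.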

The ServerHandler passed to addLast now takes the EventExecutorGroup. The latest NettyServerMessageListener code is as follows:

public class NettyServerMessageListener implements IMessageListener {
    private Thread thread;
    private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class);
    private ChannelFuture channel;
    private final ITransportMessageDecoder transportMessageDecoder;
    private final ITransportMessageEncoder transportMessageEncoder;
    ReceivedDelegate Received = new ReceivedDelegate();
    @Inject
    public NettyServerMessageListener(  ITransportMessageCodecFactory codecFactory)
    {
        this.transportMessageEncoder = codecFactory.GetEncoder();
        this.transportMessageDecoder = codecFactory.GetDecoder();
    }

    public void StartAsync(final String serverAddress) {
        thread = new Thread(new Runnable() {
            public void run() {
                String[] array = serverAddress.split(":");
                logger.debug("Preparing to start the service host, listening on " + array[0] + ":" + array[1] + ".");
                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline()
                                        .addLast(new LengthFieldPrepender(4))
                                        .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                        .addLast(ThreadPoolUtil.execThreadPool, "handler",new ServerHandler(new ReadAction<ChannelHandlerContext, TransportMessage>() {
                                            @Override
                                            public void run() {
                                                IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
                                                onReceived(sender, this.parameter1);
                                            }
                                        },transportMessageDecoder)
                                        );
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                try {

                    String host = array[0];
                    int port = Integer.parseInt(array[1]);
                    channel = bootstrap.bind(host, port).sync();
                    logger.debug("Service host started successfully, listening on " + serverAddress + ".");
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        logger.info("Rpc server remoting server stop");
                    } else {
                        logger.error("Rpc server remoting server error", e);
                    }

                }
            }
        });
        thread.start();
    }


    @Override
    public ReceivedDelegate getReceived() {
        return Received;
    }

    public void onReceived(IMessageSender sender, TransportMessage message) {
        if (Received == null)
            return;
        Received.notifyX(sender,message);
    }

    private class ReadAction<T,T1> implements  Runnable
    {
        public  T parameter;
        public T1 parameter1;
        public void setParameter( T tParameter,T1 tParameter1) {
            parameter = tParameter;
            parameter1 = tParameter1;
        }

        @Override
        public void run() {

        }
    }

    private class ServerHandler extends ChannelInboundHandlerAdapter {

        private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable;
        private final ITransportMessageDecoder transportMessageDecoder;

        public ServerHandler(ReadAction<ChannelHandlerContext, TransportMessage> runnable,
                             ITransportMessageDecoder transportMessageDecoder) {

            this.serverRunnable = runnable;
            this.transportMessageDecoder = transportMessageDecoder;
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.warn("An error occurred while communicating with the server " + ctx.channel().remoteAddress() + ".");
            ctx.close();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext context) {
            context.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception {
            ByteBuf buffer = (ByteBuf) message;
            try {
                byte[] data = new byte[buffer.readableBytes()];
                buffer.readBytes(data);
                TransportMessage  transportMessage = transportMessageDecoder.Decode(data);
                serverRunnable.setParameter(channelHandlerContext, transportMessage);
                serverRunnable.run();
            }
            finally {
                ReferenceCountUtil.release(message);
            }
        }
    }
}
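
The essential change in the listing above is the three-argument addLast(group, name, handler), which makes Netty invoke the handler on a thread from the given EventExecutorGroup instead of the channel's IO event loop. The following stripped-down sketch, assuming Netty 4.1 on the classpath, shows only that wiring. The handler replies with the name of the thread it ran on, so the separation can be observed from a client. SeparatedPipelineSketch, EchoThreadNameHandler, the "biz" thread name prefix, the pool size of 4 and the 1 MiB frame limit are assumptions made for this example and do not come from surging.

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import java.nio.charset.StandardCharsets;

public class SeparatedPipelineSketch {

    // Business threads, separate from the NIO event loops (size is an assumption).
    static final EventExecutorGroup BUSINESS =
            new DefaultEventExecutorGroup(4, new DefaultThreadFactory("biz"));

    // Replies to every frame with the name of the thread that handled it.
    // SimpleChannelInboundHandler releases the inbound ByteBuf automatically.
    static class EchoThreadNameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf frame) {
            byte[] name = Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8);
            ctx.writeAndFlush(ctx.alloc().buffer(name.length).writeBytes(name));
        }
    }

    // Binds a server whose last handler runs on BUSINESS rather than on `worker`.
    static Channel start(EventLoopGroup boss, EventLoopGroup worker, int port)
            throws InterruptedException {
        return new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                .addLast(new LengthFieldPrepender(4))
                                // 1 MiB frame limit is an assumption for this sketch
                                .addLast(new LengthFieldBasedFrameDecoder(1 << 20, 0, 4, 0, 4))
                                .addLast(BUSINESS, "handler", new EchoThreadNameHandler());
                    }
                })
                .bind(port)
                .sync()
                .channel();
    }
}
```

A client that sends a 4-byte length prefix followed by a payload should get back a thread name starting with "biz-", which indicates that the handler ran outside the NioEventLoopGroup threads.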

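After these changes, stress testing with JMeter no longer produces timeouts. Even the chain stage gateway -> .NET microservice -> Java microservice shows no timeouts, and raising the JMeter user threads to 2000 caused no problems either.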

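Following the same approach, I also modified the .NET community edition of surging and pushed the changes to GitHub. First, the Task.Run inside serverMessageListener.Received in ServiceHost was removed, along with the one in ChannelRead of ServerHandler. Then the ServerHandler passed to addLast was given an EventExecutorGroup. After these changes, stress testing shows it can support 200,000+, and no memory leak was found. Running the client 10,000 times, server CPU stays at around 6% and the response time is around 1.1 seconds. Several surging clients can be started for stress testing: CPU usage adds up, while response time is unaffected. Below is a stress test run of 10,000 executions.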

[Screenshot of the 10,000-execution stress test is not available.]

 

 

surging community edition open-source repository

 

 

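Summary

After five years of development, surging has grown from its original Netty-based RPC into a heterogeneous microservice engine that supports multiple protocols and multiple languages. This has meant not only technical growth but also gains in reputation and income. As long as you keep at it, you will eventually see results. I will keep updating it and contribute what little I can to enterprise and community users, so that enterprises can better master microservice solutions and meet the varied business needs across the industry today.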
