前言
surging 对外沉寂了一段时间,但作者并没有闲着,而是针对客户的需要添加了不少功能,也给我带来了不少外快收益,比如协议转化、consul 的 watcher 机制、JAVA 版本、skywalking 升级支持 8.0、升级 .NET 6.0,而客户自己也扩展支持了服务编排流程引擎,后期客户还需要扩展定制 CoAP、XMPP 等协议。今天写这篇文章的目的,是介绍如何修改基于 netty 的异步非阻塞业务逻辑操作。
问题描述
年前客户对 JAVA 版本进行了测试,产生了不少问题,客户也比较茫然:存在内存泄漏,而且通过 jmeter 压测时并发始终上不来。经过半个月的努力,终于把问题解决了,预估 JAVA 版本并发能达到 2 万左右。以下是客户设置的 jmeter 压测实例:
解决方案
当客户把问题抛给我后,我第一反应是IO线程被阻塞造成的,而这样就可以把问题定位在netty 的处理上,而处理server 端代码是NettyServerMessageListener,而其中ServerHandler的channelRead是处理业务逻辑的,在这当中我是通过ThreadPoolExecutor执行异步处理,可以看看NettyServerMessageListener代码:
/**
 * Original ("before") server-side message listener built on Netty.
 *
 * <p>{@link #StartAsync(String)} starts a thread that configures a {@code ServerBootstrap},
 * binds it to the given address and installs a {@code ServerHandler} on every accepted
 * channel. The handler copies each inbound frame into a byte array and submits the
 * decode-and-dispatch work to a {@code DefaultEventLoopGroup}; decoded messages are
 * forwarded to the {@code Received} delegate through {@link #onReceived}.
 */
public class NettyServerMessageListener implements IMessageListener {
    private Thread thread;
    private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class);
    private ChannelFuture channel;
    private final ITransportMessageDecoder transportMessageDecoder;
    private final ITransportMessageEncoder transportMessageEncoder;
    // Delegate notified by onReceived for every message handed over by the handler.
    ReceivedDelegate Received = new ReceivedDelegate();

    /**
     * Obtains the encoder and decoder used by this listener from the given codec factory.
     *
     * @param codecFactory factory supplying the transport message encoder and decoder
     */
    @Inject
    public NettyServerMessageListener( ITransportMessageCodecFactory codecFactory) {
        this.transportMessageEncoder = codecFactory.GetEncoder();
        this.transportMessageDecoder = codecFactory.GetDecoder();
    }

    /**
     * Starts a new thread that binds the server to {@code serverAddress}.
     *
     * @param serverAddress address that is split on ":"; element 0 is used as the host and
     *                      element 1 is parsed as the port
     */
    public void StartAsync(final String serverAddress) {
        thread = new Thread(new Runnable() {
            // Number of available processors; sizes both the worker group and the executor below.
            int parallel = Runtime.getRuntime().availableProcessors();
            // Executor group handed to every ServerHandler; channelRead submits its work to it.
            final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(parallel);
            ThreadFactory threadFactory = new DefaultThreadFactory("rpc-netty", true);

            public void run() {
                String[] array = serverAddress.split(":");
                // NOTE(review): host and port are joined with "" here, so the message prints them
                // without a separator - confirm whether ":" was intended.
                logger.debug("准备启动服务主机,监听地址:" + array[0] + "" + array[1] + "。");
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup(parallel,threadFactory);
                ServerBootstrap bootstrap = new ServerBootstrap();
                // NOTE(review): SO_BACKLOG is set to 128 here and again further down the chain.
                bootstrap.group(bossGroup, workerGroup).option(ChannelOption.SO_BACKLOG,128)
                        .childOption(ChannelOption.SO_KEEPALIVE,true).childOption(ChannelOption.TCP_NODELAY, true).channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                                // Pipeline: 4-byte length prepender (outbound), length-field frame
                                // decoder (inbound), then the business handler.
                                socketChannel.pipeline()
                                        .addLast(new LengthFieldPrepender(4))
                                        .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                        .addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() {
                                            // Wraps the channel context in a sender and forwards the
                                            // decoded message to onReceived.
                                            @Override
                                            public void run() {
                                                IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
                                                onReceived(sender, this.parameter1);
                                            }
                                        },transportMessageDecoder)
                                        );
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                try {
                    String host = array[0];
                    int port = Integer.parseInt(array[1]);
                    channel = bootstrap.bind(host, port).sync();
                    logger.debug("服务主机启动成功,监听地址:" + serverAddress + "。");
                } catch (Exception e) {
                    // An interrupt is logged at info level; any other failure is logged as an error.
                    // NOTE(review): bossGroup and workerGroup are not shut down on this path.
                    if (e instanceof InterruptedException) {
                        logger.info("Rpc server remoting server stop");
                    } else {
                        logger.error("Rpc server remoting server error", e);
                    }
                }
            }
        });
        thread.start();
    }

    /** Returns the delegate that is notified of received messages. */
    @Override
    public ReceivedDelegate getReceived() {
        return Received;
    }

    /**
     * Forwards a message to the {@code Received} delegate; does nothing when the delegate is null.
     *
     * @param sender  sender passed through to the delegate
     * @param message message passed through to the delegate
     */
    public void onReceived(IMessageSender sender, TransportMessage message) {
        if (Received == null)
            return;
        Received.notifyX(sender,message);
    }

    /**
     * Runnable carrying two mutable parameters that are set through {@link #setParameter}
     * before {@link #run()} is invoked; the base {@code run()} is empty and is overridden
     * by the anonymous subclass created in {@code initChannel}.
     */
    private class ReadAction<T,T1> implements Runnable {
        public T parameter;
        public T1 parameter1;

        public void setParameter( T tParameter,T1 tParameter1) {
            parameter = tParameter;
            parameter1 = tParameter1;
        }

        @Override
        public void run() {
        }
    }

    /**
     * Inbound handler that copies each frame to a byte array and submits decoding plus the
     * configured {@code ReadAction} to {@code serverHandlerPool}.
     */
    private class ServerHandler extends ChannelInboundHandlerAdapter {
        private final DefaultEventLoopGroup serverHandlerPool;
        private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable;
        private final ITransportMessageDecoder transportMessageDecoder;

        public ServerHandler(final DefaultEventLoopGroup threadPoolExecutor, ReadAction<ChannelHandlerContext, TransportMessage> runnable, ITransportMessageDecoder transportMessageDecoder) {
            this.serverHandlerPool = threadPoolExecutor;
            this.serverRunnable = runnable;
            this.transportMessageDecoder = transportMessageDecoder;
        }

        // Logs the remote address (the cause itself is not logged) and closes the channel.
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.warn("与服务器:" + ctx.channel().remoteAddress() + "通信时发送了错误。");
            ctx.close();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext context) {
            context.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception {
            ByteBuf buffer = (ByteBuf) message;
            try {
                // Copy the readable bytes out of the buffer so the buffer can be released in
                // the finally block while the submitted task works on the copy.
                byte[] data = new byte[buffer.readableBytes()];
                buffer.readBytes(data);
                serverHandlerPool.execute(() -> {
                    TransportMessage transportMessage = null;
                    try {
                        transportMessage = transportMessageDecoder.Decode(data);
                    } catch (IOException e) {
                        // The stack trace is printed and processing continues with a null message.
                        e.printStackTrace();
                    }
                    // NOTE(review): serverRunnable is one instance per handler and is mutated here
                    // from the submitted task; if two tasks of the same handler can run at the same
                    // time they would overwrite each other's parameters - confirm.
                    serverRunnable.setParameter(channelHandlerContext, transportMessage);
                    serverRunnable.run();
                });
            } finally {
                ReferenceCountUtil.release(message);
            }
        }
    }
}
ThreadPoolExecutor代码:
/**
 * Creates the bounded thread pool used to run server-side handler work.
 *
 * <p>The pool keeps idle non-core threads for 60 seconds and queues at most 10000 pending
 * tasks in an {@code ArrayBlockingQueue}. Once the queue is full and the pool has reached
 * {@code maxPoolSize}, further submissions are rejected with a
 * {@code RejectedExecutionException} (abort policy).
 *
 * <p>Worker threads are named {@code netty-rpc-<serviceName>-<n>} so that they can be told
 * apart in thread dumps; previously {@code serviceName} was accepted but never used.
 *
 * @param serviceName  name embedded in the worker thread names
 * @param corePoolSize number of threads kept in the pool even when idle
 * @param maxPoolSize  maximum number of threads allowed in the pool
 * @return a new, independent {@code ThreadPoolExecutor}
 * @throws IllegalArgumentException if the sizes are rejected by {@code ThreadPoolExecutor}
 */
public static ThreadPoolExecutor makeServerThreadPool(final String serviceName, int corePoolSize, int maxPoolSize) {
    // A per-pool sequence gives unique, readable names; r.hashCode() is not guaranteed unique.
    final java.util.concurrent.atomic.AtomicInteger threadSeq =
            new java.util.concurrent.atomic.AtomicInteger();
    return new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10000),
            r -> {
                Thread worker = new Thread(r, "netty-rpc-" + serviceName + "-" + threadSeq.incrementAndGet());
                // Mirror the default factory: non-daemon threads at normal priority.
                worker.setDaemon(false);
                worker.setPriority(Thread.NORM_PRIORITY);
                return worker;
            },
            new ThreadPoolExecutor.AbortPolicy());
}
后来查阅官方文档发现,以下通过 addLast 添加的 ServerHandler 是在 IO 线程上执行的,其中的耗时调用会阻塞 IO 线程:
.addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() { @Override public void run() { IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter); onReceived(sender, this.parameter1); } },transportMessageDecoder)
后面通过使用EventExecutorGroup把IO线程与业务线程进行分离,把耗时业务处理添加到EventExecutorGroup进行处理,首先EventExecutorGroup代码如下
/**
 * Shared executor group for business handlers, kept separate from the Netty IO threads.
 *
 * <p>It is sized at twice the number of available processors, names its threads
 * {@code custom-tcp-exec-<hash of the runnable>}, is constructed with a limit of 100000
 * and uses the rejecting handler from {@code RejectedExecutionHandlers}.
 */
public static final EventExecutorGroup execThreadPool =
        new DefaultEventExecutorGroup(
                Runtime.getRuntime().availableProcessors() * 2,
                (ThreadFactory) task -> {
                    Thread worker = new Thread(task);
                    worker.setName("custom-tcp-exec-" + task.hashCode());
                    return worker;
                },
                100000,
                RejectedExecutionHandlers.reject());
而addLast的ServerHandler添加了EventExecutorGroup, 最新的NettyServerMessageListener代码如下:
/**
 * Server-side message listener built on Netty.
 *
 * <p>{@link #StartAsync(String)} binds a {@code ServerBootstrap} on a dedicated thread.
 * Every accepted channel gets a length-field framing pair plus a {@code ServerHandler}
 * that is registered with {@code ThreadPoolUtil.execThreadPool}, so decoding and the
 * business callback run on that executor group instead of the channel's IO event loop.
 * Decoded messages are forwarded to the {@code Received} delegate via {@link #onReceived}.
 */
public class NettyServerMessageListener implements IMessageListener {
    private Thread thread;
    private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class);
    private ChannelFuture channel;
    private final ITransportMessageDecoder transportMessageDecoder;
    private final ITransportMessageEncoder transportMessageEncoder;
    // Delegate notified by onReceived for every decoded message.
    ReceivedDelegate Received = new ReceivedDelegate();

    /**
     * Obtains the encoder and decoder used by this listener from the given codec factory.
     *
     * @param codecFactory factory supplying the transport message encoder and decoder
     */
    @Inject
    public NettyServerMessageListener(ITransportMessageCodecFactory codecFactory) {
        this.transportMessageEncoder = codecFactory.GetEncoder();
        this.transportMessageDecoder = codecFactory.GetDecoder();
    }

    /**
     * Starts a thread that binds the server to {@code serverAddress}.
     *
     * <p>Failures (malformed address, bind error, interruption) are logged rather than
     * thrown, and the event loop groups are shut down so their threads do not leak.
     *
     * @param serverAddress address in {@code host:port} form
     */
    public void StartAsync(final String serverAddress) {
        thread = new Thread(new Runnable() {
            public void run() {
                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                try {
                    // Parsing happens inside the try so a malformed address is logged
                    // instead of terminating this thread with an unhandled exception.
                    String[] array = serverAddress.split(":");
                    // Fix: host and port used to be joined with "" and printed without a separator.
                    logger.debug("准备启动服务主机,监听地址:" + array[0] + ":" + array[1] + "。");
                    String host = array[0];
                    int port = Integer.parseInt(array[1]);

                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                                @Override
                                protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                                    socketChannel.pipeline()
                                            .addLast(new LengthFieldPrepender(4))
                                            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                            // Registering the handler with an executor group moves its
                                            // callbacks off the IO event loop.
                                            .addLast(ThreadPoolUtil.execThreadPool, "handler",
                                                    new ServerHandler(new ReadAction<ChannelHandlerContext, TransportMessage>() {
                                                        @Override
                                                        public void run() {
                                                            IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
                                                            onReceived(sender, this.parameter1);
                                                        }
                                                    }, transportMessageDecoder));
                                }
                            })
                            .option(ChannelOption.SO_BACKLOG, 128)
                            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

                    channel = bootstrap.bind(host, port).sync();
                    logger.debug("服务主机启动成功,监听地址:" + serverAddress + "。");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    logger.info("Rpc server remoting server stop");
                    releaseGroups(bossGroup, workerGroup);
                } catch (Exception e) {
                    logger.error("Rpc server remoting server error", e);
                    releaseGroups(bossGroup, workerGroup);
                }
            }
        });
        thread.start();
    }

    /** Shuts down both event loop groups after a failed start so their threads are released. */
    private void releaseGroups(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    /** Returns the delegate that is notified of received messages. */
    @Override
    public ReceivedDelegate getReceived() {
        return Received;
    }

    /**
     * Forwards a message to the {@code Received} delegate; does nothing when the delegate is null.
     *
     * @param sender  sender passed through to the delegate
     * @param message message passed through to the delegate
     */
    public void onReceived(IMessageSender sender, TransportMessage message) {
        if (Received == null) {
            return;
        }
        Received.notifyX(sender, message);
    }

    /**
     * Runnable carrying two parameters that are set through {@link #setParameter} right
     * before {@link #run()} is invoked; the base {@code run()} is intentionally empty and is
     * overridden by the anonymous subclass created in {@code initChannel}.
     */
    private class ReadAction<T, T1> implements Runnable {
        public T parameter;
        public T1 parameter1;

        public void setParameter(T tParameter, T1 tParameter1) {
            parameter = tParameter;
            parameter1 = tParameter1;
        }

        @Override
        public void run() {
        }
    }

    /**
     * Inbound handler that decodes each frame into a {@code TransportMessage} and runs the
     * configured {@code ReadAction} with the channel context and the decoded message.
     */
    private class ServerHandler extends ChannelInboundHandlerAdapter {
        private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable;
        private final ITransportMessageDecoder transportMessageDecoder;

        public ServerHandler(ReadAction<ChannelHandlerContext, TransportMessage> runnable, ITransportMessageDecoder transportMessageDecoder) {
            this.serverRunnable = runnable;
            this.transportMessageDecoder = transportMessageDecoder;
        }

        /** Logs the failure together with its cause and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Fix: the cause used to be dropped, leaving no trace of what actually failed.
            logger.warn("与服务器:" + ctx.channel().remoteAddress() + "通信时发送了错误。", cause);
            ctx.close();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext context) {
            context.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception {
            ByteBuf buffer = (ByteBuf) message;
            try {
                // Copy the frame out of the buffer so the buffer can be released in finally.
                byte[] data = new byte[buffer.readableBytes()];
                buffer.readBytes(data);
                TransportMessage transportMessage = transportMessageDecoder.Decode(data);
                serverRunnable.setParameter(channelHandlerContext, transportMessage);
                serverRunnable.run();
            } finally {
                ReferenceCountUtil.release(message);
            }
        }
    }
}
经过以上修改,再通过 jmeter 压测已经不会出现 timeout 问题,就连 stage 网关 → .NET 微服务 → JAVA 微服务的调用链路也没有 timeout 问题产生,jmeter 的 user thread 拉长到 2000 也没有出现问题。
按照以上思路,.NET 版本的 surging 社区版本也进行了修改,并已经提交到 github:首先把 ServiceHost 中 serverMessageListener.Received 里的 Task.Run 移除,把 ServerHandler 中 ChannelRead 里的 Task.Run 移除,然后在 addLast 添加 ServerHandler 时指定 EventExecutorGroup。经过以上修改再进行压测,发现可以支持 20 万+,也未发现内存泄漏问题;client 执行 1 万次,服务端 cpu 在 6% 左右,响应速度在 1.1 秒左右。可以开启多个 surging 的 client 进行压测,cpu 会叠加上升,响应速度没有影响。以下是执行 1 万次的压测结果:
总结
经过 5 年研发,surging 从最初基于 netty 的 RPC 发展到现在可以支持多协议、多语言的异构微服务引擎,不仅是技术的提高,也带来了名利的收益。只要不断坚持,终究能看到成果。我也会一直更新,为企业和社区用户尽自己的绵薄之力,让企业能更好地掌握微服务解决方案,以解决现在行业各种不同的业务需求。