流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。一个典型应用是基于下游网络结点的TP指标来控制本地流量的输出。流量整形与流量监管的主要区别在于,流量整形对流量监管中需要丢弃的报文进行缓存——通常是将它们放入缓冲区或队列内,也称流量整形(Traffic Shaping,简称TS)。当令牌桶有足够的令牌时,再均匀的向外发送这些被缓存的报文。流量整形与流量监管的另一区别是,整形可能会增加延迟,而监管几乎不引入额外的延迟。
AbstractTrafficShapingHandler.java | 功能介绍
/** * <p>AbstractTrafficShapingHandler allows to limit the global bandwidth * (see {@link GlobalTrafficShapingHandler}) or per session * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping. * It allows you to implement an almost real time monitoring of the bandwidth using * the monitors from {@link TrafficCounter} that will call back every checkInterval * the method doAccounting of this handler.</p> * * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change * the read/write limit or the check interval, several methods allow that for you:</p> * <ul> * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li> * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li> * </ul> */ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler{...}
GlobalTrafficShapingHandler.java | 全局限制
/** * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global * traffic shaping, that is to say a global limitation of the bandwidth, whatever * the number of opened channels.</p> * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p> * * <p>The general use should be as follow:</p> * <ul> * <li><p>Create your unique GlobalTrafficShapingHandler like:</p> * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p> * <p>The executor could be the underlying IO worker pool</p> * <p><tt>pipeline.addLast(myHandler);</tt></p> * * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created * and shared among all channels as the counter must be shared among all channels.</b></p> * * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) * or the check interval (in millisecond) that represents the delay between two computations of the * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p> * * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close * to 5 or 10 minutes.</p> * * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p> * </li> * <li>In your handler, you should consider to use the {@code channel.isWritable()} and * {@code channelWritabilityChanged(ctx)} to handle writability, or through * {@code future.addListener(new GenericFutureListener())} on the future returned by * {@code ctx.write()}.</li> * <li><p>You shall also consider to have object size in read or write operations relatively adapted to * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect, * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li> * <li><p>Some configuration methods will be taken as best effort, meaning * that all already scheduled traffics will not be * changed, but only applied to new traffics.</p> * So the expected usage of those methods are to be used not too often, * accordingly to the traffic shaping configuration.</li> * </ul> * * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources. * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own. */ @Sharable public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {}
注意『 OutboundBuffer.setUserDefinedWritability(index, boolean)』中索引使用’2’。
GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor); pipeline.addLast(myHandler);
private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
PerChannel对象中维护有该Channel的待发送数据的消息队列(ArrayDeque messagesQueue)。
ChannelTrafficShapingHandler.java | 功能介绍
/** * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel * traffic shaping, that is to say a per channel limitation of the bandwidth.</p> * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p> * * <p>The general use should be as follow:</p> * <ul> * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p> * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p> * <p><tt>pipeline.addLast(myHandler);</tt></p> * * <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created * for each new channel as the counter cannot be shared among all channels.</b>.</p> * * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) * or the check interval (in millisecond) that represents the delay between two computations of the * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p> * * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close * to 5 or 10 minutes.</p> * * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p> * </li> * <li>In your handler, you should consider to use the {@code channel.isWritable()} and * {@code channelWritabilityChanged(ctx)} to handle writability, or through * {@code future.addListener(new GenericFutureListener())} on the future returned by * {@code ctx.write()}.</li> * <li><p>You shall also consider to have object size in read or write operations relatively adapted to * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect, * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li> * <li><p>Some configuration methods will be taken as best effort, meaning * that all already scheduled traffics will not be * changed, but only applied to new traffics.</p> * <p>So the expected usage of those methods are to be used not too often, * accordingly to the traffic shaping configuration.</p></li> * </ul> */ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {...}
GlobalChannelTrafficShapingHandler.java | 功能介绍
/** * This implementation of the {@link AbstractTrafficShapingHandler} is for global * and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever * the number of opened channels and a per channel limitation of the bandwidth.<br><br> * This version shall not be in the same pipeline than other TrafficShapingHandler.<br><br> * * The general use should be as follow:<br> * <ul> * <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br> * <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br> * The executor could be the underlying IO worker pool<br> * <tt>pipeline.addLast(myHandler);</tt><br><br> * * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created * and shared among all channels as the counter must be shared among all channels.</b><br><br> * * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) * or the check interval (in millisecond) that represents the delay between two computations of the * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br> * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets, * respectively Global and Channel.<br><br> * * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close * to 5 or 10 minutes.<br><br> * * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br> * </li> * <li>In your handler, you should consider to use the {@code channel.isWritable()} and * {@code channelWritabilityChanged(ctx)} to handle writability, or through * {@code future.addListener(new GenericFutureListener())} on the future returned by * {@code ctx.write()}.</li> * <li>You shall also consider to have object size in read or write operations relatively adapted to * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect, * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li> * <li>Some configuration methods will be taken as best effort, meaning * that all already scheduled traffics will not be * changed, but only applied to new traffics.<br> * So the expected usage of those methods are to be used not too often, * accordingly to the traffic shaping configuration.</li> * </ul><br> * * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources. * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own. */ @Sharable public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {...}
2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】
itstack-demo-netty-2-12 └── src ├── main │ └── java │ └── org.itstack.demo.netty │ ├── client │ │ ├── MyChannelInitializer.java │ │ ├── MyClientHandler.java │ │ └── NettyClient.java │ └── server │ ├── common │ │ └── MyServerCommonHandler.java │ ├── MyChannelInitializer.java │ ├── MyServerHandler.java │ └── NettyServer.java │ └── test └── java └── org.itstack.demo.test └── ApiTest.java
部分重点代码块讲解,获取全部代码,关注公众号:bugstack虫洞栈 | 回复netty源码
client/MyChannelInitializer.java | 增加Channel流量整形配置,速率设置为10bytes/s
/** * 虫洞栈:https://bugstack.cn * 公众号:bugstack虫洞栈 {获取学习源码} * Create by fuzhengwei on 2019 */ public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { //流量整形 channel.pipeline().addLast(new ChannelTrafficShapingHandler(10, 10)); // 基于换行符号 channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK"))); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK"))); // 在管道中添加我们自己的接收数据实现方法 channel.pipeline().addLast(new MyClientHandler()); } }
server/common/MyServerCommonHandler.java | 提供抽象类,监控发送速率以及获取发送状态
/** * 虫洞栈:https://bugstack.cn * 公众号:bugstack虫洞栈 | 欢迎关注并获取专题&源码 * Create by fuzhengwei on 2019 */ public abstract class MyServerCommonHandler extends SimpleChannelInboundHandler<String> { protected boolean sentFlag; private Runnable counterTask; private AtomicLong consumeMsgLength = new AtomicLong(); private long priorProgress; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { counterTask = () -> { while (true) { try { Thread.sleep(500); long length = consumeMsgLength.getAndSet(0); if (0 == length) continue; System.out.println("数据发送速率(KB/S):" + length); } catch (InterruptedException ignored) { } } }; super.handlerAdded(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { sendData(ctx); //启动监控线程 new Thread(counterTask).start(); } protected abstract void sendData(ChannelHandlerContext ctx); protected ChannelProgressivePromise getChannelProgressivePromise(ChannelHandlerContext ctx, Consumer<ChannelProgressiveFuture> completedAction) { ChannelProgressivePromise channelProgressivePromise = ctx.newProgressivePromise(); channelProgressivePromise.addListener(new ChannelProgressiveFutureListener() { @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception { consumeMsgLength.addAndGet(progress - priorProgress); priorProgress = progress; } @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { sentFlag = false; if (future.isSuccess()) { System.out.println("微信公众号:bugstack虫洞栈 | 提醒,消息发送成功!"); priorProgress -= 10; Optional.ofNullable(completedAction).ifPresent(action -> action.accept(future)); } else { System.out.println("微信公众号:bugstack虫洞栈 | 提醒,消息发送失败!"); future.cause().printStackTrace(); } } }); return channelProgressivePromise; } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("微信公众号:bugstack虫洞栈 | NettyServer接收到消息:" + msg); } }
server/MyChannelInitializer.java | 增加全局流量整形配置,速率设置为10bytes/s
/** * 虫洞栈:https://bugstack.cn * 公众号:bugstack虫洞栈 | 欢迎关注并获取专题&源码 * Create by fuzhengwei on 2019 */ public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) { // 基于换行符号 channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 流量整形;writeLimit/readLimit{0 or a limit in bytes/s} channel.pipeline().addLast(new GlobalTrafficShapingHandler(channel.eventLoop().parent(), 10, 10)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK"))); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK"))); // 在管道中添加我们自己的接收数据实现方法 channel.pipeline().addLast(new MyServerHandler()); } }
server/MyServerHandler.java | 处理消息验证是否可以发送ctx.channel().isWritable()
/** * 虫洞栈:https://bugstack.cn * 公众号:bugstack虫洞栈 | 欢迎关注并获取专题&源码 * Create by fuzhengwei on 2019 */ public class MyServerHandler extends MyServerCommonHandler { @Override protected void sendData(ChannelHandlerContext ctx) { sentFlag = true; ctx.writeAndFlush( "111111111122222222223333333333\r\n", getChannelProgressivePromise(ctx, new Consumer<ChannelProgressiveFuture>() { @Override public void accept(ChannelProgressiveFuture channelProgressiveFuture) { if (ctx.channel().isWritable() && !sentFlag) { sendData(ctx); } } })); } }
启动服务端NettyServer | 可以看到速率已经被限制
itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈 | 获取专题案例源码} 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):32 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):10 微信公众号:bugstack虫洞栈 | NettyServer接收到消息:876d251b-aba8-481a-81d0-e123a4c42214 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):10 微信公众号:bugstack虫洞栈 | NettyServer接收到消息:250d53fb-acc3-4390-b5c5-a660577fff6f 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):10 微信公众号:bugstack虫洞栈 | NettyServer接收到消息:89cad8a0-8e5b-44ef-812b-39c4b2d2e0fb 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):10 微信公众号:bugstack虫洞栈 | NettyServer接收到消息:e951ca01-a583-4c20-b884-5c272b1cc7a4 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):10 微信公众号:bugstack虫洞栈 | NettyServer接收到消息:4b13d77c-188f-4613-9cd9-94a2a7751932 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 数据发送速率(KB/S):10 微信公众号:bugstack虫洞栈 | 提醒,消息发送成功! 微信公众号:bugstack虫洞栈 | NettyServer接收到消息:fdc5378c-a594-4be8-885d-4caa7ecccd82 数据发送速率(KB/S):10 Process finished with exit code -1
启动客户端NettyClient | 可以看到速率已经被限制
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈 | 获取专题案例源码} 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 微信公众号:bugstack虫洞栈 | NettyClient接收到消息:111111111122222222223333333333 length:30 Process finished with exit code -1