Netty生产级的心跳和重连机制

今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程:

1)客户端连接服务端

2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s

3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)

4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道

5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路

6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连

以上描述的就是整个心跳和重连的整个过程,虽然很简单,上一篇blog也写了一个Demo,简单地做了一下上述功能

要写工业级的Netty心跳重连的代码,需要解决一下几个问题:

1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理

2)重连对象的管理,也就是bootstrap对象的管理

3)重连机制编写

完整的代码:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle

下面我们就看大神是如何解决这些问题的,首先先定义一个接口ChannelHandlerHolder,用来保管ChannelPipeline中的Handlers的

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandler;
  3. /**
  4. *
  5. * 客户端的ChannelHandler集合,由子类实现,这样做的好处:
  6. * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
  7. * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
  8. * 地获取所有的handlers
  9. */
  10. public interface ChannelHandlerHolder {
  11. ChannelHandler[] handlers();
  12. }

我们再来编写我们熟悉的服务端的ServerBootstrap的编写:

HeartBeatServer.java

  1. package com.lyncc.netty.idle;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.handler.logging.LogLevel;
  13. import io.netty.handler.logging.LoggingHandler;
  14. import io.netty.handler.timeout.IdleStateHandler;
  15. import java.net.InetSocketAddress;
  16. import java.util.concurrent.TimeUnit;
  17. public class HeartBeatServer {
  18. private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
  19. private int port;
  20. public HeartBeatServer(int port) {
  21. this.port = port;
  22. }
  23. public void start() {
  24. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  25. EventLoopGroup workerGroup = new NioEventLoopGroup();
  26. try {
  27. ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
  28. .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
  29. .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
  30. protected void initChannel(SocketChannel ch) throws Exception {
  31. ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
  32. ch.pipeline().addLast(idleStateTrigger);
  33. ch.pipeline().addLast("decoder", new StringDecoder());
  34. ch.pipeline().addLast("encoder", new StringEncoder());
  35. ch.pipeline().addLast(new HeartBeatServerHandler());
  36. };
  37. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  38. // 绑定端口,开始接收进来的连接
  39. ChannelFuture future = sbs.bind(port).sync();
  40. System.out.println("Server start listen at " + port);
  41. future.channel().closeFuture().sync();
  42. } catch (Exception e) {
  43. bossGroup.shutdownGracefully();
  44. workerGroup.shutdownGracefully();
  45. }
  46. }
  47. public static void main(String[] args) throws Exception {
  48. int port;
  49. if (args.length > 0) {
  50. port = Integer.parseInt(args[0]);
  51. } else {
  52. port = 8080;
  53. }
  54. new HeartBeatServer(port).start();
  55. }
  56. }

单独写一个AcceptorIdleStateTrigger,其实也是继承ChannelInboundHandlerAdapter,重写userEventTriggered方法,因为客户端是write,那么服务端自然是read,设置的状态就是IdleState.READER_IDLE,源码如下:

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandler;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.handler.timeout.IdleState;
  6. import io.netty.handler.timeout.IdleStateEvent;
  7. @ChannelHandler.Sharable
  8. public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  11. if (evt instanceof IdleStateEvent) {
  12. IdleState state = ((IdleStateEvent) evt).state();
  13. if (state == IdleState.READER_IDLE) {
  14. throw new Exception("idle exception");
  15. }
  16. } else {
  17. super.userEventTriggered(ctx, evt);
  18. }
  19. }
  20. }

HeartBeatServerHandler就是一个很简单的自定义的Handler,不是重点:

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
  5. @Override
  6. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  7. System.out.println("server channelRead..");
  8. System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
  9. }
  10. @Override
  11. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  12. cause.printStackTrace();
  13. ctx.close();
  14. }
  15. }

接下来就是重点,我们需要写一个类,这个类可以去观察链路是否断了,如果断了,进行循环的断线重连操作,ConnectionWatchdog,顾名思义,链路检测狗,我们先看完整代码:

  1. package com.lyncc.netty.idle;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelFutureListener;
  6. import io.netty.channel.ChannelHandler.Sharable;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.ChannelInboundHandlerAdapter;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.util.Timeout;
  11. import io.netty.util.Timer;
  12. import io.netty.util.TimerTask;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15. *
  16. * 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
  17. */
  18. @Sharable
  19. public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
  20. private final Bootstrap bootstrap;
  21. private final Timer timer;
  22. private final int port;
  23. private final String host;
  24. private volatile boolean reconnect = true;
  25. private int attempts;
  26. public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
  27. this.bootstrap = bootstrap;
  28. this.timer = timer;
  29. this.port = port;
  30. this.host = host;
  31. this.reconnect = reconnect;
  32. }
  33. /**
  34. * channel链路每次active的时候,将其连接的次数重新☞ 0
  35. */
  36. @Override
  37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  38. System.out.println("当前链路已经激活了,重连尝试次数重新置为0");
  39. attempts = 0;
  40. ctx.fireChannelActive();
  41. }
  42. @Override
  43. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  44. System.out.println("链接关闭");
  45. if(reconnect){
  46. System.out.println("链接关闭,将进行重连");
  47. if (attempts < 12) {
  48. attempts++;
  49. //重连的间隔时间会越来越长
  50. int timeout = 2 << attempts;
  51. timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
  52. }
  53. }
  54. ctx.fireChannelInactive();
  55. }
  56. public void run(Timeout timeout) throws Exception {
  57. ChannelFuture future;
  58. //bootstrap已经初始化好了,只需要将handler填入就可以了
  59. synchronized (bootstrap) {
  60. bootstrap.handler(new ChannelInitializer<Channel>() {
  61. @Override
  62. protected void initChannel(Channel ch) throws Exception {
  63. ch.pipeline().addLast(handlers());
  64. }
  65. });
  66. future = bootstrap.connect(host,port);
  67. }
  68. //future对象
  69. future.addListener(new ChannelFutureListener() {
  70. public void operationComplete(ChannelFuture f) throws Exception {
  71. boolean succeed = f.isSuccess();
  72. //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
  73. if (!succeed) {
  74. System.out.println("重连失败");
  75. f.channel().pipeline().fireChannelInactive();
  76. }else{
  77. System.out.println("重连成功");
  78. }
  79. }
  80. });
  81. }
  82. }

稍微分析一下:

1)继承了ChannelInboundHandlerAdapter,说明它也是Handler,也对,作为一个检测对象,肯定会放在链路中,否则怎么检测

2)实现了2个接口,TimeTask,ChannelHandlerHolder

①TimeTask,我们就要写run方法,这应该是一个定时任务,这个定时任务做的事情应该是重连的工作

②ChannelHandlerHolder的接口,这个接口我们刚才说过是维护的所有的Handlers,因为在重连的时候需要获取Handlers

3)bootstrap对象,重连的时候依旧需要这个对象

4)当链路断开的时候会触发channelInactive这个方法,也就说触发重连的导火索是从这边开始的

好了,我们这边再写次核心的HeartBeatsClient的代码:

  1. package com.lyncc.netty.idle;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelHandler;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.handler.logging.LogLevel;
  13. import io.netty.handler.logging.LoggingHandler;
  14. import io.netty.handler.timeout.IdleStateHandler;
  15. import io.netty.util.HashedWheelTimer;
  16. import java.util.concurrent.TimeUnit;
  17. public class HeartBeatsClient {
  18. protected final HashedWheelTimer timer = new HashedWheelTimer();
  19. private Bootstrap boot;
  20. private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();
  21. public void connect(int port, String host) throws Exception {
  22. EventLoopGroup group = new NioEventLoopGroup();
  23. boot = new Bootstrap();
  24. boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
  25. final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {
  26. public ChannelHandler[] handlers() {
  27. return new ChannelHandler[] {
  28. this,
  29. new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
  30. idleStateTrigger,
  31. new StringDecoder(),
  32. new StringEncoder(),
  33. new HeartBeatClientHandler()
  34. };
  35. }
  36. };
  37. ChannelFuture future;
  38. //进行连接
  39. try {
  40. synchronized (boot) {
  41. boot.handler(new ChannelInitializer<Channel>() {
  42. //初始化channel
  43. @Override
  44. protected void initChannel(Channel ch) throws Exception {
  45. ch.pipeline().addLast(watchdog.handlers());
  46. }
  47. });
  48. future = boot.connect(host,port);
  49. }
  50. // 以下代码在synchronized同步块外面是安全的
  51. future.sync();
  52. } catch (Throwable t) {
  53. throw new Exception("connects to  fails", t);
  54. }
  55. }
  56. /**
  57. * @param args
  58. * @throws Exception
  59. */
  60. public static void main(String[] args) throws Exception {
  61. int port = 8080;
  62. if (args != null && args.length > 0) {
  63. try {
  64. port = Integer.valueOf(args[0]);
  65. } catch (NumberFormatException e) {
  66. // 采用默认值
  67. }
  68. }
  69. new HeartBeatsClient().connect(port, "127.0.0.1");
  70. }
  71. }

也稍微说明一下:

1)创建了ConnectionWatchdog对象,自然要实现handlers方法

2)初始化好bootstrap对象

3)4秒内没有写操作,进行心跳触发,也就是IdleStateHandler这个方法

最后ConnectorIdleStateTrigger这个类

  1. package com.lyncc.netty.idle;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandler.Sharable;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.handler.timeout.IdleState;
  8. import io.netty.handler.timeout.IdleStateEvent;
  9. import io.netty.util.CharsetUtil;
  10. @Sharable
  11. public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {
  12. private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
  13. CharsetUtil.UTF_8));
  14. @Override
  15. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  16. if (evt instanceof IdleStateEvent) {
  17. IdleState state = ((IdleStateEvent) evt).state();
  18. if (state == IdleState.WRITER_IDLE) {
  19. // write heartbeat to server
  20. ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
  21. }
  22. } else {
  23. super.userEventTriggered(ctx, evt);
  24. }
  25. }
  26. }

HeartBeatClientHandler.java(不是重点)

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandler.Sharable;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.util.ReferenceCountUtil;
  6. import java.util.Date;
  7. @Sharable
  8. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11. System.out.println("激活时间是:"+new Date());
  12. System.out.println("HeartBeatClientHandler channelActive");
  13. ctx.fireChannelActive();
  14. }
  15. @Override
  16. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  17. System.out.println("停止时间是:"+new Date());
  18. System.out.println("HeartBeatClientHandler channelInactive");
  19. }
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. String message = (String) msg;
  23. System.out.println(message);
  24. if (message.equals("Heartbeat")) {
  25. ctx.write("has read message from server");
  26. ctx.flush();
  27. }
  28. ReferenceCountUtil.release(msg);
  29. }
  30. }

好了,到此为止,所有的代码都贴完了,我们做一个简单的测试,按照常理,如果不出任何状况的话,客户端4秒发送心跳,服务端5秒才验证是不会断连的,所以我们在启动之后,关闭服务端,然后再次重启服务端

首先启动服务端,控制台如下:

Netty生产级的心跳和重连机制

启动客户端,控制台如下:

Netty生产级的心跳和重连机制

客户端启动之后,服务端的控制台:

Netty生产级的心跳和重连机制

关闭服务端后,客户端控制台:

Netty生产级的心跳和重连机制

重启启动服务端:

Netty生产级的心跳和重连机制

重连成功~

上一篇:在C#调用C++的DLL方法(二)生成托管的DLL


下一篇:Java 设计模式系列(二)简单工厂模式和工厂方法模式