008-核心技术-netty-服务端心跳机制以及客户端心跳重连方案

一、概述

适用于客户端与服务端

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) 

// long readerIdleTime 表示多长时间没有读到数据,就会触发一个读空闲(READER_IDLE)事件
// long writerIdleTime 表示多长时间没有写出数据,就会触发一个写空闲(WRITER_IDLE)事件
// long allIdleTime 表示多长时间既没有读也没有写,就会触发一个读写空闲(ALL_IDLE)事件
// 以上任一条件满足时,IdleStateHandler 只会触发一个 IdleStateEvent 事件,本身并不会发送心跳包(参数为 0 表示禁用该项检测)
// 当IdleStateEvent事件触发后,就会传递给管道下一个handler去处理。
// 通过调用(触发)下一handler的userEventTriggered,在该方法中去处理(读、写等),例如发送心跳包或关闭连接

客户端主要是发送偏多,注重的是写,以及心跳配置是写,客户端需要自启动,以及断线重连

服务端主要是接收偏多,注重的是读,以及心跳配置是读,如果在规定时间内收不到客户端的数据(心跳),就要剔除该客户端

 1.1、服务端

主方法

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * Heartbeat demo server: binds a fixed port and watches every accepted connection for
 * read inactivity.
 */
public class ServerHeartBeat {
    /** TCP port the server listens on. */
    private static final int PORT = 7000;
    /** Seconds without inbound data before a reader-idle event is raised for a connection. */
    private static final long READER_IDLE_SECONDS = 5;

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting on bind or close
     */
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup acceptors = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();

        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(acceptors, workers)
                    .channel(NioServerSocketChannel.class)
                    // Logging handler for the listening (boss) channel.
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(connectionInitializer());

            ChannelFuture bound = server.bind(PORT).sync();
            System.out.println("Server start listen at " + PORT);

            // Park the main thread until the server channel goes away.
            bound.channel().closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /**
     * Builds the initializer that populates the pipeline of each accepted connection.
     */
    private static ChannelInitializer<SocketChannel> connectionInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                // Netty's idle-state detector. Only the reader timeout is enabled here; the
                // writer and all-idle timeouts are passed as 0. When the timeout elapses it
                // fires an IdleStateEvent, which is delivered to the next handler's
                // userEventTriggered method. It does not send any packet by itself.
                p.addLast(new IdleStateHandler(READER_IDLE_SECONDS, 0, 0, TimeUnit.SECONDS));
                // Handler that reacts to the IdleStateEvent.
                p.addLast(new ServerHeartBeatIdleStateTriggerChannelHandler());
                p.addLast("decoder", new StringDecoder());
                p.addLast("encoder", new StringEncoder());
                // Business-logic handler.
                p.addLast(new ServerMsgChannelHandler());
            }
        };
    }
}

服务端心跳检测ChannelHandler

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Server-side handler that reacts to reader-idle events raised earlier in the pipeline.
 */
public class ServerHeartBeatIdleStateTriggerChannelHandler extends ChannelInboundHandlerAdapter {
    /**
     * Handles a reader-idle {@link IdleStateEvent} by logging it; every other user event is
     * forwarded to the next handler instead of being silently dropped.
     *
     * @param ctx channel handler context
     * @param evt the user event that was fired
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println(ctx.channel().remoteAddress() + "--读超时时间--" + event.state());
                // Placeholder for the real reaction, e.g. closing the silent connection.
                System.out.println("服务器做对应处理即可");
                return;
            }
        }
        // Not handled here: pass the event on so later handlers still see it.
        super.userEventTriggered(ctx, evt);
    }
}

业务逻辑处理类

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Server-side business handler: prints every inbound message together with the sender.
 */
public class ServerMsgChannelHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the received message and the remote address it came from.
     *
     * @param ctx channel handler context
     * @param msg the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead……");
        System.out.println(ctx.channel().remoteAddress() + "-msg=" + msg.toString());
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel handler context
     * @param cause the exception raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Record why the connection is being dropped instead of closing silently.
        System.err.println(ctx.channel().remoteAddress() + " exceptionCaught: " + cause);
        ctx.close();
    }
}

1.2、客户端

1、构建一个ChannelHandler集合

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.channel.ChannelHandler;

/**
 * Supplies the set of client-side {@link ChannelHandler}s; implemented by subclasses.
 *
 * <p>Keeping the handler list behind this interface gives implementers one place to obtain
 * the handlers, both when a ChannelPipeline is first initialised and again when the client
 * reconnects.
 */
public interface ChannelHandlerHolder {
    /** Returns the handlers to install in a channel pipeline. */
    ChannelHandler[] handlers();
}

2、客户端心跳检测Handler

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

/**
 * Client-side handler that writes a heartbeat to the server whenever a writer-idle event
 * is received.
 */
@ChannelHandler.Sharable
public class ClientConnectorIdleStateTriggerChannelHandler extends ChannelInboundHandlerAdapter {

    /** Heartbeat payload, wrapped as unreleasable; each write sends a duplicate of it. */
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(
            Unpooled.copiedBuffer("Heartbeat", CharsetUtil.UTF_8));

    /**
     * Writes the heartbeat on a writer-idle event. Other idle states are ignored, and
     * events that are not {@link IdleStateEvent}s are passed on to the superclass.
     *
     * @param ctx channel handler context
     * @param evt the user event that was fired
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleStateEvent idleEvent = (IdleStateEvent) evt;
        if (IdleState.WRITER_IDLE == idleEvent.state()) {
            // Nothing has been written for a while: send a heartbeat to the server.
            ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
        }
    }
}

3、客户端增加服务检测ChannelHandler类

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 * Sharable inbound handler that watches a client connection and, when the channel becomes
 * inactive, schedules itself on the supplied {@link Timer} to connect again.
 *
 * <p>The delay doubles with each consecutive inactive notification, from 4 ms up to a cap
 * of 8192 ms, and the counter returns to zero whenever a channel becomes active. Retries
 * continue for as long as the {@code reconnect} flag is set.
 */
@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder {
    /** Highest value the attempt counter may reach; bounds the back-off delay. */
    private static final int MAX_BACKOFF_STEPS = 12;

    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;
    private final String host;
    private volatile boolean reconnect = true;
    private int attempts;

    /**
     * @param bootstrap bootstrap used for every reconnect attempt
     * @param timer     timer on which reconnect attempts are scheduled
     * @param port      remote port
     * @param host      remote host
     * @param reconnect whether an inactive channel should trigger a reconnect
     */
    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    /**
     * Resets the attempt counter to 0 each time a channel becomes active, then forwards
     * the event.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("当前链路已经激活了,重连尝试次数重新置为0");
        attempts = 0;
        ctx.fireChannelActive();
    }

    /**
     * Schedules a reconnect (when enabled) each time a channel becomes inactive, then
     * forwards the event.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("链接关闭");
        if (reconnect) {
            System.out.println("链接关闭,将进行重连");
            scheduleNextAttempt();
        }
        ctx.fireChannelInactive();
    }

    /**
     * Timer callback: performs one connect attempt using the handlers from
     * {@link #handlers()}.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        ChannelFuture connectFuture;
        // The bootstrap is already configured; only the pipeline initializer is set
        // before connecting.
        synchronized (bootstrap) {
            bootstrap.handler(pipelineInitializer());
            connectFuture = bootstrap.connect(host, port);
        }

        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    System.out.println("重连成功");
                    return;
                }
                System.out.println("重连失败");
                // Fire channelInactive by hand on the failed channel's pipeline. When
                // handlers() includes this watchdog, channelInactive above runs again and
                // schedules the next attempt.
                f.channel().pipeline().fireChannelInactive();
            }
        });
    }

    /** Advances the back-off counter up to its cap and schedules this task on the timer. */
    private void scheduleNextAttempt() {
        attempts = Math.min(attempts + 1, MAX_BACKOFF_STEPS);
        // The retry interval grows with every attempt: 2 << attempts milliseconds.
        long delayMillis = 2 << attempts;
        timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Creates an initializer that installs the handlers returned by {@link #handlers()}. */
    private ChannelInitializer<Channel> pipelineInitializer() {
        return new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(handlers());
            }
        };
    }
}

4、客户端主类

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

/**
 * Heartbeat demo client: connects to the server, sends a heartbeat when the connection has
 * been write-idle, and keeps retrying every 2 seconds until the first connection succeeds.
 * Once connected, reconnecting after a drop is delegated to {@link ConnectionWatchdog}.
 */
public class ClientHeartBeat {
    protected final HashedWheelTimer timer = new HashedWheelTimer();
    /**
     * Single event-loop group shared by all connection attempts. Creating a new group per
     * attempt would leak a thread pool on every retry.
     */
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Bootstrap boot;
    private final ClientConnectorIdleStateTriggerChannelHandler idleStateTrigger = new ClientConnectorIdleStateTriggerChannelHandler();

    /**
     * Starts an asynchronous connection attempt. On failure a retry is scheduled 2 seconds
     * later. The method never blocks, so it is safe to call from an event-loop thread,
     * which is where retries run.
     *
     * @param port remote port
     * @param host remote host
     */
    public void connect(final int port, final String host) {
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
        boot = bootstrap;

        final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, timer, port, host, true) {
            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[]{
                        this,
                        new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                        idleStateTrigger,
                        new StringDecoder(),
                        new StringEncoder(),
                        new ClientMsgChannelHandler()
                };
            }
        };

        ChannelFuture future;
        try {
            // Same lock the watchdog takes before it touches the bootstrap.
            synchronized (bootstrap) {
                bootstrap.handler(new ChannelInitializer<Channel>() {
                    // Install the watchdog's handler set on the new channel.
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });
                future = bootstrap.connect(host, port);
            }
        } catch (RuntimeException e) {
            // connect() itself failed before producing a future.
            onConnectFailure(e, null, port, host);
            return;
        }

        // React to the outcome with a listener rather than sync(): sync() blocks the
        // caller and rethrows on failure, which is unsuitable on an event-loop thread.
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (!f.isSuccess()) {
                    onConnectFailure(f.cause(), f.channel(), port, host);
                }
            }
        });
    }

    /**
     * Schedules another {@link #connect(int, String)} call 2 seconds from now on the shared
     * event-loop group.
     *
     * @param channel channel of the failed attempt; kept for backward compatibility and
     *                not used, because an unregistered channel has no event loop to
     *                schedule on. May be {@code null}.
     * @param port    remote port
     * @param host    remote host
     */
    public void scheduleStart(Channel channel, final int port, final String host) {
        group.schedule(new Runnable() {
            @Override
            public void run() {
                connect(port, host);
            }
        }, 2L, TimeUnit.SECONDS);
    }

    /** Logs a failed connection attempt and schedules the retry. */
    private void onConnectFailure(Throwable cause, Channel channel, int port, String host) {
        System.out.println("connects to  fails." + cause.getMessage());
        System.out.println("---- 连接服务器失败,2秒后重试 ---------port=" + port);
        scheduleStart(channel, port, host);
    }

    /**
     * Connects to the demo server on 127.0.0.1:7000.
     *
     * @param args unused
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        new ClientHeartBeat().connect(7000, "127.0.0.1");
    }
}

5、客户端消息处理类

package com.github.bjlhx15.netty.demo.netty.heartbeat;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

@ChannelHandler.Sharable
public class ClientMsgChannelHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:" + new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:" + new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }
}

1.3、测试

1、启动服务端,再启动客户端,正常

服务端会定时收到心跳

server channelRead……
/127.0.0.1:52276-msg=Heartbeat

2、停止服务端

客户端如下

重连失败
链接关闭
链接关闭,将进行重连
停止时间是:Sat Aug 07 22:53:20 CST 2021
HeartBeatClientHandler channelInactive

启动后恢复正常

重连成功
当前链路已经激活了,重连尝试次数重新置为0
激活时间是:Sat Aug 07 22:53:57 CST 2021
HeartBeatClientHandler channelActive

3、如果先启动客户端

客户端如下

connects to  fails.Connection refused: /127.0.0.1:7000
---- 连接服务器失败,2秒后重试 ---------port=7000

然后启动服务端

客户端会在下一次重试时自动连接成功,输出如下:

connects to  fails.Connection refused: /127.0.0.1:7000
---- 连接服务器失败,2秒后重试 ---------port=7000
当前链路已经激活了,重连尝试次数重新置为0
激活时间是:Sat Aug 07 22:55:25 CST 2021
HeartBeatClientHandler channelActive

  

 

 

 

 

 

 

 

电风扇


008-核心技术-netty-服务端心跳机制以及客户端心跳重连方案

上一篇:js的数据类型


下一篇:C语言程序设计100例之(45):最大乘积