Netty实现心跳机制demo

1.springboot项目,首先引入Netty的pom:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

2.服务端代码:

package com.netty.heartbeat;

import com.netty.basic.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author: jingzhe  @createTime: 2022-02-25 11:13
 * Description: 服务端
 */
@Slf4j
public class NettyServer {
    public static void main(String[] args) {
        // 1.创建两个线程组
        // 负责处理客户端的连接请求
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 负责处理客户端的读写请求
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 2.创建一个服务端的启动对象
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 3.为启动对象设置相关参数
        // 设置线程池,采用主从线程的Reactor模式
        bootstrap.group(bossGroup, workerGroup)
                // 设置通信类型为NIO类型
                .channel(NioServerSocketChannel.class)
                // 设置从线程的处理逻辑
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 处理粘包/拆包
                        pipeline.addLast(new LineBasedFrameDecoder(18));
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        // 添加一个监测心跳机制的Handler,当3秒未接收到客户端的心跳包时,则出现超时
                        pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                        pipeline.addLast(new HeartBeatServerHandler());
                    }
                });

        // 4.绑定监听端口
        try {
            ChannelFuture future = bootstrap.bind(8888);
            log.info("服务器已经启动,在8888端口进行监听...");
            // 在channel关闭的时候,才去关闭future
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3:。服务端心跳机制处理器:

package com.netty.heartbeat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;

/**
 * @author: jingzhe  @createTime: 2022-03-02 16:36
 * Description: 服务端处理器
 */
@Slf4j
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
    /** 记录超时的次数 */
    private int timeCount;

    /** 记录上次超时时间 */
    private long lastIdleTime = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        if ("heartbeat".equals(msg)) {
            log.info("收到{}的心跳包:{}", channelHandlerContext.channel(), new Date());
            /** 超过3秒后,客户端的心跳包是正常的,超时累加的次数清零 */
            if (System.currentTimeMillis() - lastIdleTime >= 3000) {
                log.info("{}已经有一段时间表现非常稳定,超时次数清零", channelHandlerContext.channel());
                timeCount = 0;
            }
        } else {
            log.info("非心跳信息,不做处理...");
        }
    }

    /**
     * 心跳出现超时的时候,会触发该方法的执行
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        Channel channel = ctx.channel();
        Date now = new Date();
        switch (idleStateEvent.state()) {
            case READER_IDLE:
                log.info("{}:出现了一次心跳超时{}", channel, now);
                // 记录超时时间
                lastIdleTime = now.getTime();
                timeCount++;
                break;
            default:
                break;
        }
        if (timeCount >= 3) {
            log.info("{}:心跳超时已经达到3次,将关闭连接", channel);
            channel.close();
        }

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}:上线了", ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}:下线了", ctx.channel());
    }
}

4.客户端代码:

package com.netty.heartbeat;

import com.netty.basic.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

/**
 * @author: jingzhe  @createTime: 2022-02-25 11:13
 * Description: 客户端
 */
@Slf4j
public class NettyClient {
    public static void main(String[] args) {
        // 1.创建一个线程池,用于读写交互
        EventLoopGroup group = new NioEventLoopGroup();
        // 2.创建一个客户端启动对象
        Bootstrap bootstrap = new Bootstrap();
        // 3.设置相关参数
        bootstrap.group(group)
                // 设置通道为NIO通道
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 添加处理逻辑
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 字符串信息的编码与解码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                    }
                });

        // 4.连接服务端
        try {
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            log.info("连接服务端成功");
            Channel channel = future.channel();
            String msg = "heartbeat\n";
            Random random = new Random();
            while (channel.isActive()) {
                int num = random.nextInt(6);
                Thread.sleep(num * 1000);
                channel.writeAndFlush(msg);
            }
            // 对通道关闭进行监听
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }
}

---

引入pom后,即可运行,代码下载地址:

https://url83.ctfile.com/f/18819283-551085507-0364b5
(访问密码:9595)

上一篇:彻底解决分布式环境下Redisson消息队列监听重复执行问题


下一篇:Java爬取网页指定内容