1.Spring Boot项目,首先在pom.xml中引入Netty的依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
2.服务端代码:
package com.netty.heartbeat;

import com.netty.basic.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Heartbeat demo server.
 *
 * <p>Listens on a fixed TCP port, splits the inbound byte stream into
 * newline-terminated text frames and installs an {@link IdleStateHandler}
 * so that {@link HeartBeatServerHandler} is notified whenever nothing has
 * been read from a client for {@value #READER_IDLE_SECONDS} seconds.
 *
 * @author: jingzhe @createTime: 2022-02-25 11:13
 */
@Slf4j
public class NettyServer {

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /** Maximum length (in bytes) of one line-delimited frame. */
    private static final int MAX_FRAME_LENGTH = 18;

    /** Seconds without any inbound data before a reader-idle event is fired. */
    private static final int READER_IDLE_SECONDS = 3;

    /**
     * Boots the server and blocks until the listening channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Two event loop groups: the boss group accepts incoming connections,
        //    the worker group handles read/write on the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 2. Server bootstrap object.
        ServerBootstrap bootstrap = new ServerBootstrap();

        // 3. Configure the bootstrap (boss/worker reactor model, NIO transport).
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // Pipeline applied to every accepted client channel.
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // Frame the stream on line endings to deal with TCP sticky/split packets.
                        pipeline.addLast(new LineBasedFrameDecoder(MAX_FRAME_LENGTH));
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        // Fire a reader-idle event when no data arrives from the client in time.
                        pipeline.addLast(new IdleStateHandler(READER_IDLE_SECONDS, 0, 0, TimeUnit.SECONDS));
                        pipeline.addLast(new HeartBeatServerHandler());
                    }
                });

        // 4. Bind the listening port.
        try {
            // Wait for the bind to complete so that the "started" message is only
            // logged on success and a bind failure surfaces as an exception
            // instead of a silent shutdown.
            ChannelFuture future = bootstrap.bind(PORT).sync();
            log.info("服务器已经启动,在{}端口进行监听...", PORT);
            // Block the main thread until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/the JVM can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("服务端线程被中断,即将关闭", e);
        } finally {
            // Graceful shutdown of both event loop groups.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
3.服务端心跳机制处理器:
package com.netty.heartbeat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;

/**
 * Server-side heartbeat handler.
 *
 * <p>Counts reader-idle events for one channel and closes the channel once
 * the count reaches {@value #MAX_TIMEOUT_COUNT}. The count is cleared again
 * when a heartbeat arrives at least {@value #STABLE_PERIOD_MILLIS} ms after
 * the most recent timeout.
 *
 * <p>The handler keeps per-connection state in plain fields, so a separate
 * instance must be used for every channel.
 *
 * @author: jingzhe @createTime: 2022-03-02 16:36
 */
@Slf4j
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

    /** Text payload that identifies a heartbeat message. */
    private static final String HEARTBEAT = "heartbeat";

    /** Number of recorded timeouts at which the connection is closed. */
    private static final int MAX_TIMEOUT_COUNT = 3;

    /** Time since the last timeout after which the client counts as stable again. */
    private static final long STABLE_PERIOD_MILLIS = 3000;

    /** Number of heartbeat timeouts recorded so far. */
    private int timeCount;

    /** Timestamp (epoch millis) of the most recent timeout; 0 if none happened yet. */
    private long lastIdleTime = 0;

    /**
     * Handles one decoded text message.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param msg                   decoded message text
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        if (!HEARTBEAT.equals(msg)) {
            log.info("非心跳信息,不做处理...");
            return;
        }

        log.info("收到{}的心跳包:{}", channelHandlerContext.channel(), new Date());

        // Heartbeats have been healthy for a while after the last timeout:
        // forgive the earlier timeouts. Skipped when there is nothing to reset,
        // so a client that never timed out does not log this on every heartbeat.
        boolean stable = System.currentTimeMillis() - lastIdleTime >= STABLE_PERIOD_MILLIS;
        if (timeCount > 0 && stable) {
            log.info("{}已经有一段时间表现非常稳定,超时次数清零", channelHandlerContext.channel());
            timeCount = 0;
        }
    }

    /**
     * Reacts to idle-state events; any other user event is passed on to the
     * next handler untouched.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Other handlers may fire unrelated user events; casting blindly would
        // throw ClassCastException for those.
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        Channel channel = ctx.channel();
        switch (idleStateEvent.state()) {
            case READER_IDLE:
                Date now = new Date();
                log.info("{}:出现了一次心跳超时{}", channel, now);
                // Remember when the timeout happened and count it.
                lastIdleTime = now.getTime();
                timeCount++;
                if (timeCount >= MAX_TIMEOUT_COUNT) {
                    log.info("{}:心跳超时已经达到{}次,将关闭连接", channel, MAX_TIMEOUT_COUNT);
                    channel.close();
                }
                break;
            default:
                // Writer-idle / all-idle events are not relevant for heartbeat tracking.
                break;
        }
    }

    /**
     * Logs that the channel has become active.
     *
     * @param ctx context of the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}:上线了", ctx.channel());
    }

    /**
     * Logs that the channel has become inactive.
     *
     * @param ctx context of the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("{}:下线了", ctx.channel());
    }
}
4.客户端代码:
package com.netty.heartbeat;

import com.netty.basic.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

/**
 * Heartbeat demo client.
 *
 * <p>Connects to the server on localhost and, while the channel is active,
 * sends a newline-terminated {@code heartbeat} message after a random pause
 * of 0 to 5 seconds.
 *
 * @author: jingzhe @createTime: 2022-02-25 11:13
 */
@Slf4j
public class NettyClient {

    /** Host of the server to connect to. */
    private static final String HOST = "localhost";

    /** TCP port of the server. */
    private static final int PORT = 8888;

    /** Heartbeat payload; the trailing newline terminates the frame. */
    private static final String HEARTBEAT_MESSAGE = "heartbeat\n";

    /** Exclusive upper bound, in seconds, of the random pause between heartbeats. */
    private static final int MAX_PAUSE_SECONDS_EXCLUSIVE = 6;

    /**
     * Connects to the server and sends heartbeats until the channel becomes inactive.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Event loop group used for the client's read/write I/O.
        EventLoopGroup group = new NioEventLoopGroup();

        // 2. Client bootstrap object.
        Bootstrap bootstrap = new Bootstrap();

        // 3. Configure the bootstrap (NIO transport, string codec pipeline).
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // Encode outbound / decode inbound data as strings.
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                    }
                });

        // 4. Connect to the server.
        try {
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            log.info("连接服务端成功");
            Channel channel = future.channel();
            Random random = new Random();
            while (channel.isActive()) {
                // Random pause so that some heartbeats arrive late on the server side.
                int pauseSeconds = random.nextInt(MAX_PAUSE_SECONDS_EXCLUSIVE);
                Thread.sleep(pauseSeconds * 1000L);
                channel.writeAndFlush(HEARTBEAT_MESSAGE);
            }
            // Wait until the channel is fully closed.
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/the JVM can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("客户端线程被中断,即将关闭", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
---
引入pom后,即可运行,代码下载地址:
https://url83.ctfile.com/f/18819283-551085507-0364b5
(访问密码:9595)