场景
Netty的Socket编程详解-搭建服务端与客户端并进行数据传输:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108615023
在此基础上要实现多个客户端之间通信,实现类似群聊或者聊天室的功能。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
关注公众号
霸道的程序猿
获取编程相关电子书、教程推送与免费下载。
实现
在上面实现的服务端与客户端通信的基础上,在src下新建com.badao.Char包,包下新建ChatServer类作为聊天室的服务端。
package com.badao.Chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class ChatServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer()); //绑定端口 ChannelFuture channelFuture = serverBootstrap.bind(70).sync(); channelFuture.channel().closeFuture().sync(); }finally { //关闭事件组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在上面中绑定70端口并添加了一个服务端的初始化器ChatServerInitializer
所以新建类ChatServerInitializer
package com.badao.Chat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class ChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ChatServerHandler()); } }
使其继承ChannelInitializer,并重写InitChannel方法,在方法中使用Netty自带的处理器进行编码的处理并最后添加一个自定义的处理器ChatServerHandler
新建处理器类ChatServerHandler
package com.badao.Chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class ChatServerHandler extends SimpleChannelInboundHandler<String> { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch->{ if(channel!=ch) { ch.writeAndFlush(channel.remoteAddress()+"发送的消息:"+msg+"\n"); } else { ch.writeAndFlush("[自己]:"+msg+"\n"); } }); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"加入\n"); channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"离开\n"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"上线了\n"); System.out.println("当前在线人数:"+channelGroup.size()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"下线了\n"); System.out.println("当前在线人数:"+channelGroup.size()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
使处理器继承SimpleChannelinboundHandler并重写channelRead0方法。
在最上面声明了一个通道组的通过 DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
获取其单例,只要是建立连接的客户端都会自动添加进此通道组中。
然后只要是客户端与服务端发送消息后就会执行该方法。
在此方法中直接遍历通道组,判断通道组里面的每一个客户端是不是当前发消息的客户端。
如果是就显示自己发送消息,如果不是则获取远程地址并显示发送消息。
然后就是实现客户端的上线功能以及在线人数统计的功能。
在上面的处理器中重写channelActive方法,此方法会在通道激活即建立连接后调用
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"上线了\n"); System.out.println("当前在线人数:"+channelGroup.size()); }
同理重写channelInactive方法,此方法会在断掉连接后调用
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"下线了\n"); System.out.println("当前在线人数:"+channelGroup.size()); }
然后就是实现向所有的客户端广播新建客户端加入聊天室的功能
重写handlerAdded方法,此方法会在将通道添加到通道组中调用,所以在此方法中获取加入到通道组的远程地址
并使用channelGroup的writeAndFlush方法就能实现向所有建立连接的客户端发送消息,新的客户端刚上线时不用向自己
发送上线消息,所以在广播完上线消息后再讲此channel添加到channelGroup中。
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"加入\n"); channelGroup.add(channel); }
同理实现下线提醒需要重写handlerRemoved方法
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服务器]:"+channel.remoteAddress()+"离开\n"); }
但是此方法中不用手动从channelGroup中手动去掉channel,因为Netty会自动将其移除掉。
服务端搭建完成之后再搭建客户端,新建ChatClient类并编写main方法,在main方法中
package com.badao.Chat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; import java.io.InputStreamReader; public class ChatClient { public static void main(String[] args) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ChatClientInitializer()); //绑定端口 Channel channel = bootstrap.connect("localhost", 70).channel(); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for(;;) { channel.writeAndFlush(br.readLine()+"\r\n"); } } finally { //关闭事件组 eventLoopGroup.shutdownGracefully(); } } }
在客户端中读取输入的内容并在一个无限循环中将输入的内容发送至服务端。
在Client中建立对服务端的连接同理也要设置一个初始化器ChatClientInitializer
新建初始化器的类ChatClientInitializer
package com.badao.Chat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class ChatClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ChatClientHandler()); } }
使用Netty自带的处理器对编码进行处理并添加一个自定义的处理器ChatClientHandler
新建类ChatClientHandler
package com.badao.Chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }
在重写的channelRead0方法中只需要将收到的消息进行输出即可。
现在运行服务端的main方法
为了能运行多个客户端在IDEA中客户端编辑
然后将下面的勾选上
然后首先运行一个客户端
那么在服务端中就会输出上线的客户端以及在线人数
再次运行客户端的main方法,此时服务端会输出两个客户端上线
同时在第二个客户端上线时第一个客户端会收到加入的提示
此时停掉第二个客户端即将第二个客户端下线
服务端会提示下线并更新在线人数
同时在第一个客户端会收到服务端的推送
再运行第二个客户端,并在控制台输入消息,回车发送
此时第一个客户端就会收到第二个客户端发送的消息。
然后第一个客户端再输入一个消息并回车
那么第二个客户端也能收到消息
示例代码下载:
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12850228