封装的消息体
package com.example.netty.im.common.message;
import lombok.Data;
/**
 * Message body exchanged between the chat clients and the server.
 *
 * <p>Instances are serialized to and parsed from JSON by the Netty handlers.
 * Lombok's {@code @Data} supplies the accessors those handlers call
 * (for example {@code getSendUserName()} and {@code setMsg(String)}).
 *
 * <p>Created 2021/7/14.
 */
@Data
public class MsgBody {
// Name of the user who sent the message.
private String sendUserName;
// Text content of the message.
private String msg;
}
NettyServerHandler
package com.example.netty.im.common.handler;
import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.message.MsgBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
/**
 * Server-side inbound handler implementing a minimal chat-room broadcast.
 *
 * <p>Each inbound buffer is decoded as a UTF-8 JSON {@link MsgBody}, logged to
 * standard output, and forwarded to every other registered client. The sender is
 * skipped so it does not receive an echo of its own message.
 *
 * <p>Historical note (problem 1): an earlier version sent the reply back to the
 * sending client only instead of forwarding it to the other clients; the
 * broadcast loop below addresses that.
 *
 * <p>A channel is registered the first time it sends a message and removed when it
 * becomes inactive. The registry is static, so it is shared by all handler
 * instances and is accessed from several event-loop threads; it is therefore a
 * concurrent map.
 *
 * <p>NOTE(review): no frame decoder is installed ahead of this handler, so the code
 * assumes every inbound {@code ByteBuf} carries exactly one complete JSON document.
 * TCP does not guarantee that; confirm whether framing is needed.
 *
 * <p>Created 2021/7/14.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {

    /** Charset used for both decoding inbound and encoding outbound messages. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Registered clients, keyed by the string form of their channel id. */
    private static final Map<String, ChannelHandlerContext> CLIENTS =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Registers the sender, logs the received message and forwards it to every
     * other registered client.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param o                     the inbound message; expected to be a {@link ByteBuf}
     * @throws Exception if the payload cannot be parsed as JSON
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        final String senderId = channelHandlerContext.channel().id().toString();
        CLIENTS.put(senderId, channelHandlerContext);

        String rev = getMessage((ByteBuf) o);
        MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class);
        if (msgBody == null) {
            // The parser may yield null (e.g. for an empty payload); nothing to forward.
            return;
        }

        String format = String.format("服务器接收到客户端消息,发送人:%s, 发送消息:%s .",
                msgBody.getSendUserName(), msgBody.getMsg());
        System.out.println(format);

        // Build the outgoing JSON once; only the buffer must be created per recipient.
        MsgBody sendMsgBody = new MsgBody();
        sendMsgBody.setSendUserName(msgBody.getSendUserName());
        sendMsgBody.setMsg(msgBody.getMsg());
        final String payload = JSONObject.toJSONString(sendMsgBody);

        CLIENTS.forEach((clientId, clientContext) -> {
            if (senderId.equals(clientId)) {
                // Do not echo the message back to its sender.
                return;
            }
            try {
                clientContext.writeAndFlush(getSendByteBuf(payload));
                System.out.println("服务端回复消息: " + payload);
            } catch (Exception e) {
                // Keep broadcasting to the remaining clients if one write fails.
                e.printStackTrace();
            }
        });
    }

    /**
     * Drops the channel from the registry once it is no longer active so that later
     * broadcasts do not target a closed connection and the registry does not grow
     * without bound.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        CLIENTS.remove(ctx.channel().id().toString());
        super.channelInactive(ctx);
    }

    /**
     * Reads all readable bytes from the buffer and decodes them as UTF-8.
     *
     * @param buf buffer to consume
     * @return the decoded text
     */
    private String getMessage(ByteBuf buf) {
        byte[] content = new byte[buf.readableBytes()];
        buf.readBytes(content);
        return new String(content, UTF_8);
    }

    /**
     * Encodes the text as UTF-8 and wraps it in a new buffer.
     *
     * @param message text to send
     * @return a buffer containing the UTF-8 bytes of {@code message}
     */
    private ByteBuf getSendByteBuf(String message) {
        return Unpooled.wrappedBuffer(message.getBytes(UTF_8));
    }
}
NettyClientHandler
package com.example.netty.im.common.handler;
//import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.message.MsgBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.UnsupportedEncodingException;
/**
 * Client-side inbound handler for the chat demo.
 *
 * <p>When the channel becomes active it announces the user by sending a
 * "joined the chat room" {@link MsgBody}; afterwards it prints every message
 * received from the server. {@link #sendMsg(String)} lets another thread push
 * already-serialized JSON text to the server.
 *
 * <p>All text is encoded and decoded as UTF-8 so that both directions agree
 * regardless of the platform default charset.
 *
 * <p>Created 2021/7/16.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {

    /** Charset used for both encoding outbound and decoding inbound messages. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Context captured in {@link #channelActive}; volatile because it is written on
     * the event-loop thread and read by whichever thread calls {@link #sendMsg}.
     */
    private volatile ChannelHandlerContext ctx;

    /** Name placed in the sender field of the join announcement. */
    private String userName;

    /**
     * @return the user name used in the join announcement
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the user name to use in the join announcement
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * Encodes the given text as UTF-8 and writes it to the server.
     *
     * @param str text to send (callers in this project pass a JSON-serialized {@link MsgBody})
     * @throws IllegalStateException if the channel has not become active yet
     */
    public void sendMsg(String str) {
        ChannelHandlerContext context = this.ctx;
        if (context == null) {
            throw new IllegalStateException("Channel is not active yet; cannot send message");
        }
        context.writeAndFlush(Unpooled.wrappedBuffer(str.getBytes(UTF_8)));
    }

    /**
     * Remembers the context for later sends and announces that the user joined.
     *
     * @param ctx context of the channel that just became active
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        MsgBody msgBody = new MsgBody();
        msgBody.setSendUserName(userName);
        msgBody.setMsg("进入聊天室");
        sendMsg(JSONObject.toJSONString(msgBody));
    }

    /**
     * Decodes a message from the server and prints its sender and text.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param o                     the inbound message; expected to be a {@link ByteBuf}
     * @throws Exception if the payload cannot be parsed as JSON
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        String rev = getMessage((ByteBuf) o);
        MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class);
        if (msgBody == null) {
            // The parser may yield null (e.g. for an empty payload); nothing to print.
            return;
        }
        String format = String.format("客户端收到服务端的消息,发送人:%s , 发送消息:%s .",
                msgBody.getSendUserName(), msgBody.getMsg());
        System.out.println(format);
    }

    /**
     * Reads all readable bytes from the buffer and decodes them as UTF-8.
     *
     * @param buf buffer to consume
     * @return the decoded text
     */
    private String getMessage(ByteBuf buf) {
        byte[] content = new byte[buf.readableBytes()];
        buf.readBytes(content);
        return new String(content, UTF_8);
    }
}
NettyServer
package com.example.netty.im.server;
import com.example.netty.im.common.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty server for the chat demo.
 *
 * <p>Constructing an instance binds the given port and then blocks the calling
 * thread until the server channel is closed; both event-loop groups are shut
 * down before the constructor returns.
 *
 * <p>Created 2021/7/14.
 */
public class NettyServer {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * Creates the server and immediately starts it; this call blocks until the
     * server channel closes or start-up fails.
     *
     * @param port TCP port to listen on
     */
    public NettyServer(int port) {
        this.port = port;
        bind();
    }

    /**
     * Configures the bootstrap, binds the port and waits for the server channel to
     * close. Failures are reported on standard output rather than rethrown.
     */
    private void bind() {
        EventLoopGroup parentEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup childEventLoopGroup = new NioEventLoopGroup();
        try {
            // Server bootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentEventLoopGroup, childEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    // Backlog requested for the listening socket
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // TCP_NODELAY applies to accepted connections, not to the listening
                    // socket, so it must be a child option; send without delay
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Keep accepted connections alive
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            // One handler instance per accepted connection
                            channelPipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            if (channelFuture.isSuccess()) {
                System.out.println(" 启动Netty 服务成功,端口号:" + this.port);
            }
            // Block here until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for code further up the stack
                Thread.currentThread().interrupt();
            }
            System.out.println("启动Netty服务异常,异常信息: " + e.getMessage());
            e.printStackTrace();
        } finally {
            parentEventLoopGroup.shutdownGracefully();
            childEventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on port 10086.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int port = 10086;
        new NettyServer(port);
    }
}
NettyClient
package com.example.netty.im.client;
import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.handler.NettyClientHandler;
import com.example.netty.im.common.message.MsgBody;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;
/**
 * Console chat client for the demo.
 *
 * <p>Constructing an instance connects to the server, starts a background thread
 * that turns console input into chat messages, and then blocks the calling thread
 * until the connection is closed.
 *
 * <p>Created 2021/7/16.
 */
public class NettyClient {

    /** Handler installed in the channel pipeline; also used to send console input. */
    private final NettyClientHandler nettyClientHandler;
    private final int port;
    private final String host;
    /** Name placed in the sender field of every outgoing message. */
    private final String sendUserName;

    /**
     * Creates the client and immediately connects; this call blocks until the
     * connection is closed.
     *
     * @param port         server port
     * @param host         server host name or address
     * @param sendUserName user name to send with every message
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public NettyClient(int port, String host, String sendUserName) throws InterruptedException {
        this.port = port;
        this.host = host;
        this.sendUserName = sendUserName;
        // Create the handler up front so the console thread never depends on when
        // the channel initializer happens to run.
        this.nettyClientHandler = new NettyClientHandler();
        this.nettyClientHandler.setUserName(sendUserName);
        start();
    }

    /**
     * Connects to the server, starts the console reader and waits for the channel to close.
     *
     * @throws InterruptedException if interrupted while connecting or waiting for the close
     */
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .group(eventLoopGroup)
                    .remoteAddress(host, port)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel)
                                throws Exception {
                            socketChannel.pipeline().addLast(nettyClientHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            if (channelFuture.isSuccess()) {
                Thread consoleReader = new Thread(this::forwardConsoleInput, "chat-console-reader");
                // Daemon: a reader blocked on System.in must not keep the JVM alive
                // after the connection has closed and the event loop has shut down.
                consoleReader.setDaemon(true);
                consoleReader.start();
                System.err.println(sendUserName + "连接服务器成功");
            }
            // Block here until the connection is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Reads tokens from standard input and sends each one as a chat message.
     * {@code Scanner.next()} splits on whitespace, so every whitespace-separated
     * token becomes its own message.
     */
    private void forwardConsoleInput() {
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            MsgBody msgBody = new MsgBody();
            msgBody.setSendUserName(sendUserName);
            msgBody.setMsg(sc.next());
            nettyClientHandler.sendMsg(JSONObject.toJSONString(msgBody));
        }
    }
}
三个模拟客户端
package com.example.netty.im.client;
/**
 * First simulated chat client: connects to the local server as user "Tom".
 *
 * <p>Created 2021/7/16.
 */
public class MockClient01 {

    /**
     * Launches the client; the stack trace is printed if the thread is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int serverPort = 10086;
        final String serverHost = "localhost";
        final String userName = "Tom";
        try {
            new NettyClient(serverPort, serverHost, userName);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package com.example.netty.im.client;
/**
 * Second simulated chat client: connects to the local server as user "Jerry".
 *
 * <p>Created 2021/7/16.
 */
public class MockClient02 {

    /**
     * Launches the client; the stack trace is printed if the thread is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int serverPort = 10086;
        final String serverHost = "localhost";
        final String userName = "Jerry";
        try {
            new NettyClient(serverPort, serverHost, userName);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package com.example.netty.im.client;
/**
 * Third simulated chat client: connects to the local server as user "Jack".
 *
 * <p>Created 2021/7/16.
 */
public class MockClient03 {

    /**
     * Launches the client; the stack trace is printed if the thread is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int serverPort = 10086;
        final String serverHost = "localhost";
        final String userName = "Jack";
        try {
            new NettyClient(serverPort, serverHost, userName);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
启动服务端和3个模拟客户端
参考链接:https://www.cnblogs.com/yeyongjian/p/12824955.html