一. TCP粘包问题
实际发送的消息, 可能会被TCP拆分成很多数据包发送, 也可能把很多消息组合成一个数据包发送
-
粘包拆包发生的原因
(1) 应用程序一次写的字节大小超过socket发送缓冲区大小
(2) 数据长度超多MSS大小进行分片
> MSS : Maximum Segment Size 最大报文段长度, 是TCP数据包数据段的最大长度
MSS值等于收发双方提供的MSS值的最小值, 等于TCP报文长度-TCP首部长度
(3) 以太网帧的payload大于MTU进行IP分片
MTU : 硬件线路上可以传输的最大字节数
二. Netty的解决方法
LineBasedFrameDecoder:
依次遍历ByteBuf中的可读字节, 发现有"\n"或者"\r\n" , 就把可读位置到该位置的字节看做一条消息.他是用换行符作为分隔符的解码器. 支持配置单行消息最大长度, 若达到最大长度还没出现换行符, 会抛出异常, 并忽略之前的异常数据-
StringDecoder: 把接收到的数据流按照编码格式转换成字符串
public class TimeServer4 {
public static void main(String[] args) {
new TimeServer4().bind(8112);
} public void bind(int port){
// 配置服务端线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 接受线程组
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理请求的线程组
try {
ServerBootstrap sbs = new ServerBootstrap();
sbs.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024) // 链接的缓冲队列
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 若在1024个字节内没有发现\n或\r\n报错,然后忽略之前的数据流
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));
ch.pipeline().addLast(new TimeServerHandler());
}
}); //绑定端口,同步等待成功
ChannelFuture f = sbs.bind(port).sync();
f.channel().closeFuture().sync(); } catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} private class TimeServerHandler extends ChannelHandlerAdapter{
private int counter; @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("链接开启");
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
/* 不加StringDecoder,需要手动转化ByteBuf写到字节数组
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String body = new String(bytes); */
String body = (String)msg;
System.out.println("recieve order:"+body+"; counter is:"+ ++counter);
if("query time".equalsIgnoreCase(body)){
//客户端也注册了LineNasedFrameDecoder,所以服务端发送的消息也要以"\n"或"\r\n"结尾
String nowTime = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + System.getProperty("line.separator");
ByteBuf byteBuf = Unpooled.copiedBuffer(nowTime.getBytes());
ctx.writeAndFlush(byteBuf);
}else{
ByteBuf byteBuf = Unpooled.copiedBuffer("bad order".getBytes());
ctx.writeAndFlush(byteBuf);
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}public class TimeClient { public static void main(String[] args) {
new TimeClient().connect(8112,"localhost");
} public void connect(int port,String host){
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bs = new Bootstrap();
bs.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步操作链接
ChannelFuture f = bs.connect(host,port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
} private class TimeClientHandler extends ChannelHandlerAdapter{
private int counter;
// 发出的请求报文必须带有"\n"或"\r\n",否则服务端的LineBasedFrameDecoder无法解析
private byte[] req = ("query time"+ System.getProperty("line.separator")).getBytes(); @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now : "+ body + "the counter is "+ ++counter);
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
for (int i = 0; i < 100; i++) {
msg = Unpooled.buffer(req.length); // 创建指定长度的buf
msg.writeBytes(req);
ctx.writeAndFlush(msg);
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}