1. Netty解决Tcp粘包拆包

一. TCP粘包问题

  1. 实际发送的消息, 可能会被TCP拆分成很多数据包发送, 也可能把很多消息组合成一个数据包发送

  2. 粘包拆包发生的原因

    (1) 应用程序一次写的字节大小超过socket发送缓冲区大小

    (2) 数据长度超多MSS大小进行分片

 > MSS :  Maximum Segment Size 最大报文段长度, 是TCP数据包数据段的最大长度
MSS值等于收发双方提供的MSS值的最小值, 等于TCP报文长度-TCP首部长度 (3) 以太网帧的payload大于MTU进行IP分片
MTU : 硬件线路上可以传输的最大字节数

二. Netty的解决方法

  1. LineBasedFrameDecoder:

    依次遍历ByteBuf中的可读字节, 发现有"\n"或者"\r\n" , 就把可读位置到该位置的字节看做一条消息.他是用换行符作为分隔符的解码器. 支持配置单行消息最大长度, 若达到最大长度还没出现换行符, 会抛出异常, 并忽略之前的异常数据

  2. StringDecoder: 把接收到的数据流按照编码格式转换成字符串

    public class TimeServer4 {
    public static void main(String[] args) {
    new TimeServer4().bind(8112);
    } public void bind(int port){
    // 配置服务端线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup(); // 接受线程组
    EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理请求的线程组
    try {
    ServerBootstrap sbs = new ServerBootstrap();
    sbs.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,1024) // 链接的缓冲队列
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    // 若在1024个字节内没有发现\n或\r\n报错,然后忽略之前的数据流
    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    ch.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));
    ch.pipeline().addLast(new TimeServerHandler());
    }
    }); //绑定端口,同步等待成功
    ChannelFuture f = sbs.bind(port).sync();
    f.channel().closeFuture().sync(); } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    } private class TimeServerHandler extends ChannelHandlerAdapter{
    private int counter; @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("链接开启");
    } @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws
    /* 不加StringDecoder,需要手动转化ByteBuf写到字节数组
    ByteBuf buf = (ByteBuf) msg;
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    String body = new String(bytes); */
    String body = (String)msg;
    System.out.println("recieve order:"+body+"; counter is:"+ ++counter);
    if("query time".equalsIgnoreCase(body)){
    //客户端也注册了LineNasedFrameDecoder,所以服务端发送的消息也要以"\n"或"\r\n"结尾
    String nowTime = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + System.getProperty("line.separator");
    ByteBuf byteBuf = Unpooled.copiedBuffer(nowTime.getBytes());
    ctx.writeAndFlush(byteBuf);
    }else{
    ByteBuf byteBuf = Unpooled.copiedBuffer("bad order".getBytes());
    ctx.writeAndFlush(byteBuf);
    }
    } @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
    }
    }
    }
    public class TimeClient {
    
    	public static void main(String[] args) {
    new TimeClient().connect(8112,"localhost");
    } public void connect(int port,String host){
    EventLoopGroup group = new NioEventLoopGroup();
    try{
    Bootstrap bs = new Bootstrap();
    bs.group(group).channel(NioSocketChannel.class)
    .option(ChannelOption.TCP_NODELAY,true)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    ch.pipeline().addLast(new StringDecoder());
    ch.pipeline().addLast(new TimeClientHandler());
    }
    });
    //发起异步操作链接
    ChannelFuture f = bs.connect(host,port).sync();
    f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    group.shutdownGracefully();
    }
    } private class TimeClientHandler extends ChannelHandlerAdapter{
    private int counter;
    // 发出的请求报文必须带有"\n"或"\r\n",否则服务端的LineBasedFrameDecoder无法解析
    private byte[] req = ("query time"+ System.getProperty("line.separator")).getBytes(); @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    System.out.println("Now : "+ body + "the counter is "+ ++counter);
    } @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ByteBuf msg = null;
    for (int i = 0; i < 100; i++) {
    msg = Unpooled.buffer(req.length); // 创建指定长度的buf
    msg.writeBytes(req);
    ctx.writeAndFlush(msg);
    }
    } @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
    }
    }
    }
上一篇:luogu1503


下一篇:PyQt写的浏览单web页面的browser - 开源中国社区