Netty由浅入深的学习指南(入门)

本章节会介绍EventLoop、Channel、Future、Pipeline、Handdler、ByteBuf等重要的组件。

2.1 netty的优势

  • Netty vs NIO
    • 需要自己构建协议
    • 解决TCP传输问题,如粘包、半包
    • epoll空轮巡导致CPU 100%
    • 对API进行增强,使之更易用
  • Netty vs 其他网络应用框架
    • 迭代快
    • API更加的简洁
    • 文档优秀全面

2.2 第一个NeetyDemo “Hello World”

  • 目标

    开发一个简单的服务器端和客户端

    • 客户端向服务器端发送“hello world”
    • 服务器仅接收,不返回
  • 引入依赖

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.39.Final</version>
    </dependency>
    
  • 编码实现

    • 服务器端

      //声明一个服务器的启动器,组装Netty组件
      new ServerBootstrap()
          //工作组可以添加组线程组及工作线程组
          .group(new NioEventLoopGroup(1),new NioEventLoopGroup(10))
          //实现的事件,比如BIO、NIO
          .channel(NioServerSocketChannel.class)
          //boss负责处理连接,worker(child)负责业务处理
          .childHandler(
          //Channel客户端读写通道,Initializer初始化Handler
          new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel sc) throws Exception {
                  sc.pipeline().addLast(new StringDecoder()); //解码处理器
                  sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义的Handler
                      //处理读事件
                      @Override
                      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                          log.info("服务器读取的内容:{}",msg);
                      }
                  });
              }
          })
          .bind("127.0.0.1",8080); //绑定服务器及端口
      
    • 客户端

      Channel channel = new Bootstrap()
          .group(new NioEventLoopGroup(1))
          .channel(NioSocketChannel.class)
          .handler(new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel sc) throws Exception {
                  sc.pipeline().addLast(new StringEncoder());
              }
          }).connect(new InetSocketAddress("127.0.0.1", 8080))
          .sync() //阻塞方法,直到连接建立放开
          .channel();
      while (true){
          channel.writeAndFlush("hello world");
          Thread.sleep(1000);
      }
      

2.3 组件

2.3.1 EventLoop

事件循环对象(EventLoop)

其本质是一个单线程执行器(同时维护一个Selector),里面有run方法处理Channel中源源不断的IO事件。其集成关系也比较复杂:

  • 一条线是继承自ScheduledExecutorService,因此包含了线程池中所有的方法
  • 另一条线是继承netty自己的OrderedEventExecutor
    • 提供了boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看看自己属于那个EventLoopGroup

事件循环组(EventLoopGroup)

EventLoopGroup是一组EventLoop,Chanel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理,继承自netty自己的EventExecutorGroup:

  • 实现了Iterable接口提供遍历EventLoop的能力
  • 另有next方法获取集合中下一个EventLoop

创建一个EventLoop

//创建一个事件循环组,NioEventLoopGroup比较常见,支持IO事件、普通事件、定时任务
EventLoopGroup group = new NioEventLoopGroup(2); //指定线程数
//获取下一个事件,默认轮巡
group.next();

执行普通任务

//执行普通任务submit|execute
group.next().execute(() -> {
    log.info("我是普通任务!");
});

执行定时任务

//执行定时任务,0初始即执行,1延迟时间,TimeUnit.SECONDS时间单位
group.next().scheduleAtFixedRate(() -> {
    log.info("我是普通任务!");
},0,1, TimeUnit.SECONDS);

执行IO事件

new ServerBootstrap()
    .group(new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.forName("UTF-8")));
                }
            });
        }
    })
    .bind(8080);

分功细化

//主服务,只处理ServerSocketChannel的连接事件
EventLoopGroup boss = new NioEventLoopGroup(1);
//工作服务,只处理SocketChannel的读、写事件
EventLoopGroup worker = new NioEventLoopGroup(10);
//自定义处理组
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
    .group(boss,worker)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast("worker1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.forName("UTF-8")));
                }
            }).addLast(group,"worker2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.forName("UTF-8")));
                }
            });
        }
    })
    .bind(8080);

2.3.2 Channel

主要作用

  • close() :可以用来关闭Channel
  • closeFuture() :处理channel的关闭
    • sync方法作用是同步等待channel关闭
    • addListener异步等待channel
  • pipeline() :添加处理器
  • weite() :将数据写入
  • writeAndFlush() :将数据写入并刷出

sync & addListener 使用二选一

ChannelFuture future = new Bootstrap()
    .group(new NioEventLoopGroup(1))
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new StringEncoder());
        }
    }).connect(new InetSocketAddress("127.0.0.1", 8080));

//使用sync同步阻塞
future.sync(); //阻塞直到连接建立
Channel channel = future.channel();
channel.writeAndFlush("hello world");

//使用addListener异步处理
future.addListener(new ChannelFutureListener() {
    //连接建立执行operationComplete方法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        channel.writeAndFlush("hello world");
    }
});

优雅关闭问题

ChannelFuture future = new Bootstrap()
    .group(new NioEventLoopGroup(1))
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new StringEncoder());
        }
    })
    .connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = future.sync().channel();

new Thread(()->{
    Scanner scanner = new Scanner(System.in);
    while (true){
        String s = scanner.nextLine();
        if(s.equals("q")){
            channel.close();
            break;
        }
        channel.writeAndFlush(s);
    }
},"ceShi").start();

//优雅的关闭
ChannelFuture closeFuture = channel.closeFuture();

//同步方式sync
closeFuture.sync();
log.info("我已经关闭了");
group.shutdownGracefully(); //优雅的关闭EvenntLoop

//异步
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        log.info("我已经关闭了");
        group.shutdownGracefully(); //优雅的关闭EvenntLoop
    }
});

2.3.3 Future & Promis

在异步处理时经常使用这两个接口,首先说明的时netty中的Future和JDK中的Future同名,但是是两个接口,netty的Future继承自JDK的Future,而Promise又对netty的Future进行了扩展

  • jdk Future 只能同步等待任务结束(成功|失败)才能得到结果
  • netty Future可以同步等待任务结束得到结果,也可以异步得到结果,但都需要等任务结束
  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能 JDK Future Netty Future Promise
cancel 取消任务
isCancel 任务是否取消
isDone 任务是否完成,不能区分成功失败
get 获取任务结果,阻塞等待
getNow 获取任务结果,非阻塞,未产生结果时返回null
await 等待任务结束,如果任务失败,不抛异常,需要使用isSuccess判断
sync 等待任务结束,如果任务失败,抛异常
isSuccess 判断任务是否成功
cause 获取失败信息,非阻塞,如果没有失败,返回null
addLinstener 添加回调,异步接收结果
setSuccess 设置成功
setFailure 设置失败

JDK Future

//创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//提交任务
Future<Integer> future = service.submit(() -> {
    log.info("等待计算结果... ...");
    Thread.sleep(1000);
    return 50;
});

//通过future获取数据,get会阻塞
log.info("获取计算结果:{}",future.get());

Netty Future

NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();

Future<Integer> future = eventLoop.submit(() -> {
    log.info("等待计算结果... ...");
    Thread.sleep(1000);
    return 50;
});

//同步获取
log.info("获取计算结果:{}",future.get()); //get会阻塞同sync
log.info("获取计算结果:{}",future.sync().getNow());
//异步获取
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.info("获取计算结果:{}",future.getNow());
    }
});

Netty Promise

EventLoop eventLoop = new NioEventLoopGroup().next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(()-> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    promise.setSuccess(50);
}).start();

//同步获取
log.info("获取计算结果:{}",promise.get());
//异步获取
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.info("获取计算结果:{}",future.getNow());
    }
});

2.3.4 Handler & Pipeline

ChannelHandller用来处理Channel上的各种事件,分为入站、出战两种,所有ChannelHandler被连接成一串,就是pipeline

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出战处理器通常是ChannelOutBoundHandlerAdapter的子类,主要对写回结果进行加工

代码示例:

new ServerBootstrap()
    .group(new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            ChannelPipeline pipeline = sc.pipeline();
            /*pipeline会默认添加一个头(head)和尾(tail)的处理器
                          整体格式如:head <-> h1 <-> h2 <-> h3 <-> h4 <-> tail
                          他是一个双向的链表,入站处理器从前向后执行
                          出站处理器从后向前执行*/
            //添加入站处理
            pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.info("1");
                    super.channelRead(ctx,msg);
                }
            });
            pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.info("2");
                    //必须写入数据才能触发出站处理器
                    super.channelRead(ctx,msg);
                    sc.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                }
            });
            pipeline.addLast("h3",new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    log.info("3");
                    super.write(ctx, msg, promise);
                }
            });
            pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    log.info("4");
                    super.write(ctx, msg, promise);
                }
            });
        }
    })
    .bind(8080);
}

日志输出:

##入站正序执行输出
15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 1
15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 2
##出站倒序执行输出
15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 4
15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 3

使用EmbeddedChannel模拟服务器处理,避免创建服务器及客户端

ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("1");
        super.channelRead(ctx, msg);
    }
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("2");
        super.channelRead(ctx, msg);
    }
};

ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("3");
        super.write(ctx, msg, promise);
    }
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("4");
        super.write(ctx, msg, promise);
    }
};

//使用EmbeddedChannel模拟服务器处理
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
//实现入站测试
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
//实现出站测试
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));

2.3.5 ByteBuf

  • 创建及自动扩容

    • 测试代码
    //创建ByteBuf
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); //默认大小为256
    //自动扩容测试
    System.out.println(buf); //打印初始对象
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 300; i++) {
        sb.append("a");
    }
    buf.writeBytes(sb.toString().getBytes());
    System.out.println(buf); //打印填入数据后对象
    
    • 日志
    PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
    PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
    
  • 内存模式(直接内存&堆内存)

    • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起使用
    • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放
    //创建池化基于堆的ByteBuf
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(10);
    //创建池化基于直接内存的ByteBuf
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer(10);
    
  • 池化 vs 非池化

    池化的最大意义在于可以重用ByteBuf,其优点:

    • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算使用堆内存,也会增加GC的压力
    • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
    • 高并发时,池化功能更节约内存,减少内存溢出的问题

    设置池化功能开启,使用如下环境变量

    -Dio.netty.allocator.type=[unpooled|pooled]
    
  • 组成

    ByteBuf由四部分组成(容量、最大容量、读指针、写指针)

Netty由浅入深的学习指南(入门)

最开始读写指针都在0的位置

  • 写入

    方法名 含义 备注
    writeBoolearn() 写入boolearn值 用一字节01|00代表true|false
    writeByte() 写入byte值
    writeShort() 写入short值
    writeInt() 写入int值 Big Endian,即0x250,写入后00 00 02 50
    writeLong() 写入long值 Little Endian,即0x250,写入后50 02 00 00
    writeChar() 写入char值
    writeFloat() 写入float值
    writeDouble() 写入double值
    writeBytes(ByteBuf src) 写入netty的ByteBuf值
    writeBytes(byte[] src) 写入byte[]
  • 读取

    读取也存在类似于写入的一系列方法,不在一一展示,主要说明如何读取及重复读取

    • 读取

      byte b = buf.readByte(); //每次读取一个字节
      boolean b1 = buf.readBoolean(); //读取Boolean值 
      ...
      
    • 重复读取

      buf.markReaderIndex(); //添加标记
      buf.readByte();
      buf.resetReaderIndex(); //重置到标记点
      
    • 使用get()方法读取,不会修改读指针的位置

      buf.getByte(1); //获取下标为1的字节
      buf.getBoolean(1); //获取下标为1的字节并封装成Boolean
      ...
      
  • 内存释放(retain & release)

    由于Netty中有堆外内存(直接内存)的ByteBuf实现,堆外内存最好是手动释放,而不是等GC垃圾回收。

    • UnpooledHeapByteBuf使用的是JVM内存,只需要等GC回收即可

    • UnpooledDirectByteBuf使用的就是直接内存,需要特殊方法来回收内存

    • PooledByteBuf和他的子类使用了池化机制,需要更为复杂的规则来回收内存

      //回收内存的源码实现,关注下面方法的不同实现
      protected abstract void deallocate()
      

    Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

    • 每个ByteBuf对象的初始计数为1

    • 调用release方法计数减1,如果计数为0,ByteBuf内存被回收

    • 调用retain方法计数加1,表示调用者没有用完之前,其他handler即使调用了release也不会造成回收

    • 当计数为0,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

      buf.retain(); //计数加一
      buf.release(); //计数减一
      
  • 零拷贝

    • slice(切片)

      ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'a','b','c','d','e','f','j','h','i'});
      ByteBufferUtil.log(buf);
      
      //使用slice实现零拷贝切片,且切片后容量不可变
      ByteBuf f1 = buf.slice(0, 5);
      ByteBuf f2 = buf.slice(5, 5);
      ByteBufferUtil.log(f1);
      ByteBufferUtil.log(f2);
      
    • CompositeByteBuf(组合)

      ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'1','2','3','4','5'});
      ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'6','7','8','9','0'});
      
      //使用CompositeByteBuf组合多个小的ByteBuf,同样基于零拷贝
      CompositeByteBuf bytebuf = ByteBufAllocator.DEFAULT.compositeBuffer();
      bytebuf.addComponent(true,buf1);
      bytebuf.addComponent(true,buf2);
      //或者使用addComponents
      bytebuf.addComponents(true,buf1,buf2);
      

2.3.6 ByteBuf的优势

  • 池化:可以重用池中ByteBuf实例,节省内存,减少内存溢出的可能
  • 读写指针分离,不需要切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如slice\duplicate\CompositeByteBuf
上一篇:Netty编解码器源码分析(上)(详细分析在注释中说明)


下一篇:Netty ByteBuf