本章节会介绍EventLoop、Channel、Future、Pipeline、Handdler、ByteBuf等重要的组件。
2.1 netty的优势
- Netty vs NIO
- 需要自己构建协议
- 解决TCP传输问题,如粘包、半包
- epoll空轮巡导致CPU 100%
- 对API进行增强,使之更易用
- Netty vs 其他网络应用框架
- 迭代快
- API更加的简洁
- 文档优秀全面
2.2 第一个NeetyDemo “Hello World”
-
目标
开发一个简单的服务器端和客户端
- 客户端向服务器端发送“hello world”
- 服务器仅接收,不返回
-
引入依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
-
编码实现
-
服务器端
//声明一个服务器的启动器,组装Netty组件 new ServerBootstrap() //工作组可以添加组线程组及工作线程组 .group(new NioEventLoopGroup(1),new NioEventLoopGroup(10)) //实现的事件,比如BIO、NIO .channel(NioServerSocketChannel.class) //boss负责处理连接,worker(child)负责业务处理 .childHandler( //Channel客户端读写通道,Initializer初始化Handler new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new StringDecoder()); //解码处理器 sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义的Handler //处理读事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("服务器读取的内容:{}",msg); } }); } }) .bind("127.0.0.1",8080); //绑定服务器及端口
-
客户端
Channel channel = new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new StringEncoder()); } }).connect(new InetSocketAddress("127.0.0.1", 8080)) .sync() //阻塞方法,直到连接建立放开 .channel(); while (true){ channel.writeAndFlush("hello world"); Thread.sleep(1000); }
-
2.3 组件
2.3.1 EventLoop
事件循环对象(EventLoop)
其本质是一个单线程执行器(同时维护一个Selector),里面有run方法处理Channel中源源不断的IO事件。其集成关系也比较复杂:
- 一条线是继承自ScheduledExecutorService,因此包含了线程池中所有的方法
- 另一条线是继承netty自己的OrderedEventExecutor
- 提供了boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
- 提供了parent方法来看看自己属于那个EventLoopGroup
事件循环组(EventLoopGroup)
EventLoopGroup是一组EventLoop,Chanel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理,继承自netty自己的EventExecutorGroup:
- 实现了Iterable接口提供遍历EventLoop的能力
- 另有next方法获取集合中下一个EventLoop
创建一个EventLoop
//创建一个事件循环组,NioEventLoopGroup比较常见,支持IO事件、普通事件、定时任务
EventLoopGroup group = new NioEventLoopGroup(2); //指定线程数
//获取下一个事件,默认轮巡
group.next();
执行普通任务
//执行普通任务submit|execute
group.next().execute(() -> {
log.info("我是普通任务!");
});
执行定时任务
//执行定时任务,0初始即执行,1延迟时间,TimeUnit.SECONDS时间单位
group.next().scheduleAtFixedRate(() -> {
log.info("我是普通任务!");
},0,1, TimeUnit.SECONDS);
执行IO事件
new ServerBootstrap()
.group(new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(Charset.forName("UTF-8")));
}
});
}
})
.bind(8080);
分功细化
//主服务,只处理ServerSocketChannel的连接事件
EventLoopGroup boss = new NioEventLoopGroup(1);
//工作服务,只处理SocketChannel的读、写事件
EventLoopGroup worker = new NioEventLoopGroup(10);
//自定义处理组
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast("worker1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(Charset.forName("UTF-8")));
}
}).addLast(group,"worker2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(Charset.forName("UTF-8")));
}
});
}
})
.bind(8080);
2.3.2 Channel
主要作用
- close() :可以用来关闭Channel
- closeFuture() :处理channel的关闭
- sync方法作用是同步等待channel关闭
- addListener异步等待channel
- pipeline() :添加处理器
- weite() :将数据写入
- writeAndFlush() :将数据写入并刷出
sync & addListener 使用二选一
ChannelFuture future = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());
}
}).connect(new InetSocketAddress("127.0.0.1", 8080));
//使用sync同步阻塞
future.sync(); //阻塞直到连接建立
Channel channel = future.channel();
channel.writeAndFlush("hello world");
//使用addListener异步处理
future.addListener(new ChannelFutureListener() {
//连接建立执行operationComplete方法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("hello world");
}
});
优雅关闭问题
ChannelFuture future = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = future.sync().channel();
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true){
String s = scanner.nextLine();
if(s.equals("q")){
channel.close();
break;
}
channel.writeAndFlush(s);
}
},"ceShi").start();
//优雅的关闭
ChannelFuture closeFuture = channel.closeFuture();
//同步方式sync
closeFuture.sync();
log.info("我已经关闭了");
group.shutdownGracefully(); //优雅的关闭EvenntLoop
//异步
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.info("我已经关闭了");
group.shutdownGracefully(); //优雅的关闭EvenntLoop
}
});
2.3.3 Future & Promis
在异步处理时经常使用这两个接口,首先说明的时netty中的Future和JDK中的Future同名,但是是两个接口,netty的Future继承自JDK的Future,而Promise又对netty的Future进行了扩展
- jdk Future 只能同步等待任务结束(成功|失败)才能得到结果
- netty Future可以同步等待任务结束得到结果,也可以异步得到结果,但都需要等任务结束
- netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能 | JDK Future | Netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | ||
isCancel | 任务是否取消 | ||
isDone | 任务是否完成,不能区分成功失败 | ||
get | 获取任务结果,阻塞等待 | ||
getNow | 获取任务结果,非阻塞,未产生结果时返回null | ||
await | 等待任务结束,如果任务失败,不抛异常,需要使用isSuccess判断 | ||
sync | 等待任务结束,如果任务失败,抛异常 | ||
isSuccess | 判断任务是否成功 | ||
cause | 获取失败信息,非阻塞,如果没有失败,返回null | ||
addLinstener | 添加回调,异步接收结果 | ||
setSuccess | 设置成功 | ||
setFailure | 设置失败 |
JDK Future
//创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//提交任务
Future<Integer> future = service.submit(() -> {
log.info("等待计算结果... ...");
Thread.sleep(1000);
return 50;
});
//通过future获取数据,get会阻塞
log.info("获取计算结果:{}",future.get());
Netty Future
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(() -> {
log.info("等待计算结果... ...");
Thread.sleep(1000);
return 50;
});
//同步获取
log.info("获取计算结果:{}",future.get()); //get会阻塞同sync
log.info("获取计算结果:{}",future.sync().getNow());
//异步获取
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.info("获取计算结果:{}",future.getNow());
}
});
Netty Promise
EventLoop eventLoop = new NioEventLoopGroup().next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()-> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
promise.setSuccess(50);
}).start();
//同步获取
log.info("获取计算结果:{}",promise.get());
//异步获取
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.info("获取计算结果:{}",future.getNow());
}
});
2.3.4 Handler & Pipeline
ChannelHandller用来处理Channel上的各种事件,分为入站、出战两种,所有ChannelHandler被连接成一串,就是pipeline
- 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
- 出战处理器通常是ChannelOutBoundHandlerAdapter的子类,主要对写回结果进行加工
代码示例:
new ServerBootstrap()
.group(new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
/*pipeline会默认添加一个头(head)和尾(tail)的处理器
整体格式如:head <-> h1 <-> h2 <-> h3 <-> h4 <-> tail
他是一个双向的链表,入站处理器从前向后执行
出站处理器从后向前执行*/
//添加入站处理
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("1");
super.channelRead(ctx,msg);
}
});
pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("2");
//必须写入数据才能触发出站处理器
super.channelRead(ctx,msg);
sc.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
}
});
pipeline.addLast("h3",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("3");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("4");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
日志输出:
##入站正序执行输出
15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 1
15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 2
##出站倒序执行输出
15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 4
15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 3
使用EmbeddedChannel模拟服务器处理,避免创建服务器及客户端
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
//使用EmbeddedChannel模拟服务器处理
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
//实现入站测试
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
//实现出站测试
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
2.3.5 ByteBuf
-
创建及自动扩容
- 测试代码
//创建ByteBuf ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); //默认大小为256 //自动扩容测试 System.out.println(buf); //打印初始对象 StringBuilder sb = new StringBuilder(); for (int i = 0; i < 300; i++) { sb.append("a"); } buf.writeBytes(sb.toString().getBytes()); System.out.println(buf); //打印填入数据后对象
- 日志
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256) PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
-
内存模式(直接内存&堆内存)
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起使用
- 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放
//创建池化基于堆的ByteBuf ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(10); //创建池化基于直接内存的ByteBuf ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer(10);
-
池化 vs 非池化
池化的最大意义在于可以重用ByteBuf,其优点:
- 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算使用堆内存,也会增加GC的压力
- 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的问题
设置池化功能开启,使用如下环境变量
-Dio.netty.allocator.type=[unpooled|pooled]
-
组成
ByteBuf由四部分组成(容量、最大容量、读指针、写指针)
最开始读写指针都在0的位置
-
写入
方法名 含义 备注 writeBoolearn() 写入boolearn值 用一字节01|00代表true|false writeByte() 写入byte值 writeShort() 写入short值 writeInt() 写入int值 Big Endian,即0x250,写入后00 00 02 50 writeLong() 写入long值 Little Endian,即0x250,写入后50 02 00 00 writeChar() 写入char值 writeFloat() 写入float值 writeDouble() 写入double值 writeBytes(ByteBuf src) 写入netty的ByteBuf值 writeBytes(byte[] src) 写入byte[] -
读取
读取也存在类似于写入的一系列方法,不在一一展示,主要说明如何读取及重复读取
-
读取
byte b = buf.readByte(); //每次读取一个字节 boolean b1 = buf.readBoolean(); //读取Boolean值 ...
-
重复读取
buf.markReaderIndex(); //添加标记 buf.readByte(); buf.resetReaderIndex(); //重置到标记点
-
使用get()方法读取,不会修改读指针的位置
buf.getByte(1); //获取下标为1的字节 buf.getBoolean(1); //获取下标为1的字节并封装成Boolean ...
-
-
内存释放(retain & release)
由于Netty中有堆外内存(直接内存)的ByteBuf实现,堆外内存最好是手动释放,而不是等GC垃圾回收。
-
UnpooledHeapByteBuf使用的是JVM内存,只需要等GC回收即可
-
UnpooledDirectByteBuf使用的就是直接内存,需要特殊方法来回收内存
-
PooledByteBuf和他的子类使用了池化机制,需要更为复杂的规则来回收内存
//回收内存的源码实现,关注下面方法的不同实现 protected abstract void deallocate()
Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
-
每个ByteBuf对象的初始计数为1
-
调用release方法计数减1,如果计数为0,ByteBuf内存被回收
-
调用retain方法计数加1,表示调用者没有用完之前,其他handler即使调用了release也不会造成回收
-
当计数为0,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用
buf.retain(); //计数加一 buf.release(); //计数减一
-
-
零拷贝
-
slice(切片)
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'a','b','c','d','e','f','j','h','i'}); ByteBufferUtil.log(buf); //使用slice实现零拷贝切片,且切片后容量不可变 ByteBuf f1 = buf.slice(0, 5); ByteBuf f2 = buf.slice(5, 5); ByteBufferUtil.log(f1); ByteBufferUtil.log(f2);
-
CompositeByteBuf(组合)
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'1','2','3','4','5'}); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'6','7','8','9','0'}); //使用CompositeByteBuf组合多个小的ByteBuf,同样基于零拷贝 CompositeByteBuf bytebuf = ByteBufAllocator.DEFAULT.compositeBuffer(); bytebuf.addComponent(true,buf1); bytebuf.addComponent(true,buf2); //或者使用addComponents bytebuf.addComponents(true,buf1,buf2);
-
2.3.6 ByteBuf的优势
- 池化:可以重用池中ByteBuf实例,节省内存,减少内存溢出的可能
- 读写指针分离,不需要切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如slice\duplicate\CompositeByteBuf