EventLoop
事件循环对象&事件循环组
EventLoop 本质上是一个单线程执行器(同时维护了一个selector),里面的run方法处理channel 上源源不断的IO事件,也就是集成NIO中负责分配任务和处理accept请求的boss线程
它的继承关系比较复杂:
- 一条线是继承自j.u.c.ScheduledExecutorService 可以用于执行定时任务,同时也包含了线程池中的所有方法
- 另一条线是继承自netty自己的OrderedEventExecutor,有序的执行任务
- 提供了boolean isEventLoop(Thread thread)判断一个线程是否属于此EventLoop
- 提供了parent 方法看看自己属于哪个EventLoopGroup
事件循环组:
EventLoopGroup 就是一组EventLoop,channel 一般会调用EventLoopGroup 的register 方法来绑定其中的一个EventLoop,后续这个Channel 上的IO事件都由此EventLoop 来处理(保证了IO事件处理时的线程安全)
处理普通事件
执行普通任务的意义在于:
- 异步处理
- 事件分发的时候可以通过这种方式,将接下来一段代码的执行从一个线程传递给另一个线程
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
/**
* 1、创建事件循环组
* NioEventLoopGroup 可以用于处理IO事件,定时事件,普通任务
* DefaultEventLoopGroup 只能处理定时事件和普通任务,并不能处理IO事件
*
* 一个事件循环是一个线程来维护的
* 这里传入的参数是创建线程的个数,如果没有传入参数则线程默认数为1
*/
EventLoopGroup group = new NioEventLoopGroup(2);
/*2、group.next
* 循环获取下一个事件循环对象,这样就保证负载均衡
* 依次将不同的channel 注册到不同eventLoop 上
* 就类似于NIO对于两个worker 的处理
* */
group.next().submit(() -> {
/**
* 将事件提交给某个事件循环对象去处理(交给一个线程去处理)
* 实现一个异步处理
*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
log.debug("main");
}
}
- group.next
- 获取事件循环对象
- submit 向事件循环对象提交一个事件,一个事件循环对象通过一个线程来专门进行维护
- 也可以使用execute
处理定时任务
使用意义:
- 实现keep-live 的时候可以作为连接的保护而使用
- 通过另一条线程规律执行某个线程
/**
* 启动一个定时任务,并以一定的频率来执行
* 1、参数一是一个Runnable 接口类型的任务对象
* 2、参数二是初始延时事件
* 3、参数三是间隔时间
* 4、执行单位
*/
group.next().scheduleAtFixedRate(()->{
log.info("ok");
},0,1, TimeUnit.SECONDS);
处理IO事件
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
/*初始化器*/
.childHandler(new ChannelInitializer<NioSocketChannel>() {
/**
* 这个方法会在connection 建立后调用
* @param nioSocketChannel 泛型对象
* @throws Exception
*/
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
/*向流水线中添加处理,这里添加一个自定义的处理器*/
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
/*这里表示关注的是读操作
没有StringDecoder的时候
Object msg 是ByteBuf类型的数据*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
当服务器和客户端两个channel 建立连接之后,那么服务器会用同一个EventLoop 或者说同一个线程来处理这个客户端,建立一个绑定关系
但是并不是说有多少个channel 就会建立多少了eventLoop,eventLoop的数量是既定的,所以会将channel 循环的分给eventLoop,以达到负载均衡的效果,其次就是上面说的每一个eventLoop 对一个channel 都会负责到底
分工细化一
-
创建两个eventLoopGroup 分别管理Boss 和Worker
- BossEventLoop 中只会有一个线程因为它唯一去处理ServerSocketChannel,而只会有一个ServerSocketChannel
将eventLoop 的职责划分可以划的更细一些,划分为boss 和worker
.channel 会将ServerSocketChannel 绑定到第一个EventLoop上
.childHandler 会将SocketChannel 绑定到参数二的EventLoopGroup 中的EventLoop 上,初始化器针对的是已经绑定后的SocketChannel
new ServerBootstrap()
/**
* 划分为boss 和 worker
* 第一个boss 只负责ServerSocketChannel accept 事件
* 第二个worker 只负责socketChannel 上的读写事件 线程数默认是CPU核心数*2
* 这里并不需要将第一个EventLoopGroup 的线程数设为一
* 因为这里的serverSocketChannel默认只会和第一个EventLoopGroup 中的一个进行绑定
* 而并没有多个ServerSocketChannel
* 也只会占用一个线程
*/
.group(new NioEventLoopGroup(),new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
分工细化二
- 再创建一个EventLoopGroup 去处理耗时较长的操作,而不是用NioEventLoop (worker Thread) 去处理耗时较长的操作
- 避免繁杂逻辑操作影响io线程
- 当需要将两个自定义的handler 建立连接时需要在第一个handler 中调用context.fireChannelRead(msg) 将数据传递给下一个handler,这样才能将两个handler 串起来
- 这里在第一个handler 可以实现条件判断看看是不是要走第二个handler,调用了那句代码才会走到对应的handler 中
- 用指定的group 处理特殊情况的操作,两个用的线程是不同的,因为eventLoop 中线程的数量是有限的,而worker 线程要服务很多channel ,所以如果一个线程做比较消耗资源的操作就会降低其他channel 对资源的利用率,所以这里才会创建一个新的group 其实就是新的线程去处理对应的特殊操作
- eventLoop 就是一个线程且带着一个selector
public static void main(String[] args) {
/*创建处理特殊事件的Group*/
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
/*将消息传递给下一个handler*/
ctx.fireChannelRead(msg);
}
}).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}
切换线程
在流水线执行过程中,handler 执行是如何切换EventLoop的,例如:如何从NioEventLoop 切换到DefaultEventLoop 上执行的(换人)
如果两个handler 绑定的是同一个线程(EventLoop)就可以直接调用,如果绑定的是不同线程,而是会将要调用的代码封装为一个Runnable 对象中然后传递给下一个handler的线程去处理