目录
3、ServerBootstrapAcceptor注册到worker线程
4、workerGroup 将 socketChannel 注册到选择的NioEventLoop的selector
接收连接
NIO的读事件
while(!stop){//循环遍历selector,休眠时间为1S,当又处于就绪状态的CHannel时,selector将返回该channel的集合。通过对Channel集合的迭代,可进行网络异步读写操作 try { selector.select(1000); Set<SelectionKey> selectKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectKeys.iterator(); SelectionKey key = null; while (it.hasNext()){ key = it.next(); it.remove(); handleInput(key); } } catch (IOException e) { e.printStackTrace(); } }
netty的接收连接
前话
轮询连接和读事件都是在NioEventLoop对象里。这里有一个run方法。
1、bossGroup 轮询连接事件
// NioEventLoop: per the article, both Boss and Worker event loops listen for
// events in this same run() method.
/**
 * Main loop: picks a select strategy, processes the selected keys, runs queued
 * tasks, and exits only once shutdown has been confirmed.
 */
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // Poll the selector and wait for events.
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // No break here: SELECT falls through to default.
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // Process keys, then run tasks via the no-argument runAllTasks().
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // Handle the events that were just selected.
                    processSelectedKeys();
                } finally {
                    // Pass runAllTasks a value derived from the measured I/O time
                    // and ioRatio (presumably a time budget in nanoseconds — confirm).
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Shutdown check runs every iteration, in its own try block so that a
        // failure above does not skip it.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

/**
 * Processes the selected keys, choosing between the optimized and plain paths
 * depending on whether the selectedKeys field is set.
 */
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // Optimized path; the article author reports it is said to be ~2% faster.
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

/**
 * Handles a single selection key for the given channel: closes the channel if
 * the key is invalid or cancelled, otherwise dispatches on the ready ops.
 *
 * @param k  the selection key to process
 * @param ch the channel associated with the key
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        // If the channel is bound to an event loop other than this one, do
        // nothing. (The article author cites this as what keeps it thread-safe.)
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // Clear OP_CONNECT from the interest set before finishing the connect.
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // Accept or read event (also taken when readyOps is 0).
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Note (per the article): for an accept event this call ends up in
            // AbstractNioMessageChannel's read().
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        // The key was cancelled while being processed: close the channel.
        unsafe.close(unsafe.voidPromise());
    }
}
2、bossGroup 创建socketChannel
// 接收一个nio的 socketChannel , 并将其封装成NioSocketChannel , 并设置为OP_READ
//注意 如果是链接的事件,走的是 AbstractNioMessageChannel.read
//AbstractNioMessageChannel.NioMessageUnsafe.read public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //在这里是接收了一个socketChannel 并加入到buf // SocketChannel ch = SocketUtils.accept(javaChannel()); // buf.add(new NioSocketChannel(this, ch)); int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //在这里触发 注册的 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
3、ServerBootstrapAcceptor注册到worker线程
ServerBootstrapAcceptor 是一个关键的类,它将 Boss 监听到的新连接(即第二步封装好的 NioSocketChannel)注册到 worker 线程池
//ServerBootstrapAcceptor public void channelRead(ChannelHandlerContext ctx, Object msg) { //这个channel 是 第二步 封装的 NioSocketChannel对象 final Channel child = (Channel) msg; //这是引导程序配置的handlers child.pipeline().addLast(childHandler); //设置参数 setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //选择一个NioEventLoop 注册NioSocketChannel //next().register(channel) childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
4、workerGroup 将 socketChannel 注册到选择的NioEventLoop的selector
这块和启动流程的注册一样,先注册一个 0 事件:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
5、workerGroup 注册读事件
DefaultChannelPipeline.HeadContext.read(),在这里注册读事件