Netty的Unsafe接口
这个Unsafe
不是JDK
原生的Unsafe
,主要就是一些直接跟IO
底层直接相关的通用操作:
interface Unsafe { // 接受数据的时候用于分配字节缓冲区的处理器 RecvByteBufAllocator.Handle recvBufAllocHandle(); // 本地地址 SocketAddress localAddress(); // 远程地址 SocketAddress remoteAddress(); //向事件循环注册通道,完成后回调 void register(EventLoop eventLoop, ChannelPromise promise); // 绑定本地地址,完成后回调 void bind(SocketAddress localAddress, ChannelPromise promise); // 连接 void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); // 断开连接,完成回调 void disconnect(ChannelPromise promise); // 关闭连接,完成回调 void close(ChannelPromise promise); // 立即关闭,不触发任何事件 void closeForcibly(); // 注销,完成回调 void deregister(ChannelPromise promise); // 开始读操作 void beginRead(); // 写操作 void write(Object msg, ChannelPromise promise); // 冲刷所有的出站数据 void flush(); // 特殊的占位符,不接受通知 ChannelPromise voidPromise(); //写操作的出站缓冲区 ChannelOutboundBuffer outboundBuffer(); }
AbstractUnsafe基本抽象实现
要进行数据的读写,需要有接收缓冲区,所以有了recvHandle
处理器,写出去的时候需要有写缓冲区ChannelOutboundBuffer
,注意ChannelOutboundBuffer
是初始化的时候就会创建,就创建一次。
//出站字节缓冲区 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private RecvByteBufAllocator.Handle recvHandle;//接受数据缓冲分配器的处理器 private boolean inFlush0;//是否正在缓冲 /** true if the channel has never been registered, false otherwise */ private boolean neverRegistered = true;//通道没注册过 private void assertEventLoop() {//断言还没注册,或者当前线程是IO线程 assert !registered || eventLoop.inEventLoop(); }
recvBufAllocHandle接受缓冲区处理器
@Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
register注册到事件循环
注册方法其实就是判断是否当前线程就是IO
线程,是的话就直接执行,不是就包装成一个任务提交给IO
线程,这样就避免多线程的问题,始终是单线程操作。
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) {//是否已经注册人到一个eventLoop promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) {//是否是NioEventLoop类型 promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; //只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里 if (eventLoop.inEventLoop()) { register0(promise); } else {//否则就当做任务提交给eventLoop的任务队列 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
register0执行注册逻辑
这里是注册过程要做的事,进行真正的注册逻辑doRegister,其实就是将NIO通道注册到Selector上,然后进行处理器的待添加事件的处理,注册回调成功,管道传递注册事件,如果是第一次注册,管道传递通道激活事件,否则是设置自动读的话就注册读监听。
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) {//确保是不可取消和通道打开着,否则就返回 return; } boolean firstRegistration = neverRegistered;//设置注册标记 doRegister();//进行注册逻辑 neverRegistered = false;//AbstractUnsafe的已注册标记 registered = true;//channel的已注册标记 pipeline.invokeHandlerAddedIfNeeded();//如果在注册前有处理器添加,还没进行HandlerAdded回调,注册成功后要回调 safeSetSuccess(promise);//回调注册成功 pipeline.fireChannelRegistered();//通道注册事件传递 if (isActive()) {//通道激活的话 if (firstRegistration) {//第一次注册要进行激活事件传递 pipeline.fireChannelActive(); } else if (config().isAutoRead()) {//否则如果设置了自动读,就进行读监听 beginRead(); } } } catch (Throwable t) { closeForcibly();//强制关闭 closeFuture.setClosed();//关闭回调 safeSetFailure(promise, t);//设置失败 } }
bind绑定地址
如果通道开始没激活,绑定后激活的话,就开一个延时的任务,进行激活事件传递,最后回调绑定成功。
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) {//绑定前没激活,绑定后激活了 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
disconnect断开连接
调用doDisconnect
,断开连接,如果开始激活的,断开后失效了,就传递失效事件。如果通道关闭了,还要处理关闭事件closeIfClosed
。
@Override public final void disconnect(final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable()) { return; } boolean wasActive = isActive(); try { doDisconnect(); // Reset remoteAddress and localAddress remoteAddress = null; localAddress = null; } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelInactive(); } }); } safeSetSuccess(promise); closeIfClosed(); // doDisconnect() might have closed the channel }
close关闭通道和出站缓冲区
进行通道的关闭,主要还是出站缓冲区的处理和传递通道失效和注销事件。
@Override public final void close(final ChannelPromise promise) { assertEventLoop(); ClosedChannelException closedChannelException = new ClosedChannelException(); close(promise, closedChannelException, closedChannelException, false); } private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; } if (closeInitiated) {//如果已经发起关闭了 if (closeFuture.isDone()) {//判断是否关闭完成 // Closed already. safeSetSuccess(promise);//回调 } else if (!(promise instanceof VoidChannelPromise)) { closeFuture.addListener(new ChannelFutureListener() {//如果不是VoidChannelPromise,添加关闭监听 @Override public void operationComplete(ChannelFuture future) throws Exception { promise.setSuccess(); } }); } return; } closeInitiated = true;//已经开始关闭了 //处理出站缓冲区关闭 final boolean wasActive = isActive(); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. doClose0(promise); } finally { invokeLater(new Runnable() { @Override public void run() { if (outboundBuffer != null) { // Fail all the queued messages outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { try { doClose0(promise); } finally { if (outboundBuffer != null) { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } } if (inFlush0) { invokeLater(new Runnable() { @Override public void run() { fireChannelInactiveAndDeregister(wasActive); } }); } else { fireChannelInactiveAndDeregister(wasActive); } } }
doClose0关闭通道
具体的关闭逻辑和回调,具体逻辑是在通道中实现的。
private void doClose0(ChannelPromise promise) { try { doClose(); closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } }
fireChannelInactiveAndDeregister传递通道失效和注销事件
传递通道失效和注销事件。
private void fireChannelInactiveAndDeregister(final boolean wasActive) { deregister(voidPromise(), wasActive && !isActive()); }
doDeregister注销事件
提交一个任务,进行注销doDeregister
,然后根据情况传递通道失效和注销事件。
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) { return; } if (!registered) { safeSetSuccess(promise); return; } invokeLater(new Runnable() { @Override public void run() { try { doDeregister(); } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { if (fireChannelInactive) { pipeline.fireChannelInactive(); } if (registered) { registered = false; pipeline.fireChannelUnregistered(); } safeSetSuccess(promise); } } }); }
shutdownOutput出站缓冲区关闭处理
清理出站缓冲区ChannelOutboundBuffer
,并传递fireUserEventTriggered
事件。
@UnstableApi public final void shutdownOutput(final ChannelPromise promise) { assertEventLoop(); shutdownOutput(promise, null); } private void shutdownOutput(final ChannelPromise promise, Throwable cause) { if (!promise.setUncancellable()) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) {//如果出站缓冲区为null的话,就回调失败 promise.setFailure(new ClosedChannelException()); return; } this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.禁止添加数据到出站缓冲区了 final Throwable shutdownCause = cause == null ?//根据异常创建ChannelOutputShutdownException new ChannelOutputShutdownException("Channel output shutdown") : new ChannelOutputShutdownException("Channel output shutdown", cause); Executor closeExecutor = prepareToClose();//有关闭执行器 if (closeExecutor != null) {//提交一个任务 closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the shutdown. doShutdownOutput(); promise.setSuccess(); } catch (Throwable err) { promise.setFailure(err); } finally {//出站缓冲区事件任务 // Dispatch to the EventLoop eventLoop().execute(new Runnable() { @Override public void run() {//出站缓冲区事件处理 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause); } }); } } }); } else { try {//直接处理关闭 // Execute the shutdown. doShutdownOutput(); promise.setSuccess(); } catch (Throwable err) { promise.setFailure(err); } finally { closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause); } } } private void closeOutboundBufferForShutdown( ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) { buffer.failFlushed(cause, false);//不能冲刷 buffer.close(cause, true);//关闭出站缓冲区 pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);//传递事件 }
beginRead开始读
判断下条件,准备开始读,真正读的是通道的doBeginRead
方法。
@Override public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
write写数据到出站缓冲区
write
操作最后就是写入出站缓冲区。如果出站缓冲区关闭了,那就无用写了,释放消息即可,否则就封装后放入出站缓冲区里,里面是个单链表。
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg);//封装成直接缓冲区 size = pipeline.estimatorHandle().size(msg);//获取缓冲区大小 if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise);//往出站缓冲区添加消息 }
flush准备将出站缓冲区数据发出去
给出站缓冲区数据打好冲刷标记,然后准备冲刷flush0
。
@Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
flush0将出站缓冲区数据发出去
具体的发送方法,主要是调用通道的doWrite
方法,里面才是将数据从通道中发出去。
protected void flush0() { if (inFlush0) {//避免重入 return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) {//通道失效的话 try { if (isOpen()) {//报错并通知 outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { //报错不通知 outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { initialCloseCause = t; close(voidPromise(), t, newClosedChannelException(t), false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { initialCloseCause = t; close(voidPromise(), t2, newClosedChannelException(t), false); } } } finally { inFlush0 = false; } }
newClosedChannelException创建一个通道关闭异常
将异常信息封装成ClosedChannelException
。
private ClosedChannelException newClosedChannelException(Throwable cause) { ClosedChannelException exception = new ClosedChannelException(); if (cause != null) { exception.initCause(cause); } return exception; }
NioUnsafe
这个是针对NIO
的子接口:
public interface NioUnsafe extends Unsafe { /** * Return underlying {@link SelectableChannel} */ SelectableChannel ch(); /** * Finish connect */ void finishConnect(); /** * Read from underlying {@link SelectableChannel} */ void read(); void forceFlush(); }
AbstractNioUnsafe
继承了抽象AbstractUnsafe
,实现了NioUnsafe
的接口,一些基本的NIO
实现都有。
removeReadOp清除读监听
如果发现有读监听就删除了,用的是位操作。什么时候会进行读的清除呢,一般是设置自动读的,所以不会清除读监听,而且默认NioSocketChannel
是监听读的。
protected final void removeReadOp() { SelectionKey key = selectionKey(); if (!key.isValid()) { return; } int interestOps = key.interestOps(); if ((interestOps & readInterestOp) != 0) { key.interestOps(interestOps & ~readInterestOp); } }
connect建立连接
如果doConnect
能连接上,就处理回调fulfillConnectPromise
,否则如果有设置超时的话就提交超时调度任务,如果连接上了,就把超时任务取消。
@Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { // Already a connect in process. throw new ConnectionPendingException(); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
fulfillConnectPromise实现连接回调
连接成功,如果前面通道没激活,现在激活了,就传递激活事件。
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { return; } boolean active = isActive(); //尝试成功回调 boolean promiseSet = promise.trySuccess(); //前面没激活,现在激活了 if (!wasActive && active) { pipeline().fireChannelActive(); } //失败就关闭 if (!promiseSet) { close(voidPromise()); } }
finishConnect完成连接
连接完成了,不管是否成功都要回调,要处理。
@Override public final void finishConnect() { assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
flush0发送数据
如果没有待发的就发出去,否则要留给待发的。
@Override protected final void flush0() { if (!isFlushPending()) {//没有待刷的操作 super.flush0(); } }
isFlushPending是否有待发送的数据
也就是注册了OP_WRITE
写事件,可能是因为前面数据没发送完,所以注册了写数据,要继续发。
private boolean isFlushPending() { SelectionKey selectionKey = selectionKey(); return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; }
forceFlush强制发送数据
@Override public final void forceFlush() { super.flush0(); }
如果有写事件,会强制刷出去,也就是把上次没发完的数据发出去。
NioMessageUnsafe
专门来处理客户端连接的unsafe
。
read读客户端连接
上文提到的可调节的接收缓冲区,这篇就来看看他是怎么用的,所以就要看NioMessageUnsafe
和NioByteUnsafe
的read
,我们先介绍NioMessageUnsafe
的read
。这个是用来接收客户端连接的,读取的是客户端通道。这里是把所有的客户端连接接受完了,然后传递每一个连接的读事件。
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config();//获取通道的配置 final ChannelPipeline pipeline = pipeline();//获取管道 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//获取接受缓冲区处理器 allocHandle.reset(config);//重置配置参数 boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf);//读取消息到readBuf中 if (localRead == 0) {//没有消息 break; } if (localRead < 0) {//关闭是-1 closed = true; break; } allocHandle.incMessagesRead(localRead);//读取消息数+1 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i));//传播读事件,传递的是客户端通道 } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete();//传播读完成事件 if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) {//不自动读了,就删除读事件监听,一般读完成后,都设置成自动读的 removeReadOp(); } } }
NioServerSocketChannel的doReadMessages
可以看到这里准备去接受一个客户端。
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
内部其实就是用ServerSocketChannel
去接受一个SocketChannel
啦。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() { @Override public SocketChannel run() throws IOException { return serverSocketChannel.accept(); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
然后放进一个list
中, 返回1
。如果没接受到,那就返回0
。后面就开始把这个通道传递下去了。
NioByteUnsafe
这个是专门来处理客户端读数据的。
read读客户端数据
其实逻辑和读客户端连接差不多,只不过这次是需要接收缓冲区分配器分配缓冲区来接受读的信息。然后接受一次传递一次读事件。这里为什么要读一次传一次,而不是全部读完再传呢,我想可能是因为这样读一部分处理一分部分,处理完了就释放内存,可以提高吞吐量,而且也不至于数据有积压,可能造成内存溢出。
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator);//分配缓冲区 allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取通道内数据到缓冲区,如果是客户端断开,读取数据个数是-1 if (allocHandle.lastBytesRead() <= 0) {//如果没读到数据0 或者客户端关闭-1 // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0;//-1 if (close) {//把通道关闭,也就是没发数据来 // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1);//增加读的消息数 readPending = false; pipeline.fireChannelRead(byteBuf);//管道里传递消息 byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete();//读取完成 if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) {//如果还要继续读的话,就不会删除读监听 removeReadOp(); } } }
handleReadException处理读异常
如果出现了读的异常,因为有缓冲区申请在那边,如果缓冲区有数据,那就先把数据传递下去,否则就直接释放,然后还是按普通逻辑去统计,传递读完成事件,最后传递异常事件。
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) {//如果有可读的话,把传递消息 readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); } } allocHandle.readComplete(); pipeline.fireChannelReadComplete();//读完成 pipeline.fireExceptionCaught(cause);//传递异常 if (close || cause instanceof IOException) {//如果是IO异常 closeOnRead(pipeline);//传递自定义事件 } }
closeOnRead读关闭(半关闭)
这里可能要涉及一个半关闭的概念,简单点就是说我们客户端和服务端的socket其实都有两个缓冲区,即读缓冲区和写缓冲区,如果TCP要分手了,客户端就会先发送关闭请求,然后把写缓冲区关闭了,就不写了,但是这个时候他是可以读客户端的消息的,即读缓冲区还没关,所以叫做半关闭。所以这里就是判断是否是把读缓冲区关了,如果关了,就直接传递自定义消息,否则就判断是否配置了半关闭,是的话就进行读关闭,传递自定义消息,否则就关闭通道了。
private void closeOnRead(ChannelPipeline pipeline) { if (!isInputShutdown0()) {//单方关闭输出流,但是可以接受输入,即半关闭 if (isAllowHalfClosure(config())) {//是否配置了半关闭 shutdownInput();//关闭输入流 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); } } else { inputClosedSeenErrorOnRead = true; pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);//传递自定义事件 } }
NioSocketChannelUnsafe
这个是NioSocketChannel
的unsafe
。主要是prepareToClose方法的覆盖,取消注册事件,返回全局唯一的GlobalEventExecutor
。
private final class NioSocketChannelUnsafe extends NioByteUnsafe { @Override protected Executor prepareToClose() { try { if (javaChannel().isOpen() && config().getSoLinger() > 0) { doDeregister(); return GlobalEventExecutor.INSTANCE; } } catch (Throwable ignore) { } return null; } }
RecvByteBufAllocator
RecvByteBufAllocator
分配接口用于存放从通道里读取的数据。即接受缓冲区。
public interface RecvByteBufAllocator { //处理器,也就是做一些统计的 Handle newHandle(); @Deprecated interface Handle { //分配缓冲区,实际是交给ByteBufAllocator的 ByteBuf allocate(ByteBufAllocator alloc); //猜下次该用多大的接收缓冲区 int guess(); //重置一些统计参数 void reset(ChannelConfig config); //统计读取的消息数 void incMessagesRead(int numMessages); //设置上一次读取的字节数 void lastBytesRead(int bytes); //获取上一次读取的字节数 int lastBytesRead(); //设置尝试读取的字节数 void attemptedBytesRead(int bytes); //获取尝试读取字节数 int attemptedBytesRead(); //是否还能继续读 boolean continueReading(); //读取完成 void readComplete(); } @SuppressWarnings("deprecation") @UnstableApi interface ExtendedHandle extends Handle { boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier); }
定义了一些统计的参数,是为了猜下一次可能读取的数据大小,这样就可以对接受缓冲区的大小进行调节。
MaxMessagesRecvByteBufAllocator
这个又加了两个方法,其实为了限制读的次数,默认配置读到16
个消息就不读了,不然可能事件长了就阻塞IO
线程去做别的事了。
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator { // 每次读循环,读多少个消息 int maxMessagesPerRead(); //设置最大消息数 MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead); }
DefaultMaxMessagesRecvByteBufAllocator
这个是上面接口的抽象实现,还加了一个是否停止读的标记respectMaybeMoreData
,默认就是认为没有更多数据,不读了。
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator { private volatile int maxMessagesPerRead;//最多读多少个消息 private volatile boolean respectMaybeMoreData = true;//是否没有更多的数据,停止读了 public DefaultMaxMessagesRecvByteBufAllocator() { this(1);//这里设置是只读1个消息 } public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) { maxMessagesPerRead(maxMessagesPerRead); } @Override public int maxMessagesPerRead() { return maxMessagesPerRead; } @Override public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) { checkPositive(maxMessagesPerRead, "maxMessagesPerRead"); this.maxMessagesPerRead = maxMessagesPerRead; return this; } public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) { this.respectMaybeMoreData = respectMaybeMoreData; return this; } public final boolean respectMaybeMoreData() { return respectMaybeMoreData; } }
MaxMessageHandle处理器
这个就是前面RecvByteBufAllocator
里处理器的抽象实现。里面有个一个布尔值判别器,主要是说如果把申请的接收缓冲区填满了,那就说明可能还要读,否则就是不读了,因为数据都填不满缓冲区。
public abstract class MaxMessageHandle implements ExtendedHandle { private ChannelConfig config; private int maxMessagePerRead;//每次读的最大消息数 private int totalMessages;//总共读了多少次消息 private int totalBytesRead;//总共读的字节数 private int attemptedBytesRead;//尝试读的字节数 private int lastBytesRead;//上一次读的字节数 private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData; //一个布尔值判别器 private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { return attemptedBytesRead == lastBytesRead;//是否把缓冲区内可写的空间全填满 } }; /** 重置属性 * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used. */ @Override public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; } //分配缓冲区 @Override public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess()); } //增加接受读消息的数量 @Override public final void incMessagesRead(int amt) { totalMessages += amt; } //保存上一次读取的字节数 @Override public void lastBytesRead(int bytes) { lastBytesRead = bytes;//记录上次读取的字节数 if (bytes > 0) {//先判断后加,0就不加了,将性能提高到极致啊 totalBytesRead += bytes;//统计总的字节数 } } //获取上一次读取的字节数 @Override public final int lastBytesRead() { return lastBytesRead; } //是否继续读 @Override public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } @Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() &&//配置了自动读 (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&//如果还有可读的,或者把缓冲区可写的全填满了 totalMessages < maxMessagePerRead &&//没超过最大读取消息数 totalBytesRead > 0;//已经有数据读取 } @Override public void readComplete() { } //尝试读取的尺寸,默认是缓冲区可写的尺寸 @Override public int attemptedBytesRead() { return attemptedBytesRead; } @Override public void attemptedBytesRead(int bytes) { attemptedBytesRead = bytes; } //总读的大小 protected final int totalBytesRead() { return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead; } }
AdaptiveRecvByteBufAllocator
可调节的缓冲区分配器,这个是有限制的,默认最小尺寸64,最大是65536,初始是1024,初始化时候会创建一个数组,里面有所有可分配的尺寸,从16到2^30。16-496是间隔16分的,512以上是2倍分的。然后会将最小最大初始尺寸都转化为索引,方便操作。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { static final int DEFAULT_MINIMUM = 64;//最小 static final int DEFAULT_INITIAL = 1024;//初始 static final int DEFAULT_MAXIMUM = 65536;//最大 //增加索引+4 减少索引-1 private static final int INDEX_INCREMENT = 4; private static final int INDEX_DECREMENT = 1; private static final int[] SIZE_TABLE;//尺寸数组 //16-496间隔16 512到2的31次-1 间隔2倍 static { List<Integer> sizeTable = new ArrayList<Integer>(); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } } @Deprecated public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator(); //二分查找 private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1;//返回最近大于size的尺寸 } } } private final int minIndex;//记录最小尺寸索引 private final int maxIndex;//记录最大尺寸索引 private final int initial;//记录初始尺寸索引 public AdaptiveRecvByteBufAllocator() { this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); } public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { checkPositive(minimum, "minimum"); if (initial < minimum) { throw new IllegalArgumentException("initial: " + initial); } if (maximum < initial) { throw new IllegalArgumentException("maximum: " + maximum); } //设置最小索引 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //设置最大索引 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; } @SuppressWarnings("deprecation") @Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } @Override public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) { super.respectMaybeMoreData(respectMaybeMoreData); return this; } }
HandleImpl
这个是真正进行尺寸伸缩的处理器。每次设置上一次读取的字节数时,会判断真实读取的字节数是不是把分配的缓冲区给填满了,如果满了就要进行缓冲区尺寸的伸缩。伸缩算法就是如果真实读取的字节数小于等于当前尺寸的前一个尺寸大小,且要连续两次,那就会把空间缩小成前一个尺寸大小。如果真实读取字节数大于等于预测的接收缓冲区大小,那就扩容,每次扩容是当前尺寸的后4个尺寸大小,但是不超过最大尺寸。
举个例子16,32,48,64,80,96.6个尺寸,开始是在32,如果两次发现真实的读取数据都小于等于16,那就设置成16,如果发现数据大于等于32,就跳4个位置,就是96。为什么要扩容的时候跳那么多呢,我想可能是因为扩容太小的话会可能会有多次扩容,多次申请直接缓冲区,直接缓冲区的创建和释放是有性能消耗的。
private final class HandleImpl extends MaxMessageHandle { private final int minIndex; private final int maxIndex; private int index;//当前在尺寸表的索引 private int nextReceiveBufferSize;//下一次接受缓冲区大小 private boolean decreaseNow; HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; } //设置上次读取的字节数 @Override public void lastBytesRead(int bytes) { if (bytes == attemptedBytesRead()) {//如果真实读取的字节数等于读取尝试的字节数,也就是将接受缓冲区的可写位置全部填满了 record(bytes);//要进行尺寸伸缩了 } super.lastBytesRead(bytes); } //猜测下一个接受缓冲区的大小 @Override public int guess() { return nextReceiveBufferSize; } //记录,根据这次真实接受的尺寸,进行下一次接受缓冲区的大小伸缩 private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {//连续两次小,才会进行缩减 if (decreaseNow) {//减少,索引-1,不小于最小索引 index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) {//扩大,索引+4,不超过最大索引 index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } } //读取完成 @Override public void readComplete() { record(totalBytesRead()); } }