JAVA RPC (十) nio服务端解析

源码地址:https://gitee.com/a1234567891/koalas-rpc

 企业生产级百亿日PV高可用可拓展的RPC框架。理论上并发数量接近服务器带宽,客户端采用thrift协议,服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,页面流量统计,支持trace跟踪等,天然接入cat支持数据大盘展示等,持续为个人以及中小型公司提供可靠的RPC框架技术方案

ServerSocketChannel简单介绍:

上一篇文章我们讲了netty server服务端的使用方式,对于netty来说对nio层进行了全方位的封装,我们使用netty的使用可以当内部nio是黑盒处理即可,只需要处理netty的hander处理即可,但是koalas-rpc同时也实现了高性能的nio服务框架,给大家另外一种原生的选择,下面我们来简单看一下NIO相关的入门知识。

            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocket_ = this.serverSocketChannel.socket();
            this.serverSocket_.setReuseAddress(true);
            this.serverSocket_.bind(bindAddr);

Java NIO中的ServerSocketChannel是一个可以监听新进来的TCP连接的通道,就像标准的IIO中的ServerSocket一样。ServerSocketChannel类在java.nio.channels包中。

Selector acceptSelector = SelectorProvider.provider().openSelector();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
try {
acceptSelector.select();

Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();

if (!key.isValid()) {
continue;
}

if (key.isAcceptable()) {
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
 

acceptSelector为服务端选择,是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

为什么使用Selector?

Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。例如,在一个聊天服务器中。

这是在一个单线程中使用一个Selector处理3个Channel的图示:

JAVA RPC (十) nio服务端解析

 

 

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。

注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:Connect,Accept,Read,Write

这四种事件用SelectionKey的四个常量来表示:SelectionKey.OP_CONNECT,SelectionKey.OP_ACCEPT,SelectionKey.OP_READ,SelectionKey.OP_WRITE,多个事件的监听int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

回到我们的代码中就会发现当服务端接收到client端的连接请求时 acceptSelector.select()阻塞可以获取到执行权限。

if (key.isAcceptable()) {
      handleAccept();
    }

这段代码的意思是可被连接,获取到连接事件,用户业务逻辑就可以在handleAccept中执行了。

 

SocketChannel简单介绍

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道(这句话是翻译过来java api的英文注释,比较鸡肋,大家明白意思即可)。可以通过以下2种方式创建SocketChannel:

对于服务端来说:当客户端连接服务端之后并且服务端获取到了accept事件,这样就可以获取到SocketChannel对象。 例如

 SocketChannel socketChannel = serverSocketChannel.accept();

对于客户端来说: 客户端可以手动声明一个SocketChannel对象,例如

1 SocketChannel socketChannel = SocketChannel.open();
2 
3 socketChannel.connect(new InetSocketAddress("192.168.3.1", 8080));

我们这次只讨论nio的server端实现,先不考虑client端的nio实现,今后有时间也会为大家专门写一篇关于client端关于nio的实现

serverSocketChannel对象就是我们上一小节中的ServerSocketChannel。同样的socketChannel可以支持读和写的监听

        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

或者

        clientKey = accepted.registerSelector(selector, SelectionKey.OP_WRITE);

这样当服务端的接收到写事件或者读事件后就会非常快速的响应数据流信息了,这里也是NIO速度比BIO速度快的关键,BIO通过用户线程不断的去轮训内核中滑动接收窗口中的数据,效率较慢,而NIO是通过内核依赖IO多路复用的方式主动通知JVM,这样吞吐速度会快很多,所以NIO是靠内核支持的。现在win,mac和linux都支持IO多路复用。介绍完了NIO的简单知识,我们来看看KOALAS-RPC是怎么通过NIO来实现服务端的,由于NIO的细节知识过于繁杂,作者没有办法通过一篇文章来详细说明,感兴趣的小伙伴可以加群联系作者沟通。

 

KOALAS-RPC的NIO SERVER实现:

koalas-rpc的nio server实现主要是在KoalasThreadedSelectorServer类中,我们先看一下连接线程和读写线程

private AcceptThread acceptThread;

  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();

元素声明

@Override
  protected boolean startThreads() {
    try {
      for (int i = 0; i < args.selectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
      }
      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
        createSelectorThreadLoadBalancer(selectorThreads));
      stopped_ = false;
      for (SelectorThread thread : selectorThreads) {
        thread.start();
      }
      acceptThread.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start threads!", e);
      return false;
    }
  }

 

这里可以看到声明了一个AcceptThread对象和多个selectorThreads对象,AcceptThread对象负责获取client的连接事件,selectorThreads负责读和写事件,这里由于client端连接事件非常非常少,所以只需要单个线程就可以满足需求了,但是读和写事件是非常频繁的,所以这里用了多个线程去读写。我们看一下连接事件中干了些什么事情

 private void select() {
      try {
        acceptSelector.select();

        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          if (key.isAcceptable()) {
            handleAccept();
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }
private void handleAccept() {
      final TNonblockingTransport client = doAccept();
      if (client != null) {
        final SelectorThread targetThread = threadChooser.nextThread();

        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
          doAddAccept(targetThread, client);
        } else {
          try {
            invoker.submit(new Runnable() {
              public void run() {
                doAddAccept(targetThread, client);
              }
            });
          } catch (RejectedExecutionException rx) {
            LOGGER.warn("ExecutorService rejected accept registration!", rx);
            client.close();
          }
        }
      }
    }
 private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
      if (!thread.addAcceptedConnection(client)) {
        client.close();
      }
    }

 

读者结合源码可以非常清晰的看到当AcceptThread获取到连接事件时,获取到读写的通道SocketChannel,并且将SocketChannel通道addAcceptedConnection方法传给读写线程SelectorThread,接着往下看

public boolean addAcceptedConnection(TNonblockingTransport accepted) {
      try {
        acceptedQueue.put(accepted);
      } catch (InterruptedException e) {
        LOGGER.warn("Interrupted while adding accepted connection!", e);
        return false;
      }
      selector.wakeup();
      return true;
    }

把读写通道对象交给SelectorThread中的队列,供读写线程去获取。我们在看看读写线程中做了些什么事情:

public void run() {
      try {
        while (!stopped_) {
          select();
          processAcceptedConnections();
          processInterestChanges();
        }
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        // This will wake up the accept thread and the other selector threads
        KoalasThreadedSelectorServer.this.stop();
      }
    }
private void select() {
      try {
        selector.select();

        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          if (key.isReadable()) {
            handleRead(key);
          } else if (key.isWritable()) {
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

 

private void processAcceptedConnections() {
      // Register accepted connections
      while (!stopped_) {
        TNonblockingTransport accepted = acceptedQueue.poll();
        if (accepted == null) {
          break;
        }
        registerAccepted(accepted);
      }
    }

 

private void registerAccepted(TNonblockingTransport accepted) {
      SelectionKey clientKey = null;
      try {
        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this,privateKey,publicKey,serviceName,tGenericProcessor,cat);
        clientKey.attach(frameBuffer);
      } catch (IOException e) {
        LOGGER.warn("Failed to register accepted connection to selector!", e);
        if (clientKey != null) {
          cleanupSelectionKey(clientKey);
        }
        accepted.close();
      }
    }

 

核心代码说明,当连接线程将通道对象传给读写线程时,读写线程获取到了执行代码的权限,然后从队列中获取到了连接通道对象,之后注册读的事件

clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this,privateKey,publicKey,serviceName,tGenericProcessor,cat);
clientKey.attach(frameBuffer);

并且声明了一个FrameBuffer对象,之后的读写操作都包装到FrameBuffer对象中,读写线程的核心读写代码如下:

        if (key.isReadable()) {
            handleRead(key);
          } else if (key.isWritable()) {
            handleWrite(key);

当有可读对象和可返回数据的时间通知后进行不同的业务逻辑处理,假设有client端连接之后发送了几个字节的数据,那么key.isReadable()就会被触发,会执行读取字节流,拆包处理,和调用业业务方法等操作,调用用户方法之后会返回结果,序列化之后写入,这样就会调用key.isWritable()中的方法去等待下次读取连接,循环往复此操作。handleRead比较复杂我们简单看一下实现

 public boolean read() {
            if (state_ == FrameBufferState.READING_FRAME_SIZE) {
                if (!internalRead ()) {
                    return false;
                }

                if (buffer_.remaining () == 0) {
                    int frameSize = buffer_.getInt ( 0 );
                    if (frameSize <= 0) {
                        LOGGER.error ( "Read an invalid frame size of " + frameSize
                                + ". Are you using TFramedTransport on the client side?" );
                        return false;
                    }

                    if (frameSize > MAX_READ_BUFFER_BYTES) {
                        LOGGER.error ( "Read a frame size of " + frameSize
                                + ", which is bigger than the maximum allowable buffer size for ALL connections." );
                        return false;
                    }

                    if (readBufferBytesAllocated.get () + frameSize > MAX_READ_BUFFER_BYTES) {
                        return true;
                    }

                    readBufferBytesAllocated.addAndGet ( frameSize );

                    buffer_ = ByteBuffer.allocate ( frameSize );

                    state_ = FrameBufferState.READING_FRAME;
                } else {

                    return true;
                }
            }

            if (state_ == FrameBufferState.READING_FRAME) {
                if (!internalRead ()) {
                    return false;
                }

                if (buffer_.remaining () == 0) {
                    selectionKey_.interestOps ( 0 );
                    state_ = FrameBufferState.READ_FRAME_COMPLETE;
                }

                return true;
            }

            LOGGER.error ( "Read was called but state is invalid (" + state_ + ")" );
            return false;
        }

 

首先读取字节长度,然后在读消息体,并且将数据保存在ByteBuffer对象中备用。然后通过buffer.isFrameFullyRead ()方法来判断本次请求的字节流是否都读完了,requestInvoke方法来调用用户实现,通过handleWrite方法来将结果返回给client端对象。

 

结论:

 由于koalas-rpc是nio server主题设计比较复杂,一篇文章无法完全说清细节实现,但是大概的核心内容就是上面这些了,读者对NIO比较感兴趣的话可以通过读源码的方式来更深入的了解。

更多学习内容请加高级java QQ群:825199617,spring 源码,spring mvc源码,dubbo源码,jdk源码,ioc aop源码分享等你来。

 

上一篇:Accepted Necklace//HDU - 2660//dfs


下一篇:机器学习入门方法和资料合集