【resin】 resin3 线程池与IO模型(2)

【resin】 resin3 线程池与IO模型(2)

上篇文章主要介绍了resin3的线程池逻辑,下面具体分析下resin3的网络IO模型

resin3开源版使用的IO模型是BIO模型
相关的核心类是Port和TcpConnection

下面贴出源码:为说明核心逻辑,只展示核心的代码
本文所分析为个人理解,欢迎指出不足之处或分享你的观点

下面看看 Port 类:

 /**
   * The port thread is responsible for creating new connections.
   *
   * 负责创建 tcp connection task的主线程
   */
  public void run()
  {
    while (! _lifecycle.isDestroyed()) {
      boolean isStart;
      try {      
        // Thread.sleep(10);
        synchronized ( this) {
        // need delay to avoid spawning too many threads over a short time,
        // when the load doesn‘t justify it
        if (_idleThreadCount > 0) {
          wait(1);
        }
       
        /**
         * port 线程创建 tcp connection task的条件有三个:
         * 1.lifecycle.isActive() 为true   这个应该是只要resin服务没有停止就肯定是true了
         * 2.isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin   第二个条件是这个 isStart 为true
         *  _acceptThreadMin 是用户设置的最小创建来accept socket的线程数
         *  _idleThreadCount 这个可以理解为当前block在 accept的线程数,相当于等待,就是idle了
         *  _startThreadCount 这个值的作用,我的理解是主要用来控制创建 tcp accept的task数量, 防止无限制的创建
         *
         * 3._connectionMax <= _connectionCount 这个条件基本上可以忽略了,resin默认的最大链接数是 1024*1024 这个够用了
         *
         */
          isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;

          if ( _connectionMax <= _connectionCount)
            isStart = false;
        if (! _lifecycle.isActive())
          isStart = false;

        /**
         * 如果不需要创建新的 connection 则port线程就等待 1 分钟
         */
          if (! isStart) {
            Thread. interrupted();
            wait(60000);
          }

          if (isStart) {
            _connectionCount++;
            _startThreadCount++;
          }
        }

        if (isStart && _lifecycle.isActive()) {

          TcpConnection conn = _freeConn.allocate();
          if (conn == null) {
            conn = new TcpConnection( this, _serverSocket.createSocket());
            conn.setRequest( _protocol.createRequest(conn));
          }

        conn.start();

        /**
         * 创建完 tcp connection task 是 Runnable 的
         * 丢进resin的线程池里执行
         */
          ThreadPool. getThreadPool().schedule (conn);
        }
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }

// port 线程在resin IO处理逻辑中起着关键作用,而且是单线程执行
// port 线程的逻辑很简单,就是不断轮询,只要满足条件就创建 TcpConnection task 提交线程池处理
// 不满足条件就 wait 60秒
//

下面看看 TcpConnection 
tcpconnection task的run方法很长,我直接省略了很多代码,保留核心逻辑
  /**
   * Runs as a task.
   */
  public void run ()
  {
    Port port = getPort();

    boolean isKeepalive = _isKeepalive;
    _isKeepalive = false;

    boolean isResume = _isResume;
    _isResume = false;
    _isWake = false;

    boolean isFirst = ! isKeepalive;

    ServerRequest request = getRequest();
    boolean isWaitForRead = request.isWaitForRead();

    if (isKeepalive)
      port.keepaliveEnd( this);

      /**
       * tcp connection task 的主要任务就是 accept 等待新的的链接
       */
      while (! _isDead) {
        if (isKeepalive) {
        }
        /**
         * accept 在等待
         */
        else if (port.accept( this, isFirst))
          _connectionStartTime = Alarm. getCurrentTime();
        else
          return;

        isFirst = false;
        ConnectionController controller = null;

        try {
          do {
            controller = null;

            isKeepalive = false;

            if (! port.isClosed()
                && (! isWaitForRead || getReadStream().waitForRead())) {

                   /**
                   * accept 之后,这里处理 http request了
                   */
              synchronized ( _requestLock) {
                isKeepalive = request.handleRequest();
              }

              controller = getController();
              if (controller != null && controller.isActive()) {
                isKeepalive = true;
                return;
              }
            }
          } while (isKeepalive && waitForKeepalive() && ! port.isClosed());

          if (isKeepalive) {
            return;
          }
          else {
            getRequest().protocolCloseEvent();
          }
        }
        catch (ClientDisconnectException e) {
        }
        catch (IOException e) {
        }
        finally {
          if (! isKeepalive)
            closeImpl();
        }
      }
    } catch (Throwable e) {
    } finally {
      /**
       * 任务执行完了,这个 tcp task就结束了
       */
      if (isKeepalive) {
        keepalive();
      }
      else
        free();
    }
  }
// 可以看到 tcp connection task如果是keepalive的,会一直轮询,等待处理http请求
// 整个task 执行流程是:
// 首先执行Port的accept方法,等待tcp链接,port.accept( this, isFirst)
// 创建完链接后,就会去处理这个http请求 request.handleRequest();
// 整个处理流程包含了 tcp链接创建和http请求处理

去看下Port的accept方法
  /**
   * Accepts a new connection.
   */
  public boolean accept (TcpConnection conn, boolean isFirst)
  {
    try {
      synchronized ( this) {
        _idleThreadCount++;

        /**
         * _startThreadCount 在这里进行了 -- 自减操作
         */
        if (isFirst) {
          _startThreadCount--;
        }

        if (_acceptThreadMax < _idleThreadCount) {
          return false;
        }
      }

      while (_lifecycle.isActive()) {
        QSocket socket = conn.startSocket();
       
        /**
         * 这里将会调用底层 jdk的accept 方法
         */
        if (_serverSocket.accept(socket)) {
          conn.initSocket();

        if (_throttle.accept(socket))
          return true;
        else
          socket.close();
        }
        else {
          if ( _acceptThreadMax < _idleThreadCount) {
            return false;
          }
        }
      }
    } catch (Throwable e) {
    } finally {
      synchronized ( this) {
        _idleThreadCount--;

        if (_idleThreadCount + _startThreadCount < _acceptThreadMin) {
          notify();
        }
      }
    }
    return false;
  }
// 可以看到 Port类的 accept方法是同步方法块
// 整个方法的逻辑也简单
//

我们的一个线上服务用的是resin3的开源版,一直都运行良好
但是每当流量暴涨时,resin就是出现假死的情况,进行多番排查,基本定位问题了
下面给出分析:

主要是 Port 线程的逻辑 和 resin参数配置的问题
在 Port 线程逻辑里,创建 tcp connection task有3个条件 :起关键影响的是这个
       /**
         * 着重讲讲这个条件 isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
         *
         *  _acceptThreadMin resin默认值为5
         *  _acceptThreadMax resin默认值为10
         * 
         *  当resin服务的外部流量稳定时,即平滑增长状态,则 port线程运行的很良好
         *  但是当出现流量暴涨时,而且resin的线程池已经耗尽,已经达到 threadMax,此时会出现问题
         *  _idleThreadCount 一直为0,threadCount 也没有呈现增长的状态,外部链接出现了响应超时情况,感觉是resin服务“挂了”
         * 
         *  推测这个时候的状态是这样的:
         *  1.虽然 _idleThreadCount 为0,但是 _startThreadCount 增长到 _acceptThreadMin了
         *        即 isStart = _acceptThreadMin + 0 < _acceptThreadMin ; isStart 为 false
         *        _startThreadCount 要减少的情况就是 新创建的 tcp connection task开始执行accept了
         * 
         *  2.此时已经有 最多threadMax 的线程在执行,时间片分配得花点时间了
         *        考虑极端情况,新创建的 _startThreadCount(值已经等于_acceptThreadMin)个线程都在等待时间片分配
         *        另外, tcp connection task要开始执行accept,还得要先获得 port 线程对象的锁
         *        synchronized (this) {
         *              省略代码。。。
         *               if (isFirst) {
         *                    _startThreadCount --;
         *              }
         *              省略代码。。。
         *        }
         *
         *  3.既然 isStart 为 false 则 port 线程就会进行等待 wait(60000) 足足等待一分钟,而且 port线程只有一个
         *        如果后续 isStart 还不为 true ,则就还会继续等待 60 秒,这将会是灾难了
         * 
         *  4.结合了上述的情况,外部的链接就没有办法得到响应了,只能等当前的请求都处理完,才能进行处理后续的请求了
         * 
         *  PS:我曾经试着去掉 _startThreadCount 这个条件进行了测试,程序没跑多久就出现 OOM等的问题了
         *  明显,这个_startThreadCount 是限制创建 tcp task的数量,因为 resin的线程池里的task队列是*的,用的是 ArrayList <Runnable>
         * 
         * 
         *  要避免这样的情况:就把 _acceptThreadMin _acceptThreadMax 分别设置的大些
         *  防止 port线程因为 isStart条件的问题导致无法分配 tcp accept的线程,从而导致无法响应更多的外部请求
         * 
         *  开源版resin是BIO线程池模型,是阻塞的,即创建一定量的 accept thread,等待请求并处理
         *  resin的并发能力取决于 accept thread的线程数 与 最大 threadMax
         * 
         *  专业版resin貌似是使用 nio,通过select机制,效率吞吐量都会更好
         * 
         */


【resin】 resin3 线程池与IO模型(2),布布扣,bubuko.com

【resin】 resin3 线程池与IO模型(2)

上一篇:线程私有存储空间--pthread_key_t


下一篇:09 spark连接mysql数据库