【resin】 resin3 线程池与IO模型(2)
上篇文章主要介绍了resin3的线程池逻辑,下面具体分析下resin3的网络IO模型
resin3开源版使用的IO模型是BIO模型
相关的核心类是Port和TcpConnection
下面贴出源码:为说明核心逻辑,只展示核心的代码
本文所分析为个人理解,欢迎指出不足之处或分享你的观点
下面看看 Port 类:
/**
* The port thread is responsible for creating new connections.
*
* 负责创建 tcp connection task的主线程
*/
public void run()
{
while (! _lifecycle.isDestroyed())
{
boolean isStart;
try {
// Thread.sleep(10);
synchronized ( this)
{
// need delay to avoid spawning too many threads over a short
time,
// when the load doesn‘t justify it
if (_idleThreadCount >
0) {
wait(1);
}
/**
* port 线程创建 tcp connection task的条件有三个:
* 1.lifecycle.isActive() 为true 这个应该是只要resin服务没有停止就肯定是true了
* 2.isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin 第二个条件是这个 isStart 为true
* _acceptThreadMin 是用户设置的最小创建来accept socket的线程数
* _idleThreadCount 这个可以理解为当前block在 accept的线程数,相当于等待,就是idle了
* _startThreadCount 这个值的作用,我的理解是主要用来控制创建 tcp accept的task数量,
防止无限制的创建
*
* 3._connectionMax <= _connectionCount 这个条件基本上可以忽略了,resin默认的最大链接数是 1024*1024 这个够用了
*
*/
isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
if ( _connectionMax <= _connectionCount)
isStart = false;
if (! _lifecycle.isActive())
isStart = false;
/**
* 如果不需要创建新的 connection 则port线程就等待 1 分钟
*/
if (!
isStart) {
Thread. interrupted();
wait(60000);
}
if (isStart)
{
_connectionCount++;
_startThreadCount++;
}
}
if (isStart
&& _lifecycle.isActive()) {
TcpConnection conn = _freeConn.allocate();
if (conn
== null) {
conn = new TcpConnection( this, _serverSocket.createSocket());
conn.setRequest( _protocol.createRequest(conn));
}
conn.start();
/**
* 创建完 tcp connection task 是 Runnable 的
* 丢进resin的线程池里执行
*/
ThreadPool. getThreadPool().schedule (conn);
}
} catch (Throwable
e) {
e.printStackTrace();
}
}
}
// port 线程在resin IO处理逻辑中起着关键作用,而且是单线程执行
// port 线程的逻辑很简单,就是不断轮询,只要满足条件就创建 TcpConnection task 提交线程池处理
// 不满足条件就 wait 60秒
//
下面看看 TcpConnection
tcpconnection task的run方法很长,我直接省略了很多代码,保留核心逻辑
/**
* Runs as a task.
*/
public void run ()
{
Port port = getPort();
boolean isKeepalive
= _isKeepalive;
_isKeepalive = false;
boolean isResume
= _isResume;
_isResume = false;
_isWake = false;
boolean isFirst
= ! isKeepalive;
ServerRequest request = getRequest();
boolean isWaitForRead
= request.isWaitForRead();
if (isKeepalive)
port.keepaliveEnd( this);
/**
* tcp connection task 的主要任务就是 accept 等待新的的链接
*/
while (! _isDead)
{
if (isKeepalive)
{
}
/**
* accept 在等待
*/
else if (port.accept( this,
isFirst))
_connectionStartTime =
Alarm. getCurrentTime();
else
return;
isFirst = false;
ConnectionController controller = null;
try {
do {
controller = null;
isKeepalive = false;
if (!
port.isClosed()
&& (! isWaitForRead || getReadStream().waitForRead())) {
/**
* accept 之后,这里处理 http request了
*/
synchronized ( _requestLock)
{
isKeepalive = request.handleRequest();
}
controller = getController();
if (controller
!= null && controller.isActive()) {
isKeepalive = true;
return;
}
}
} while (isKeepalive
&& waitForKeepalive() && ! port.isClosed());
if (isKeepalive)
{
return;
}
else {
getRequest().protocolCloseEvent();
}
}
catch (ClientDisconnectException
e) {
}
catch (IOException
e) {
}
finally {
if (!
isKeepalive)
closeImpl();
}
}
} catch (Throwable
e) {
} finally {
/**
* 任务执行完了,这个 tcp task就结束了
*/
if (isKeepalive)
{
keepalive();
}
else
free();
}
}
// 可以看到 tcp connection task如果是keepalive的,会一直轮询,等待处理http请求
// 整个task 执行流程是:
// 首先执行Port的accept方法,等待tcp链接,port.accept( this,
isFirst)
// 创建完链接后,就会去处理这个http请求 request.handleRequest();
// 整个处理流程包含了 tcp链接创建和http请求处理
去看下Port的accept方法
/**
* Accepts a new connection.
*/
public boolean accept (TcpConnection
conn, boolean isFirst)
{
try {
synchronized ( this)
{
_idleThreadCount++;
/**
* _startThreadCount 在这里进行了 -- 自减操作
*/
if (isFirst)
{
_startThreadCount--;
}
if (_acceptThreadMax < _idleThreadCount)
{
return false;
}
}
while (_lifecycle.isActive())
{
QSocket socket = conn.startSocket();
/**
* 这里将会调用底层 jdk的accept 方法
*/
if (_serverSocket.accept(socket))
{
conn.initSocket();
if (_throttle.accept(socket))
return true;
else
socket.close();
}
else {
if ( _acceptThreadMax < _idleThreadCount)
{
return false;
}
}
}
} catch (Throwable
e) {
} finally {
synchronized ( this)
{
_idleThreadCount--;
if (_idleThreadCount + _startThreadCount < _acceptThreadMin)
{
notify();
}
}
}
return false;
}
// 可以看到 Port类的 accept方法是同步方法块
// 整个方法的逻辑也简单
//
我们的一个线上服务用的是resin3的开源版,一直都运行良好
但是每当流量暴涨时,resin就是出现假死的情况,进行多番排查,基本定位问题了
下面给出分析:
主要是 Port 线程的逻辑 和 resin参数配置的问题
在 Port 线程逻辑里,创建 tcp connection task有3个条件 :起关键影响的是这个
/**
* 着重讲讲这个条件 isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
*
* _acceptThreadMin resin默认值为5
* _acceptThreadMax resin默认值为10
*
* 当resin服务的外部流量稳定时,即平滑增长状态,则 port线程运行的很良好
* 但是当出现流量暴涨时,而且resin的线程池已经耗尽,已经达到 threadMax,此时会出现问题
* _idleThreadCount 一直为0,threadCount 也没有呈现增长的状态,外部链接出现了响应超时情况,感觉是resin服务“挂了”
*
* 推测这个时候的状态是这样的:
* 1.虽然 _idleThreadCount 为0,但是 _startThreadCount 增长到 _acceptThreadMin了
* 即 isStart = _acceptThreadMin + 0 < _acceptThreadMin ; isStart 为 false
* _startThreadCount 要减少的情况就是 新创建的 tcp connection
task开始执行accept了
*
* 2.此时已经有 最多threadMax 的线程在执行,时间片分配得花点时间了
* 考虑极端情况,新创建的 _startThreadCount(值已经等于_acceptThreadMin)个线程都在等待时间片分配
* 另外, tcp connection task要开始执行accept,还得要先获得
port 线程对象的锁
* synchronized (this) {
* 省略代码。。。
* if (isFirst) {
* _startThreadCount --;
* }
* 省略代码。。。
* }
*
* 3.既然 isStart 为 false 则 port 线程就会进行等待 wait(60000) 足足等待一分钟,而且 port线程只有一个
* 如果后续 isStart 还不为 true ,则就还会继续等待 60 秒,这将会是灾难了
*
* 4.结合了上述的情况,外部的链接就没有办法得到响应了,只能等当前的请求都处理完,才能进行处理后续的请求了
*
* PS:我曾经试着去掉 _startThreadCount 这个条件进行了测试,程序没跑多久就出现 OOM等的问题了
* 明显,这个_startThreadCount 是限制创建 tcp task的数量,因为
resin的线程池里的task队列是*的,用的是 ArrayList <Runnable>
*
*
* 要避免这样的情况:就把 _acceptThreadMin _acceptThreadMax 分别设置的大些
* 防止 port线程因为 isStart条件的问题导致无法分配 tcp accept的线程,从而导致无法响应更多的外部请求
*
* 开源版resin是BIO线程池模型,是阻塞的,即创建一定量的 accept thread,等待请求并处理
* resin的并发能力取决于 accept thread的线程数 与 最大 threadMax
*
* 专业版resin貌似是使用 nio,通过select机制,效率吞吐量都会更好
*
*/