The consumer's DubboResponseTimeoutScanTimer thread

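Consider this case: because of network latency, the consumer first throws a timeout exception, and some time later it receives the response that has already timed out. How does dubbo handle this?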

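Break it down into 3 steps:

1. The consumer's DubboResponseTimeoutScanTimer scans

DubboResponseTimeoutScanTimer scans the pending futures. When it finds one that has timed out, it builds a timeout response itself and processes it through the same path as a real response.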

A Future, its Request, and its Response all share the same id.
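To make the shared id concrete, here is a minimal sketch of the correlation idea. It is not Dubbo's code: the class `IdCorrelationSketch`, the `PENDING` map, and the `send`/`receive` methods are hypothetical names made up for illustration, with `PENDING` standing in for the role that `FUTURES` plays.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical miniature of id correlation; none of these names come from Dubbo.
class IdCorrelationSketch {
    // A single counter hands out ids; each request takes the next value.
    private static final AtomicLong NEXT_ID = new AtomicLong();

    // Pending calls keyed by request id (assumed stand-in for FUTURES).
    static final Map<Long, String> PENDING = new ConcurrentHashMap<>();

    // Register a pending call and return the id that the request carries.
    static long send(String description) {
        long id = NEXT_ID.getAndIncrement();
        PENDING.put(id, description);
        return id;
    }

    // A response carries the same id, so one map operation finds its call.
    // Returns null when the entry is already gone, for example after a timeout.
    static String receive(long responseId) {
        return PENDING.remove(responseId);
    }

    public static void main(String[] args) {
        long id = send("sayHello");
        System.out.println(receive(id)); // the first response matches the pending call
        System.out.println(receive(id)); // a second response with the same id finds nothing
    }
}
```

Because the lookup removes the entry, whichever response arrives second for a given id finds nothing, which is exactly the situation the rest of this post walks through.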


// Inner class of DefaultFuture
private static class RemotingInvocationTimeoutScan implements Runnable {
    public void run() {
        while (true) {
            try {
                for (DefaultFuture future : FUTURES.values()) {
                    if (future == null || future.isDone()) {
                        continue;
                    }
                    if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                        // the consumer builds a timeout response itself
                        // create exception response.
                        Response timeoutResponse = new Response(future.getId());
                        // set timeout status.
                        timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                        timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                        // handle response.
                        DefaultFuture.received(future.getChannel(), timeoutResponse);
                    }
                }
                Thread.sleep(30);
            } catch (Throwable e) {
                logger.error("Exception when scan the timeout invocation of remoting.", e);
            }
        }
    }
}

// DefaultFuture class
public static void received(Channel channel, Response response) {
    try {
        // remove the future object first
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            // wake up the waiting thread (there may or may not be one)
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
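The scan logic above can be tried out in isolation with a stripped-down sketch. Everything here is an assumption for illustration: `TimeoutScanSketch`, `Pending`, `Status`, and `scanOnce` are hypothetical names, the clock is passed in as a parameter so the behaviour is deterministic, and a single pass replaces the endless loop with its 30 ms sleep.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the timeout scan; not Dubbo's classes.
class TimeoutScanSketch {
    enum Status { OK, TIMEOUT }

    static final class Pending {
        final long id;
        final long startMillis;
        final long timeoutMillis;
        volatile Status status; // null until a real or synthetic response is set

        Pending(long id, long startMillis, long timeoutMillis) {
            this.id = id;
            this.startMillis = startMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isDone() {
            return status != null;
        }
    }

    static final Map<Long, Pending> FUTURES = new ConcurrentHashMap<>();

    // Shared entry point for real and synthetic responses: remove first, then complete.
    // Returns false when nothing is pending for the id, i.e. the response is late.
    static boolean received(long id, Status status) {
        Pending pending = FUTURES.remove(id);
        if (pending == null) {
            return false;
        }
        pending.status = status;
        return true;
    }

    // One pass of the scan; returns how many pending calls were expired.
    static int scanOnce(long nowMillis) {
        int expired = 0;
        for (Pending pending : FUTURES.values()) {
            if (pending.isDone()) {
                continue;
            }
            if (nowMillis - pending.startMillis > pending.timeoutMillis) {
                // Build the timeout "response" locally and feed it through received().
                if (received(pending.id, Status.TIMEOUT)) {
                    expired++;
                }
            }
        }
        return expired;
    }
}
```

Note that the comparison is strictly greater than, as in the excerpt, so a call whose elapsed time equals its timeout survives until a later pass.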

2. The consumer throws an exception because of the timeout

// DefaultFuture
public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {
                // woken up by the DubboResponseTimeoutScanTimer thread; a timeout response has been set, so isDone() returns true
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    // isDone() returns true, so we enter returnFromResponse
    return returnFromResponse();
}

private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        // the exception is thrown here
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}
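The waiting side has two ways to end in a timeout: its own wait runs out, or another thread hands it a timeout response. The sketch below shows both paths with a lock and a condition. It is an illustration under assumptions, not Dubbo's implementation: `WaitingFutureSketch`, `complete`, and `SketchTimeoutException` are hypothetical, and unlike the excerpt it waits on the remaining time with `awaitNanos` instead of re-waiting the full timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the waiting side; names are illustrative only.
class WaitingFutureSketch {
    enum Status { OK, TIMEOUT }

    static final class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(String message) {
            super(message);
        }
    }

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Status status; // null until some thread completes the future
    private volatile Object result;

    boolean isDone() {
        return status != null;
    }

    // Called by whichever thread delivers the outcome: a real response or a synthetic timeout.
    void complete(Status newStatus, Object value) {
        lock.lock();
        try {
            result = value;
            status = newStatus;
            done.signalAll(); // wake the caller if it is waiting
        } finally {
            lock.unlock();
        }
    }

    Object get(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                done.awaitNanos(remaining); // loop guards against spurious wakeups
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            // Path A: the caller's own wait ran out first.
            throw new SketchTimeoutException("no response before local deadline");
        }
        if (status == Status.TIMEOUT) {
            // Path B: another thread set a timeout response and woke us up.
            throw new SketchTimeoutException("timeout response");
        }
        return result;
    }
}
```

In the debugging session described in this post, the exception came from the equivalent of path B: the scan thread set the timeout response, so `isDone()` was already true when the caller woke up.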

3. When the late response arrives

// DefaultFuture
public static void received(Channel channel, Response response) {
    try {
        // DubboResponseTimeoutScanTimer has already removed the future for this id,
        // so remove() returns null and the else branch runs
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
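The reason a late response is harmless is that the future is removed from the map before anything else happens, and removal from a `ConcurrentHashMap` is atomic: if the scan thread and the I/O thread both call `received` for the same id, only one of them gets the future. Here is a small sketch of that property. `RemoveFirstSketch` and its `race` helper are hypothetical, and the two threads merely stand in for the scan thread and the I/O thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the remove-first pattern; not Dubbo's code.
class RemoveFirstSketch {
    static final Map<Long, Object> FUTURES = new ConcurrentHashMap<>();

    // Returns true if this caller claimed the pending entry, false if it arrived late.
    static boolean received(long id) {
        return FUTURES.remove(id) != null;
    }

    // Races two callers on the same id and returns how many of them claimed the entry.
    static int race(long id) {
        FUTURES.put(id, new Object());
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Runnable caller = () -> {
            try {
                start.await(); // release both threads at the same moment
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (received(id)) {
                winners.incrementAndGet();
            }
        };
        Thread scanThread = new Thread(caller, "scan-thread");
        Thread ioThread = new Thread(caller, "io-thread");
        scanThread.start();
        ioThread.start();
        start.countDown();
        try {
            scanThread.join();
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(race(1L)); // exactly one caller claims the entry
    }
}
```

Whichever thread loses the race takes the equivalent of the else branch above and only logs a warning, so the waiting caller is completed exactly once.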

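When I analyzed the code statically, I assumed step 2 happened first and then step 1. In actual debugging, however, the order was step 1, then step 2.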
