Dubbo用管程实现异步转同步
Lock可以响应中断,支持超时,以及非阻塞获取锁
Condition实现了管程里面的条件变量
synchronized里的条件变量只有1个,而Lock和Condition的组合可以有多个条件变量
1.利用两个条件变量实现阻塞队列
阻塞队列,2个条件变量,1个是队列不空(空队列不允许出队),1个是队列不满(队列满不允许入队)
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
线程等待和通知是要调用await(),singal(),singAll()
语意与wait(),notify(),notifyAll()相同,但是wait(),notify(),notifyAll()只有在synchronized里才能使用。lock和condition里不能用。
2.同步与异步
调用方是否需要等待结果,需要等待就是同步
如果调用非不需要等待结果,就是异步
// 计算圆周率小说点后100万位
String pai1M() {
//省略代码无数
}
pai1M()
printf("hello world")
如果pai1M方法要执行2个礼拜,线程一直等着计算结果,2个结果后返回,就可以执行hello world了。这就是同步
如果执行pai1M之后,线程不用等待结果,就立刻执行hello world了,就属于异步了
同步是java默认的处理方式
如果要异步,怎么做?
- 调用方创建一个子线程,在子线程中执行方法调用,异步调用
- 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接return,这种方法我们成为异步方法。
3.Dubbo源码分析
TCP协议本身就是异步的,RPC调用时,TCP层面,发送完RPC请求后,线程不会等待RPC的响应结果的
可是平时工作中RPC调用大多数是同步的啊?怎么回事
很简单,就是做了异步转同步。RPC框架Dubbo就是做了异步转同步的事情。
sayHello方法就是同步方法,执行sayHello时候,线程会停下来等结果
DemoService service = 初始化部分省略
String message =
service.sayHello("dubbo");
System.out.println(message);
此时打印出线程栈信息,会看到线程的状态为TIMED_WAITING
本来发送请求是异步的,但是调用线程却阻塞了,dubbo做了异步转同步的事情
DefaultFuturn.get(),推断Dubbo异步转同步的功能通过DefaultFuture这个类实现的
在DubblInvoker的108行调用的DefaultFuture.get(),但是也是先调用了request(inv,timeout)方法,这个方法其实就是发送rpc请求,然后之后调用get方法返回rpc结果
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中108行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}
当RPC返回结果前,阻塞调用线程,让调用线程等待,当rpc返回结果后,唤醒调用线程,让调用线程重新执行
这也就是等待-通知机制啊。
接下来看看dubbo如何用等待通知机制,实现异步转同步操作
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
调用get()方法等待rpc结果时,看到的都是熟悉面孔,lock获取锁,finally释放锁,获取锁后通过循环调用await来实现等待
rpc结果返回时,调用doReceived方法,这个方法lock获取锁,finally释放锁,通过singal通知调用线程,然后await那收到,就不会继续等待了
dubbo的异步转同步就分析完了,
最近异步编程大火,有好多api都是异步的,例如公有云api本身是异步的,创建云主机的异步api,调用成功了,需要掉另一个api轮训主机状态。
项目内部封装创建云主机的api接口,面临异步转同步的问题。同步API更易用。