我们通过FutureTask.get()方法取得计算结果:如果调用get()的时候,计算还没结束,则当前线程阻塞;如果计算已经完成,则get()立即返回结果,如下图所示;如果计算过程出错、抛出异常,则get()不阻塞并且直接抛出异常(异常信息封装在这个异常的clause中);如果调用了cancel(),又调用get(),则抛出CancellationException。调用get()的时候,记得要捕捉异常。FutureTask在计算完成之后不能再次计算(复用),因此要想重新计算,需要调用runAndReset() 或者重新创建一个FutureTask。
![FutureTask的使用方法及实现原理](https://www.icode9.com/i/ll/?i=img_convert/8a002ed
【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 开源分享
4707e1678fcab9d8dbc9b5519.png)
FutureTask的实现
FutureTask的老版本实现是基于
AbstractQueuedSynchronizer的(见参考文章),这样会有一个缺点,当多个线程对同一个FutureTask执行cancel的时候,FutureTask会把中断状态保留起来,让调用方感到奇怪。所以,FutureTask的新版本实现修改为用state字段+CAS操作进行同步控制,再用简化版的Treiber栈保存等待线程。虽然FutureTask没有使用AQS,但是其实现原理非常类似于AQS。
1. 用一个volatile int state变量作为同步状态
AQS中也有state,但AQS不会使用在state中保存的内容,只是对它执行CAS操作,看是否成功而已。与AQS不同,FutureTask需要用到state变量中的内容。state所有可能的取值有7个,分别如下:
-
NEW = 0; 初始状态,FutureTask刚被创建,正在计算中都是该状态。
-
COMPLETING = 1; 中间状态,表示计算已完成正在对结果进行赋值,或正在处理异常
-
NORMAL = 2; 终止状态,表示计算已完成,结果已经被赋值。
-
EXCEPTIONAL = 3; 终止状态,表示计算过程已经被异常打断。
-
CANCELLED = 4; 终止状态,表示计算过程已经被cancel操作终止。
-
INTERRUPTING = 5; 中间状态,表示计算过程已开始并且被中断,正在修改状态。
-
INTERRUPTED = 6; 终止状态,表示计算过程已开始并且被中断,目前已完全停止。
在FutureTask中,state只会被set(), setException(),cancel()修改为终止状态(NORMAL, EXCEPTIONAL, CANCELED, INTERRUPTED)。要注意是,**这个state变量被声明为volatile,不仅保证了它自己的可见性,还保证了FutureTask类其他成员属性的可见性。**在其他变量的赋值操作应该在state变量的赋值操作之前,对其他变量的访问操作应该在对state变量的操作之后。例如,
public FutureTask(Callable callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
在FutureTask的构造函数中,对声明为volatile的state变量进行赋值操作后,callable变量也会变得对于其他线程可见。
2. 用一个Treiber栈保存等待线程
Treiber栈是一个无锁数据结构,FutureTask中的waiters变量指向这个栈的栈顶。名字有点吓人,其实很简单,就是一个无锁的线程安全的栈。入栈操作只通过一步CAS操作实现,即修改栈顶指针waiters;出栈和在栈的中间执行删除操作通过特定的循环操作实现。
3. 取消操作cancel()的实现
cancel操作首先判断state是否等于NEW,即判断计算线程(runner)是否已经开始。如果没有开始执行,则立即用CAS操作将状态改为CANCELED或INTERRUPTING,防止runner开始执行计算工作。判断了state之后,紧接着用compareAndSwap对state进行修改而不是覆盖式的赋值,这是考虑到在判断state和修改state之间,state可能被其他线程修改。顺便讲一下,state的判断后面紧跟CAS操作,可以让两个原子操作合并成一个原子操作,
state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
如果状态已经不是NEW,说明runner已经开始,此时根据入参mayInterruptIfRunning来决定是否给runner发中断。接下来,把Treiber栈中的等待线程全部唤醒并且移出。这里的唤醒动作考虑到可能有多个线程并发调用cancel(),所以设计了一个循环CAS操作,对Treiber栈操作竞争,保证只有一个线程能够进入内层循环,代码如下:
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;