FutureTask的使用方法及实现原理

FutureTask用法介绍

FutureTask是JDK并发包为Future接口提供的一个实现,代表一个支持取消操作(cancel)的异步计算任务。它实现了Future接口和Runnable接口,所以既是计算任务对象也是结果对象。它可以提交到线程池中去执行,并且结果直接放在自身这个FutureTask中,不是放在另外一个Future中。我们把FutureTask提交到线程池运行的方法一般是先实现一个Callable或Runnable,然后再把它传给FutureTask的对应构造函数从而创建一个FutureTask,然后把这个FutureTask提交给Executor.execution()。代码如下:

FutureTask<String> future = new FutureTask<String>(
 new Callable<String>() {
 public String call() {
 return searcher.search(target);
 }
 });
executor.execute(future);

我们通过FutureTask.get()方法取得计算结果:如果调用get()的时候,计算还没结束,则当前线程阻塞;如果计算已经完成,则get()立即返回结果,如下图所示;如果计算过程出错、抛出异常,则get()不阻塞并且直接抛出异常(异常信息封装在这个异常的clause中);如果调用了cancel(),又调用get(),则抛出CancellationException。调用get()的时候,记得要捕捉异常。FutureTask在计算完成之后不能再次计算(复用),因此要想重新计算,需要调用runAndReset() 或者重新创建一个FutureTask。

FutureTask的使用方法及实现原理

FutureTask的实现

FutureTask的老版本实现是基于
AbstractQueuedSynchronizer的(见参考文章),这样会有一个缺点,当多个线程对同一个FutureTask执行cancel的时候,FutureTask会把中断状态保留起来,让调用方感到奇怪。所以,FutureTask的新版本实现修改为用state字段+CAS操作进行同步控制,再用简化版的Treiber栈保存等待线程。虽然FutureTask没有使用AQS,但是其实现原理非常类似于AQS。

1. 用一个volatile int state变量作为同步状态

AQS中也有state,但AQS不会使用在state中保存的内容,只是对它执行CAS操作,看是否成功而已。与AQS不同,FutureTask需要用到state变量中的内容。state所有可能的取值有7个,分别如下:

  • NEW = 0; 初始状态,FutureTask刚被创建,正在计算中都是该状态。
  • COMPLETING = 1; 中间状态,表示计算已完成正在对结果进行赋值,或正在处理异常
  • NORMAL = 2; 终止状态,表示计算已完成,结果已经被赋值。
  • EXCEPTIONAL = 3; 终止状态,表示计算过程已经被异常打断。
  • CANCELLED = 4; 终止状态,表示计算过程已经被cancel操作终止。
  • INTERRUPTING = 5; 中间状态,表示计算过程已开始并且被中断,正在修改状态。
  • INTERRUPTED = 6; 终止状态,表示计算过程已开始并且被中断,目前已完全停止。

在FutureTask中,state只会被set(), setException(),cancel()修改为终止状态(NORMAL, EXCEPTIONAL, CANCELED, INTERRUPTED)。要注意是,这个state变量被声明为volatile,不仅保证了它自己的可见性,还保证了FutureTask类其他成员属性的可见性。在其他变量的赋值操作应该在state变量的赋值操作之前,对其他变量的访问操作应该在对state变量的操作之后。例如,

public FutureTask(Callable<V> callable) {
 if (callable == null)
 throw new NullPointerException();
 this.callable = callable;
 this.state = NEW; // ensure visibility of callable
 }

在FutureTask的构造函数中,对声明为volatile的state变量进行赋值操作后,callable变量也会变得对于其他线程可见

2. 用一个Treiber栈保存等待线程

Treiber栈是一个无锁数据结构,FutureTask中的waiters变量指向这个栈的栈顶。名字有点吓人,其实很简单,就是一个无锁的线程安全的栈。入栈操作只通过一步CAS操作实现,即修改栈顶指针waiters;出栈和在栈的中间执行删除操作通过特定的循环操作实现。

3. 取消操作cancel()的实现

cancel操作首先判断state是否等于NEW,即判断计算线程(runner)是否已经开始。如果没有开始执行,则立即用CAS操作将状态改为CANCELED或INTERRUPTING,防止runner开始执行计算工作。判断了state之后,紧接着用compareAndSwap对state进行修改而不是覆盖式的赋值,这是考虑到在判断state和修改state之间,state可能被其他线程修改。顺便讲一下,state的判断后面紧跟CAS操作,可以让两个原子操作合并成一个原子操作,

state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)

如果状态已经不是NEW,说明runner已经开始,此时根据入参mayInterruptIfRunning来决定是否给runner发中断。接下来,把Treiber栈中的等待线程全部唤醒并且移出。这里的唤醒动作考虑到可能有多个线程并发调用cancel(),所以设计了一个循环CAS操作,对Treiber栈操作竞争,保证只有一个线程能够进入内层循环,代码如下:

private void finishCompletion() {
 for (WaitNode q; (q = waiters) != null;) {
 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 for (;;) {
 Thread t = q.thread;
 if (t != null) {
 q.thread = null;
 LockSupport.unpark(t);
 }
 WaitNode next = q.next;
 if (next == null)
 break;
 q.next = null; // unlink to help gc
 q = next;
 }
 break;
 }
 }
 done();
 callable = null;
}

基于循环CAS操作对一个数据结构进行修改,是实现无锁数据结构的常用套路。

4. 等待线程中的等待操作get()的实现

等待线程在启动FutureTask执行,在必须访问其计算结果的时候调用get()。如果此时计算结果已经出来,则get()不会阻塞,直接把outcome变量中保存的记过返回;如果还未计算完成,则会执行一个等待循环。这个等待循环与AQS加锁操作中的循环类似,但有些不同,比如它管理一个等待线程。这个循环不断的检查中断状态和state:如果遇到中断则退出;如果state表示计算结束则退出;如果state=COMPLETING,则自旋;如果已经超时,则退出;其他情况阻塞。FutureTask.get()的阻塞操作就是由这个循环实现的,具体见get()和awaitDone()方法的代码。

5. 计算线程所调用的run()方法的实现

计算线程,例如线程池中的worker,通过调用FutureTask.run()开始执行计算工作。请参照FutureTask.run的代码,如下(建议对照代码阅读)。

public void run() {
 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,
 null, Thread.currentThread()))
 return;
 try {
 Callable<V> c = callable;
 if (c != null && state == NEW) {
 V result;
 boolean ran;
 try {
 result = c.call();
 ran = true;
 } catch (Throwable ex) {
 result = null;
 ran = false;
 setException(ex);
 }
 if (ran)
 set(result);
 }
 } finally {
 // runner must be non-null until state is settled to
 // prevent concurrent calls to run()
 runner = null;
 // state must be re-read after nulling runner to prevent
 // leaked interrupts
 int s = state;
 if (s >= INTERRUPTING)
 handlePossibleCancellationInterrupt(s);
 }
}

在真正计算之前,首先检查一下state,看有没有被cancel();然后用CAS操作对runner字段进行竞争性赋值,避免多个线程进入run()。如果以上两步失败则停止,否则执行Callable或Runnable中的run()。等run()计算完成后,先将状态改为一个中间状态(COMPLETING),然后对结果outcome赋值,再用一个延迟写入赋值修改state为终止状态NORMAL或EXCEPTIONAL。为什么要多此一举,先把状态改为COMPLETING然后又改为COMPLETED,不能直接改为COMPLETED吗?因为修改state和把结果写入outcome变量是两个步骤,修改state和修改outcome操作的中间可能有其他操作,所以定义一个中间状态COMPLETING,它告诉你结果已经计算好了,但是我们目前还不能从outcome读取,只有等它变成NORMAL之后,才能读取outcome。这是一个无锁设计。最后,对结果变量outcome赋值成功以后,唤醒所有等待线程。以上就是FutureTask的run()方法的实现。FutureTask.runAndReset与FutureTask.run大体上相似,区别是runAndReset从其他地方取得返回结果,计算结束后状态恢复成NEW从而让FutureTask可以重新执行。

原文链接:FutureTask的使用方法及实现原理 (toutiao.com)

上一篇:bzoj1132[POI2008]Tro 计算几何


下一篇:向peersim开火!P2P开火!(安装和样例运行)