并发编程(四)TaskFuture

并发编程(四)TaskFuture

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Object> future = executorService.submit(() -> {
TimeUnit.SECONDS.sleep(5);
return 5;
});
Object result = future.get();

ExecutorService 异步执行任务返回一个 Future,本节重点分析 Future 的 get 方法是如何拿到返回结果的呢?

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
} protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

下面我们重点分析 FutureTask 类

并发编程(四)TaskFuture

一、基本变量

(1) 核心成员变量

// 1. 执行的回调方法。如果是 Runnable 就通过 Executors#callable 包装成一个 Callable
private Callable<V> callable; // 2. 保存计算结果或者异常信息。non-volatile, protected by state reads/writes
private Object outcome; // 3. 执行 callable 的线程,run 方法中通过 CAS 保证原子性操作
private volatile Thread runner; // 4. 等待结果的线程队列,eg: 不同的线程同时调用 get()
// 这个队列使用 Treiber stack(可以理解为基于 CAS 的无锁的栈,先进后出)
private volatile WaitNode waiters;

(2) 状态变化

/*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

任务执行正常结束前,state 会被设置成 COMPLETING,代表任务即将完成,接下来很快就会被设置为 NARMAL 或者 EXCEPTIONAL,这取决于调用 Runnable 中的 call() 方法是否抛出了异常。有异常则后者,反之前者。

任务提交后、任务结束前取消任务,那么有可能变为 CANCELLED 或者 INTERRUPTED。在调用 cancel 方法时,如果传入 false 表示不中断线程,state 会被置为 CANCELLED,反之 state 先被变为 INTERRUPTING,后变为 INTERRUPTED。

总结下,FutureTask 的状态流转过程,可以出现以下四种情况:

  1. 任务正常执行并返回。 NEW -> COMPLETING -> NORMAL
  2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL
  3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED
  4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED 

补充:Unsafe

Unsafe 是 JDK 底层的类库,位于 sun.misc.Unsafe 中,在 java.util.concurrent 广泛使用。

private static final UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); // 更新 state 状态
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)

二、run

/**
* run 方法执行有两个条件:1. state=NEW; 2. runner=null
* 1. 执行前 state=NEW & runner=null
* 2. 执行中 state=NEW & runner=Thread.currentThread()
* 3. 执行后 state!=NEW & runner=null,根据是否有异常执行 set(result) 或 setException(ex),无论执行成功与否都会更新 state 状态
* 因此,多个线程同时调用 run 方法的情况 callable 也只会执行一次
*/
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// set 方法会回调钩子方法 done(),可能抛出异常
if (ran)
set(result);
}
} finally {
runner = null; // 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
} protected void set(V v) {
// 通过 CAS 状态来确认计算没有被取消,而且线程只执行了一次
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
} protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
} private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
// 必须将栈顶 CAS 为 null,否则重读栈顶并重试。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 遍历并唤醒栈中等待的线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 将 next 域置为 null,这样对 GC 友好
q.next = null;
q = next;
}
break;
}
} /*
* done 方法是暴露给子类的一个钩子方法。
* 这个方法在 ExecutorCompletionService.QueueingFuture 中的 override 实现是把结果加到阻塞队列里。
*/
done(); callable = null;
} private void handlePossibleCancellationInterrupt(int s) {
/*
* 等待调用 cancel(true) 的线程完成中断,防止中断操作逃逸出 run 或者 runAndReset 方法,影响后续操作
*
* 实际上,当前调用 cancel 方法的线程不一定能够中断到本线程。
* 有可能 cancel 方法里读到 runner 是 null,甚至有可能是其它并发调用 run/runAndReset 方法的线程。
* 但是也没办法判断另一个线程在 cancel 方法中读到的 runner 到底是什么,所以索性自旋让出 CPU 时间片也没事。
*/
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}

三、get

public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果线程已经执行完成直接返回
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
} /**
* 等待任务执行完毕,如果任务取消或者超时则停止
* @param timed 为 true 表示设置超时时间
* @param nanos 超时时间
* @return 任务完成时的状态
* @throws InterruptedException
*/
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
} int s = state;
// 1. callable 已执行完成,无论成功或失败直接返回执行结果
if (s > COMPLETING) {
// 已执行完,为了 GC 需要清 q.thread
if (q != null)
q.thread = null;
return s;
}
// 2. COMPLETING 是一个很短暂的状态,调用 Thread.yield 期望让出时间片,之后重试循环
else if (s == COMPLETING)
Thread.yield();
// 3. 初始化节点,重试一次循环
else if (q == null)
q = new WaitNode();
// 4. queued 记录是否已经入栈,此处准备将节点压栈
else if (!queued)
/*
* 这是 Treiber Stack 算法入栈的逻辑。
* Treiber Stack 是一个基于 CAS 的无锁并发栈实现
* 更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
*/
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 5. 如果有时限,判断是否超时,未超时则park剩下的时间。
else if (timed) {
nanos = deadline - System.nanoTime();
// 超时,移除栈中节点
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
} /**
* 清理用于保存等待线程栈里的无效节点,所谓节点无效就是内部的 thread 为 null(类比 ThreadLocalMap)
*
* 一般有以下几种情况:
* 1. 节点调用 get 超时。
* 2. 节点调用 get 中断。
* 3. 节点调用 get 拿到 task 的状态值(> COMPLETING)。
*
* 此方法干了两件事情:
* 1. 置标记参数 node 的 thread 为 null
* 2. 清理栈中的无效节点
*
* 如果在遍历过程中发现有竞争则重新遍历栈。
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
// pre -> current -> next,如果 current 无效就把 pre.next=next
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 1. 如果当前节点仍有效,则置 pred 为当前节点,继续遍历
if (q.thread != null)
pred = q; // 2. 当前节点已无效且有前驱,则将前驱的后继置为当前节点的后继实现删除节点。
// 如果前驱节点已无效,则重新遍历 waiters 栈。
else if (pred != null) {
pred.next = s;
if (pred.thread == null)
continue retry;
}
// 3. 当前节点已无效,且当前节点没有前驱,则将栈顶置为当前节点的后继。
// 失败的话重新遍历 waiters 栈。
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
} /**
* 导出结果。
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// 1. 正常执行完计算任务
if (s == NORMAL)
return (V)x;
// 2. 取消
if (s >= CANCELLED)
throw new CancellationException();
// 3. 执行计算任务时发生异常
throw new ExecutionException((Throwable)x);
}

四、cancal

/**
* mayInterruptIfRunning=false 时,不允许在线程运行时中断,设成 true 的话就允许但不保证一定会中断线程。
* 1. true 时,将状态修改成 INTERRUPTING,执行 thread.interrupt()
* 2. false 时,将状态修改成 CANCELLED
*/
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

参考:

  1. 《FutureTask 源码解读》:http://www.cnblogs.com/micrari/p/7374513.html

每天用心记录一点点。内容也许不重要,但习惯很重要!

上一篇:Java基础学习总结(57)——Jrebel插件热部署


下一篇:nginx grpc 试用