Future实现分析
还是先来小demo
@Test
public void testFuture() throws ExecutionException, InterruptedException {
FutureTask<String> test = new FutureTask<>(() -> {
logMessage("要睡了");
TimeUnit.SECONDS.sleep(5);
logMessage("睡醒了");
return "1";
});
Thread thread = new Thread(test);
thread.start();
logMessage(test.get());
}
public static void logMessage(Object o) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(simpleDateFormat.format(new Date()) + "-" + Thread.currentThread().getName() + ":" + o);
}
FutureTask解析
还是从他的继承关系来看,Future接口是什么?Runnable我知道,但是Future是什么?
/*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
* Future表示异步执行。提供了几个方法,用于,检查计算是否完成、等待计算完成以及获取计算结果。
* 主要记住,他就是用来用来执行异步任务的。并且提供了一些有用的api
*/
public interface Future<V> {
/*
1. 尝试取消任务的执行,如果这个任务还没有执行,直接取消,这个任务就不会再运行了
2. 如果任务已经完成,或者已经被取消了,或者说因为一些其他的原因导致不能取消,返回false
3. 如果任务已经开始运行,mayInterruptIfRunning参数表示是否要中断执行这个任务的线程来停止任务
在这个方法调用之后,后面调用isDone的方法就会返回true,如果这方法返回true,后序调用isCancelled方法也会返回true
*/
boolean cancel(boolean mayInterruptIfRunning);
/*
判断这个任务是否被取消掉
*/
boolean isCancelled();
/**
如果这个任务完成,或者在完成的时候被正常的终止,或者异常,或者取消,这些情况下,都会返回true
*/
boolean isDone();
/**
等待结果,一直等
*/
V get() throws InterruptedException, ExecutionException;
/**
等待结果,增加超时时间
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future表示异步获取结果,规定了一些方法,还得看具体的实现是怎么做的。所以,这里直接看FutureTask
看看属性
// 表示当前这个异步计算的状态,
private volatile int state;
// 下面都是一些不同状态的码值。
private static final int NEW = 0; //新建
private static final int COMPLETING = 1; // 完成中
private static final int NORMAL = 2; // 正常
private static final int EXCEPTIONAL = 3;// 异常
private static final int CANCELLED = 4;// 取消
private static final int INTERRUPTING = 5;// 中断中
private static final int INTERRUPTED = 6;// 中断
// 等会要运行的callable
private Callable<V> callable;
//结果存放的地方,并且不是volatile修饰的。是为了保护读和写的状态
// 在有结果的时候,这就是结果,如果异常的话,这就是异常。所以在Future里面异常的话,得通过get方法获取到
// 这让我想到了线程池,submit方法也会返回一个future,线程池异常异常了会怎么办?
private Object outcome;
// 运行的线程
private volatile Thread runner;
// 等待的节点队列,
private volatile WaitNode waiters;
这些属性下面的代码和文章里面会不间断的引用。
从构造方法开始
-
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
这种构造方法肯定是见过的最多的,只是一个赋值操作,将传递进来的callable赋值,并且将
state
变为NEW。 -
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
这我可没用过,之前也没在意过,这里确实很特殊。有一个resutl,并且将这个两个参数包装成
Executors.callable(runnable, result);
,那就看看这个方法。static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
运行原来的task,返回的是result,那是不是意味着,传递一个
Runnable
,对result一顿操作,最后又将这个result返回来。比如下面的这个样子@Test public void testFuture() throws ExecutionException, InterruptedException { HashMap<String, String> map = new HashMap<>(); FutureTask<Map> test = new FutureTask<>(() -> { logMessage("要睡了"); map.put("a","a"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } logMessage("睡醒了"); },map); Thread thread = new Thread(test); thread.start(); logMessage(test.get()); }
传递一个空map进去,然后一顿操作,返回返回。么的一点点的问题。
要知道,他还是继承了Runnable接口,并且传递给了Thread类,那么Runnable肯定要看run方法,并且Future肯定实在run方法里面做手脚的。
run方法分析
public void run() {
//如果当前的状态不是NEw或者替换cas将 runner变为当前线程失败,直接返回,不需要运行。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 拿到callable
Callable<V> c = callable;
// 准备开始运行了
if (c != null && state == NEW) {
// 接受结果
V result;
boolean ran;
try {
// 运行方法,
result = c.call();
ran = true;
} catch (Throwable ex) {
//如果出现了任何的异常,将result变为null,将ran变为false。
result = null;
ran = false;
//
setException(ex);
}
// 如果除了异常,就不会走到这个方法,
// 正常的话肯定是这个方法,这里肯定是设置结果给outcome。
if (ran)
set(result);
}
} finally {
// 这里用的就不是cas操作了,直接变为null,防止再次调用。
runner = null;
// 在runner变为null,之后,重新读取state的值。
// 这里为啥要再次读取,state是volatile修饰的,难道说这里是为了防止发生指令重排序。
int s = state;
// 如果说状态是 INTERRUPTING之前的状态,(new,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTING)只要不是中断。
if (s >= INTERRUPTING)
// 在看看这个方法是干了什么事情?
handlePossibleCancellationInterrupt(s);
}
}
setException (运行出现异常,设置值,并且更改state)
// 出现异常了之后,就调用这个方法,这个操作也很清晰了,
protected void setException(Throwable t) {
// 使用cas操作将当前的future的状态变为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常信息赋值给outcome
outcome = t;
//将state变为exception
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 再看看这个干了什么事情
finishCompletion();
}
}
handlePossibleCancellationInterrupt(如果state为INTERRUPTING,让出cpu的使用权。)
//这个方法的作用没有看懂。
// 看看源码上面的注释写的把
// 这个方法是为了确保调用cancel(true)时候产生的中断,只有在task run或者task runAndReset的时候才能投递。
private void handlePossibleCancellationInterrupt(int s) {
// 如果说当前future的状态是INTERRUPTING,就说明,当前的future正在中断中,那么直接就等等。
if (s == INTERRUPTING)
while (state == INTERRUPTING)
// 当前线程放弃执行,让出cpu的使用权。
Thread.yield();
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
set(任务执行成功。设置值,并且更改状态)
// 还是同样的操作,state先变为COMPLETING,最后变为NORMAL,设置值,之后调用finishCompletion方法,唤醒等待。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
finishCompletion(唤醒等待的节点,并且移除等待队列)
// 看看这个方法里面做了什么事情。
// 看了这个操作是拿到了等待的所有线程,并且唤醒移除他们,并且调用done方法。
private void finishCompletion() {
// assert state > COMPLETING;
// 拿到等待队列,从头开始,循环遍历,,unpark,,移除是从头开始的,
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 移除,help gc,
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//这个方法是留给拓展的
done();
callable = null; // to reduce footprint
}
总结:
Future里面本质还是一个Runnable,在run方法里面,会将当前线程设置为执行future的线程。调用callable的call方法,在出现异常的时候会将state先变为完成中,然后将异常设置为结果,之后将state变为异常。并且唤醒等待的线程(从头开始循环,唤醒),从等待队列里面移除掉(从头开始移除)。如果没有异常,state先是完成中,设置值,最后变为normal(正常)。
上面执行之后,在最后,将 runner 变为null,并且判断如果状态是中断中(INTERRUPTING)当前线程等待,等等。
问题:
在finally方法里面,为什么要再次判断获取state。
这个我着实没有看懂,是不是为了防止指令重排序。
在finally方法里面,为什么要在这里面将 runner 变为null
因为 finally 在方法的最后执行,所以,不管成功还是失败,这里都会将 runner 变为null。能保证任务执行一次之后,就将当前执行的线程赋值为null
handlePossibleCancellationInterrupt 方法是在什么样具体的场景下面才会出现。
get 方法分析
// get方法,如果state<= COMPLETING,说明还没有运行结束。就要等待,如果不是,就report
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone分析
// 这个方法就是将当前线程添加到等待队列里面去,注意,WaitNode是用 volatile修饰的。我猜肯定cas的操作。
// 需要注意的是,这个方法返回的state,真正获取值的操作是通过s的不同的值,在report方法里面获取的。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 死循环
for (;;) {
// 当前的线程是否中断了。interrupted的方法是属于Thread类的,并且这个方法会清楚掉thread intercept states的状态。
// 如果当前线程中断过,
if (Thread.interrupted()) {
// 移除掉,并且抛出异常
removeWaiter(q);
throw new InterruptedException();
}
// future已经完成了。直接返回s
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 就再等等,放弃cpu的执行权。
Thread.yield();
//如果q是null,这就是第一次循环吗,到这里说明他 stats <= COMPLETING,说明任务还没有执行或者才new出来,
// 构建等待节点,连接队列,入队,
// 第一次循环来,q肯定是null,所以构建等待队列
// 第一次循环就结束了,
// 第二次循环来的时候就不会q就!=null了
else if (q == null)
//如果说第一次循环构建了q,然后发送中断了,将他传递到removeWaiter方法里面,会造成无效的链表的查找。
q = new WaitNode();
// 第二次循环来了就开始入队了。
else if (!queued)
//这里采用的头插入法。
// 如果插入成功,queued就是true,并且第三次循环来的时候就不会走到这里了。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 在入队之后,如果有time,就unpark住当前线程,将当前线程阻塞在当前对象上面,
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//堵塞在当前对象上面
LockSupport.parkNanos(this, nanos);
}
else
// 一直堵塞,等待unpark。
LockSupport.park(this);
}
}
report方法分析
//通过传递进来的状态判断,如果当前的状态是 NORMAL,
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
// 如果取消,中断中,中断,抛异常
if (s >= CANCELLED)
throw new CancellationException();
// 如果不是,就抛出ExecutionException,在ExecutionException里面包装原始异常
throw new ExecutionException((Throwable)x);
}
removeWaiter分析
// 这个前提是当前当前的节点已经构建好了, 如果node为null那就确实没有意义
/**
这个前提是,当前线程已经中断了。
1. 一开始先把node的thread变为null。在awaitDone方法拿到WaitNode的节点的引用了,先把他变为null,这个WaitNode的thread肯定是volatile修饰的,所以,这里能直接变为null。
2. 一开始变为null,在后面的操作里面,才会循环遍历找出来,然后从等待队列里面移除。
3. 这里是单链表的删除操作,肯定有两个指针(前置节点和当前节点)
- 分为两种情况
1. 要删除的节点是头结点,如果要删除是头结点,前两个if都不会进去,直接到第四个,将q变为s,也就是将头结点变为头结点的下一个。
2. 如果不是头结点
那第一个if一直可以进去,一直找到了,就会进去第二个if,先直接删除,pred.next = s;(要删除的是p)。
在看看后面的判断,如果前置节点变为null,这意味着,此时此刻也有别的线程要删除pred节点,那当前的这个就直接放弃
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
总结
get方法,调用的时候,会通过 state 判断状态,如果 state完成,就会走到
report
方法,还是继续通过 state判断,如果是正常的结果,就直接返回,如果不是正常的,就抛出异常。没有完成,就会到awaitDone方法里面,在这个方法里面会构建等待节点,通过cas操作添加到等待队列里面,然后park住。在这个过程中,还会判断当前线程是否发生中断,如果发送中断,还会将当前线程从头结点中移除。
也就是说,在get里面一个线程到阻塞会循环最少会循环两次,一个是构建节点,一次是cas替换,之后才是堵塞,如果cas替换失败了,还会继续,所以,最少有两次尝试。
cancel方法分析
public boolean cancel(boolean mayInterruptIfRunning) {
// 不满足条件
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
//
if (mayInterruptIfRunning) {
try {
//调用运行future线程的中断方法
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
// 状态变为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 上面已经分析过了
finishCompletion();
}
return true;
}
关于Future的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。