FutureTask(未来任务)
一、前情回顾(重要)
首先我们先回顾一下多线程创建的方式
- 直接继承Thread方式
- 实现Runnable 方式
- 实现Callable方式
- 线程池方式
这四种方式主要分为两类:没返回值的(1,2) 有返回值的(3,4)
没返回值的相信已经烂熟于心了。这次我们讲讲有返回值的,下面先给出3,4的两种创建多线程的示例:
1.实现Callable方式
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建任务
FutureTask task = new FutureTask(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
return 200;
});
//创建线程
new Thread(task,"A").start();
//获得任务的返回值
System.out.println(task.get());
}
}
/**
执行结果:
A is running
200
**/
提问?
1.Thread为什么能接收FutureTask?
2.为什么线程能执行Callable实现的call方法?
2.线程池方式
//实现Runnable
class RunnableDemo implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " runnable is running");
}
}
//实现Callable
class CallableDemo implements Callable{
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName() + " callable is running");
return 200;
}
}
public class ThreadPoolTest {
public static void main(String[] args) throws Exception {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
//提交并获得任务
Future callableTask = threadPool.submit(new CallableDemo());
Future runnableTask = threadPool.submit(new RunnableDemo(), 400);
//根据任务获得返回值
System.out.println(callableTask.get());
System.out.println(runnableTask.get());
//关闭线程池
threadPool.shutdown();
}
}
/**
pool-1-thread-1 callable is running
200
pool-1-thread-2 runnable is running
400
**/
提问?
1. 为何线程池中能传入Callable 或 Runnable的实现?
2. submit的方法返回的Future类是什么?
3. 为何也都可以根据任务获取返回值,且Runnable也有返回值?
你在学习以上两种方式创建多线程的时候是否会有这些疑问呢,根据上面的几个提问,来一点点获取线索,相信自己最终可以破案。
3.获取线索
我们首先要找出 实现Callable方式
与 线程池方式
两者的共同点。
-
实现Callable方式
中new FutureTask()
-
线程池的方式
这里我们要先看一下submit()
方法做了什么?
public interface ExecutorService extends Executor{
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
不难发现,ExecutorService
是一个接口,那么就直接去看他的一个实现类
public abstract class AbstractExecutorService implements ExecutorService {
//提交Runnable实现 不带任务返回值
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
//提交Runnable实现 带任务返回值
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//提交Callable实现
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
有三个submit()
的重载方法,其中有个共同的方法就是newTaskFor()
将submit
的传参的值原封不动的传了进去,且返回了个 RunnableFuture
,那么直接来看这个newTaskFor()
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
惊奇的发现它内部也是 new FutureTask()
。
那么接下来就来到了我们今天的主题 解密 FutureTask
二、继承与实现
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
从FutureTask
的实现关系来看,它实际上是实现了 Runnable
和 Future
这两个接口
这里也就解释了之前的一个问题:
问:Thread为什么能接收FutureTask?
答:因为FutureTask实现了Runnable接口。
三、构造方法
//接收Callable实现的构造
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
//赋值 Callable的实现
this.callable = callable;
//将当前任务标记为 NEW(任务尚未执行)
this.state = NEW; // ensure visibility of callable
}
//接收Runnable实现的构造
public FutureTask(Runnable runnable, V result) {
//将Runnable包装成Callable
this.callable = Executors.callable(runnable, result);
//将当前任务标记为 NEW(任务尚未执行)
this.state = NEW; // ensure visibility of callable
}
接收Runnable
实现的构造中,使用了适配器模式 将Runnable
包装成Callable
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
//使用的是适配器模式将runnable转换为了 callable接口,外部线程 通过get获取当前任务执行结果时
//结果可能为null 也可以能为 传进来的result值
return new RunnableAdapter<T>(task, result);
}
//实现了Callable,并实现了
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
//将runnable的实现赋值给task
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
RunnableAdapter
实现了Callable
并在实现call()
方法中调用了Runnable
实现的run()
方法。这样就成功将Runnable
包装成了 Callable
四、成员变量及内部类
//当前任务的状态
private volatile int state;
//以下7个对应任务有哪些状态
//当前任务尚未执行
private static final int NEW = 0;
//当前任务正在结束,一种临界状态
private static final int COMPLETING = 1;
//当前任务正常结束,且有正常的返回值
private static final int NORMAL = 2;
//当前任务异常结束,且会向上抛异常
private static final int EXCEPTIONAL = 3;
//当前任务被取消
private static final int CANCELLED = 4;
//当前任务中断中...
private static final int INTERRUPTING = 5;
//当前任务已中断, 中断就是个标记,并不代表已经结束了。
private static final int INTERRUPTED = 6;
//构造方法中传进来的 runnable/callable,
//callable直接赋值
//runnable 经过 RunnableAdapter包装成Callable后赋值
private Callable<V> callable;
//存放任务的执行结果,有两种结果
//正常情况下,任务正常执行结束, outcome保存 callable的call方法的返回值
//异常情况下,callable的 call方法向上抛出异常,outcome保存异常
private Object outcome;
//当前任务被线程执行期间,保存当前执行任务的线程的对象引用,有且仅有一个线程。
private volatile Thread runner;
//因为会有很多线程来get当前任务的结果, 所以这里使用了链表,来存放这些线程。
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
五、成员方法
看下面的代码时,需要规定一个场景
场景 : 多线程并发
1.执行入口run()方法
为什么说run()是执行入口?
当我们启动线程时,最基本的就是
new Thread().start()
start()
内部实际上又调用了native start0()
方法
通过操作系统为我们创建一个线程,并调用run()
方法。
最终会执行target.run()
也就会调用Runnable
实现的run()
方法 (详情查看Thread
源码)由于
FutureTask
是Runnable
的实现,因此该run()
方法是执行入口。
public void run() {
/**
判断当前任务状态是否为未执行
state != NEW
线程池中的所有线程会争抢该task任务,
通过CAS控制并发,赋值runner属性,也就意味着只有一个线程能够抢到该任务去执行
!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())
**/
//该判断的目的主要是为了保证只能有一个线程来执行该任务。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//执行到这里,当前task一定是NEW状态,且只有一个当前线程抢占成功,runner属性是当前线程的引用。
try {
//callable就是程序员自己实现的callable/通过装饰后的runnable(RunnableAdapter)
Callable<V> c = callable;
//相当于是一个DCL
//条件一:c != null 防止空指针
//条件二:state==NEW 防止被其它线程Cancel了,所以再次确认下任务状态
if (c != null && state == NEW) {
//执行call的返回结果的引用
V result;
//true 表示callable.call 执行成功 未抛出异常
//false 表示callable.call 执行失败 抛出异常
boolean ran;
try {
//执行程序员实现的业务逻辑
result = c.call();
//若没抛异常,则设置为true
ran = true;
} catch (Throwable ex) {
//若抛异常,则设置为false,返回值也置为null
result = null;
ran = false;
//TODO 后面会说... (透露:会将异常信息设置到outcome中)
setException(ex);
}
//如果为true 代表执行成功,
if (ran)
//TODO 后面会说... (透露:会将成功返回值结果设置到outcome中)
set(result);
}
} finally {
//将执行当前任务的线程引用置为null, 表示该任务没有线程对其进行执行了
runner = null;
// 判断当前任务状态是不是中断状态
int s = state;
if (s >= INTERRUPTING)
//如果是中断状态,那么该任务会一直死循环在该处,直到由外部对其进行cancel 或者 unpark
handlePossibleCancellationInterrupt(s);
}
}
下面是 run()
中调用的方法
protected void set(V v) {
//使用CAS方式,设置当前任务状态完成中...
//有没有可能失败呢? 外部线程等不及了,直接在set执行CAS前,将task取消了
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//赋值正常返回值
outcome = v;
//结果赋值结束后,马上会将当前任务状态修改为 NORMAL状态 表示正结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//TODO 该方法后面再说...
finishCompletion();
}
}
protected void setException(Throwable t) {
//使用CAS方式,设置当前任务状态完成中...
//有没有可能失败呢? 外部线程等不及了,直接在setException执行CAS前,将task取消了
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//赋值异常信息
outcome = t;
//将当前任务状态修改为 EXCEPTIONAL状态 表示异常状态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//TODO 该方法后面再说...
finishCompletion();
}
}
private void handlePossibleCancellationInterrupt(int s) {
//若任务状态是中断中... 则一直在此死循环,且一直释放CPU,直到该状态不是中断中...
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
2.获取任务结果get()方法
get()方法有两种,一种是不带超时的,一种是带超时的。这里我们主要看不带超时的。
public V get() throws InterruptedException, ExecutionException {
//获得当前任务状态
int s = state;
//条件成立:当前任务状态为 NEW,COMPLETING 表示当前任务还未执行或正在完成中..
if (s <= COMPLETING)
//所有调用get()方法的线程 会被阻塞在此
//直到会得到结果(任务的状态返回值) 或者 向上抛出中断异常
s = awaitDone(false, 0L);
//返回 结果或者抛异常
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//timed = false
//deadline = 0L
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//引用当前线程 封装成WaitNode对象
WaitNode q = null;
//表示当前get()的线程有没有入队
boolean queued = false;
//自旋
for (;;) {
//条件成立: 当前执行get的线程被interrupt 叫醒
if (Thread.interrupted()) {
//将引用当前线程的WaitNode对象 移除队,且在移除的过程中同样会移除其它q=null的节点对象
removeWaiter(q);
//向上抛出中断的异常
throw new InterruptedException();
}
//获取当前任务的状态
int s = state;
//条件成立:说明当前任务已经有结果了,可能是好结果,也可以能使抛出异常的结果
if (s > COMPLETING) {
//条件成立:说明当前线程已经创建过WaitNode节点,
if (q != null)
//此时由于得到了结果,需要将q.thread = null
//helpGC 或者 当有线程执行removeWaiter方法时 会主动将该节点移除
q.thread = null
//直接返回当前任务的状态
return s;
}
//条件成立:说明当前任务正在完成中,还差赋值和设置成NORMAL或者EXCEPTIONAL两步
else if (s == COMPLETING) // cannot time out yet
//由于任务快要完成了,因此主动释放CPU,进行下一次CPU的抢占,目的是尽量让执行任务的线程去抢到CPU
Thread.yield();
//条件成立: 第一次自旋,当前任务是NEW状态,当前线程还未创建WaitNode对象
else if (q == null)
//为当前线程创建WaitNode对象
q = new WaitNode();
//条件成立:第二次自旋,当前任务是NEW状态,当前线程已经创建WaitNode对象,但是WaitNode对象还为入队
else if (!queued)
//头插法入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//条件成立:第三次自旋 当前任务是NEW状态,由于规定timed=false,则不会进入这个判断,会进入下面的else //也就意味着 如果是带超时的会走当前判断, 不带超时的会走下一个else判断
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//不带超时的
//当前get操作的线程就会被park,线程会进入waiting状态 休眠
//除非有其它线程将其unpark(thread) 唤醒 或者 将当前线程中断 才会进入下一次自旋
else
LockSupport.park(this);
}
}
返回给用户结果的方法,该结果可能是好结果,也可能是抛异常的结果
private V report(int s) throws ExecutionException
//outcome 保存的是Callable运行结束的结果,可能是正常的好结果,也可以能是异常的信息
Object x = outcome;
//条件成立:当前任务状态正常结束
if (s == NORMAL)
//直接返回好结果
return (V)x;
//条件成立:任务被取消 或 中断
if (s >= CANCELLED)
//抛出异常
throw new CancellationException();
//执行到这 说明任务状态为EXCEPTIONAL 程序员的Callable实现的代码有bug了
throw new ExecutionException((Throwable)x);
}
3.finishCompletion()方法
private void finishCompletion() {
// assert state > COMPLETING;
//循环Waiters链表
for (WaitNode q; (q = waiters) != null;) {
//使用CAS设置Waiters为null,是因为防止外部线程使用cancel()方法取消当前任务
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//获取当前Node节点封装的thread
Thread t = q.thread;
//条件成立:说明当前下次呢很难过不会为null
if (t != null) {
//help GC
q.thread = null;
//唤醒get()阻塞的的线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
//如果到链表末尾了就退出循环
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//可扩展的点
done();
//将callable 设置为null help GC
callable = null; // to reduce footprint
}
4.取消任务 cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {
//也就意味着 当前任务状态如果是NEW 不允许被cancel 将状态改为INTERRUPTING 或者CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
//执行到这就说明 当前任务是从非NEW状态 被改为了INTERRUPTING 或CANCELLED 的
//mayInterruptIfRunning
// true: INTERRUPTING false:CANCELLED
try {
//如果是true 也就是状态改成了INTERRUPTING
if (mayInterruptIfRunning) {
try {
//将执行当前任务的线程取出来
Thread t = runner;
//判断是否有线程执行该任务
if (t != null)
//有的话 则中断该线程
t.interrupt();
} finally { // final state
//最终将该任务的状态设置为 INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//然后唤醒所有执行get()阻塞的线程
finishCompletion();
}
return true;
}