文章目录
一、Callable与Runnable的区别
Callable和Runnable都是接口,而且都只有一个方法。它们都是用于多线程环境下。
Runnable只有一个run方法,实现类在run方法中定义线程需要完成的任务,它作为Thread构造函数的入参,线程启动后执行run方法。Runnable的run方法没有返回值,所以如果需要线程返回值的话,Runnable就无能为力了。Runnable的源码如下:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
为了解决这个问题,从JDK1.5开始,java提供了Callable接口,该接口里面只有一个call方法。Callable的源码如下:
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
作用与Runnable一样,Callable也是在实现类里面定义另一个线程需要执行的逻辑,然后启动线程执行。两个类的区别是,
- Callable的call方法可以有返回值,而且还可以抛出checked exception异常,而Runnable都不可以;
- Runnable可以作为Thread构造方法的入参,直接启动线程执行,而启动线程执行Callable,需要先将Callable转换为Runnable对象,然后才能执行,java为此提供了一个封装方法ThreadPoolExecutor.submit()来执行Callable。
二、Future接口
Future接口也是从JDK1.5才有的,它表示获取异步任务的执行结果。也就是当前线程启动了另一个线程A,同时线程A返回了一个Future对象,当前线程可以通过该Future对象的方法来判断线程A的任务是否执行完成,并且获取执行结果。
ThreadPoolExecutor.submit()方法的返回值就是Future对象,可以通过该对象获得Callable的执行结果。
public interface Future<V> {
//尝试取消任务的执行,如果任务已经完成,或者已经被取消,或者不允许取消,都会造成调用cancel方法失败;
//如果任务启动前取消了,那么该任务永远都不会被执行;
//如果任务已经开始,那么入参mayInterruptIfRunning将决定是否产生中断来停止任务,true表示产生中断,false表示不做处理;
//成功调用该方法后,方法isDone()将永远都返回true
boolean cancel(boolean mayInterruptIfRunning);
//检查当前任务是否已经取消,只有在任务完成之前取消才有效
boolean isCancelled();
//检查任务是否已经完成,若已完成,则返回true;
boolean isDone();
//通过该方法获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;
//与上面的方法作用类似,只不过该方法有一个超时时间,在指定的时间内没有结果,则抛出TimeoutException超时异常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
三、FutureTask详解
FutureTask表示一个可以取消的异步任务,其继承结构如下图:
FutureTask实现了Runnable和Future接口,它同时具有两个接口的功能,既代表了一个异步任务,而且还可以调用Future接口方法获取任务结果或者取消任务。
FutureTask还实现了RunnableFuture接口,它只有一个run方法,相当于合并了Runnable和Future接口。
下面详细介绍FutureTask中的方法。
1.构造方法
FutureTask提供了两个构造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
第二个构造方法将Runnable转换为Callable对象,并且该Callable对象的运行结果是入参result。
FutureTask提供了属性state来表示任务状态,state的定义如下:
private volatile int state;
该属性的取值如下:
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
从构造方法中可以看到创建FutureTask对象时,state初始化为NEW
,任务执行完成,状态切换为COMPLETING
,执行完成包括了正常执行结束和抛出异常结束,如果正常结束,接着将state切换为NORMAL
,如果抛出异常结束,状态切换为EXCEPTIONAL
;如果任务运行过程中调用了cancel(false)方法,状态可以从NEW
切换为CANCELLED
,如果调用的是cancel(true)方法,那么状态将从NEW
切换为INTERRUPTING
,之后发生线程中断,状态接着变为INTERRUPTED
。
上述状态中,NORMAL
、EXCEPTIONAL
、CANCELLED
、INTERRUPTED
都是FutureTask的终态,变为终态后,FutureTask的状态不会再发生变化。
上述7个状态可能的变换方式如下:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
2、run()
当启动异步线程后,异步线程执行run()方法。
public void run() {
//在任务执行结束之前,且没有调用cancel()方法,FutureTask的状态始终为NEW
//如果状态不为NEW,说明任务已经取消或者执行结束。
//FutureTask使用属性runner记录当前执行任务的线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用Callable实现类中的方法,也就是执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;//任务执行失败,记录状态为false
setException(ex);
}
if (ran)
//任务执行成功,记录执行结果,并修改FutureTask状态
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
//如果任务执行过程中,调用了cancel(true)方法,则进入下面这个方法
handlePossibleCancellationInterrupt(s);
}
}
//任务执行过程中,也就是调用Callable.call()方法时抛出异常,则进入setException()方法
protected void setException(Throwable t) {
//下面首先修改状态为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//outcome记录FutureTask的执行结果,如果抛出异常,则使用outcome记录异常对象
outcome = t;
//修改状态为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//如果有其他线程正在等待当前任务的执行结果,则唤醒这些线程
//在介绍get()方法时,再介绍该方法
finishCompletion();
}
}
//任务执行成功,进入set()方法
protected void set(V v) {
//修改状态为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//使用outcome记录任务的执行结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
//如果任务执行过程中调用过cancel(true)方法,则进入下面这个方法
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
//如果任务状态变为了INTERRUPTING,下面会发起线程中断,将状态变为INTERRUPTED,
//在状态变为INTERRUPTED之前,当前线程一直处于自旋等待状态
//这里自旋是为了等待线程中断的发生,中断可以认为是与调用者之间的一种通讯方式,
//这里要确保中断先发生,之后再是run()方法退出。
while (state == INTERRUPTING)//自旋等待
Thread.yield();
}
这里总结一下run()方法:
- 检查FutureTask的状态是否是
NEW
; - 调用Callable的call()方法;
- 如果call()发生异常,使用outcome属性记录异常对象,并设置状态为
EXCEPTIONAL
,同时唤醒等待任务结果的线程; - 如果call()正常结束,得到任务结果,使用outcome属性记录任务结果,并设置状态为
NORMAL
,同时唤醒等待任务结果的线程; - 如果执行call()过程中,调用了cancel(true)方法,则自旋等待,直到状态变为
INTERRUPTED
。
3、get()
FutureTask对get方法提供了重载,一个不带参数,一个带有超时时间。不带参数的,如果任务还没执行完成,则阻塞当前线程直到任务执行结束;带有超时时间的,如果到了指定时间任务还没执行完成,则抛出TimeoutException异常。
下面先来看一下不带参数的get():
public V get() throws InterruptedException, ExecutionException {
int s = state;
//小于COMPLETING,表示任务还在执行过程中或者还没开始指定
if (s <= COMPLETING)
s = awaitDone(false, 0L);//awaitDone用于阻塞当前线程
//report()可以根据任务状态,返回不同的结果对象
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//检查当前等待任务结果的线程是否发生中断
if (Thread.interrupted()) {
removeWaiter(q);//将当前节点和所有其他已经发生中断的节点从等待队列中删除
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
//如果任务已经执行结束,或者调用了cancel(),则直接返回
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
//如果正在记录任务运行结果,则让出CPU时间片
Thread.yield();
else if (q == null)
//如果任务正在运行中或者还没开始运行,则创建一个WaitNode对象
//之后进入下一个循环,将该对象入等待队列
q = new WaitNode();
else if (!queued)
//将代表当前等待线程的WaitNode对象加入队列中,放入队列头
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//代表当前等待线程的WaitNode对象加入队列后,检查是否指定了超时时间,
//如果指定了,则阻塞当前线程指定的时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);//如果已经超时,则将当前节点从等待队列中删除
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
//如果没有指定超时时间,则阻塞当前线程
LockSupport.park(this);
}
}
//如果当前任务状态是NORMAL,表示任务正常结束,返回任务结果
//如果当前任务开始执行前或者执行过程中,调用了cacel()方法,则抛出CancellationException异常
//如果任务执行过程中抛出了异常,则使用ExecutionException封装异常,并抛出
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
不带参数的get()方法首先检查任务状态,如果还在运行中或者还没开始,则进入awaitDone()方法。
awaitDone()里面再次检查任务状态,如果任务还在运行中或者还没开始,则创建一个WaitNode对象,WaitNode对象代表了一个正在等待结果的线程,下面是该类的定义:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
这样每一个需要等待的线程都创建一个WaitNode对象,然后使用WaitNode的next属性将这些对象连接起来,形成一个单向队列,然后使用FutureTask的waiters属性记录该队列的头。入队之后,就可以调用LockSupport.park()阻塞线程了。
有了这个单向队列,当任务执行完后,就可以通过队列找到所有的等待线程了,之后依次唤醒这些等待线程。还记得介绍run()方法时,有一个finishCompletion()方法,这个方法的作用就是唤醒队列里面的等待线程:
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒等待线程
}
WaitNode next = q.next;//找到队列的下一个节点
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();//空方法,子类可以加入一些逻辑
callable = null; // to reduce footprint
}
4、cancel()
cancel()可以取消任务或者发起线程中断。该方法的入参不同,产生的效果也不同:
- 如果入参是false,则修改任务状态为
CANCELLED
,如果这时任务还没开始,则该任务永远都不会再执行,如果已经开始执行了,调用cancel(false)并不会阻塞任务的执行。 - 如果入参是true,则修改任务状态为
INTERRUPTING
,并对正在执行任务的线程发起中断,之后将任务状态修改为INTERRUPTED
。
无论入参是true还是false,在方法的最后都会调用finishCompletion()方法唤醒所有的等待线程。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();//如果任务已经开始执行,则发起中断
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();//唤醒所有的等待线程
}
return true;
}
最后看一下ThreadPoolExecutor.submit()方法的原理。
ThreadPoolExecutor.submit()可以接受Callable或者Runnable作为入参,无论入参是Callable还是Runnable,submit()都会将入参转换为FutureTask对象,然后启动线程池中的线程执行其run()方法。submit()还会将该FutureTask对象返回给调用方,这样调用方可以通过该对象对任务的运行进行控制,并且获得任务结果。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}