java8 FutureTask、Future、Callable、Runnable区别总结

文章目录

一、Callable与Runnable的区别

Callable和Runnable都是接口,而且都只有一个方法。它们都是用于多线程环境下。
Runnable只有一个run方法,实现类在run方法中定义线程需要完成的任务,它作为Thread构造函数的入参,线程启动后执行run方法。Runnable的run方法没有返回值,所以如果需要线程返回值的话,Runnable就无能为力了。Runnable的源码如下:

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

为了解决这个问题,从JDK1.5开始,java提供了Callable接口,该接口里面只有一个call方法。Callable的源码如下:

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

作用与Runnable一样,Callable也是在实现类里面定义另一个线程需要执行的逻辑,然后启动线程执行。两个类的区别是,

  1. Callable的call方法可以有返回值,而且还可以抛出checked exception异常,而Runnable都不可以;
  2. Runnable可以作为Thread构造方法的入参,直接启动线程执行,而启动线程执行Callable,需要先将Callable转换为Runnable对象,然后才能执行,java为此提供了一个封装方法ThreadPoolExecutor.submit()来执行Callable。

二、Future接口

Future接口也是从JDK1.5才有的,它表示获取异步任务的执行结果。也就是当前线程启动了另一个线程A,同时线程A返回了一个Future对象,当前线程可以通过该Future对象的方法来判断线程A的任务是否执行完成,并且获取执行结果。
ThreadPoolExecutor.submit()方法的返回值就是Future对象,可以通过该对象获得Callable的执行结果。

public interface Future<V> {
	//尝试取消任务的执行,如果任务已经完成,或者已经被取消,或者不允许取消,都会造成调用cancel方法失败;
	//如果任务启动前取消了,那么该任务永远都不会被执行;
	//如果任务已经开始,那么入参mayInterruptIfRunning将决定是否产生中断来停止任务,true表示产生中断,false表示不做处理;
	//成功调用该方法后,方法isDone()将永远都返回true
    boolean cancel(boolean mayInterruptIfRunning);
	//检查当前任务是否已经取消,只有在任务完成之前取消才有效
    boolean isCancelled();
	//检查任务是否已经完成,若已完成,则返回true;
    boolean isDone();
	//通过该方法获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
    V get() throws InterruptedException, ExecutionException;
	//与上面的方法作用类似,只不过该方法有一个超时时间,在指定的时间内没有结果,则抛出TimeoutException超时异常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

三、FutureTask详解

FutureTask表示一个可以取消的异步任务,其继承结构如下图:
java8 FutureTask、Future、Callable、Runnable区别总结
FutureTask实现了Runnable和Future接口,它同时具有两个接口的功能,既代表了一个异步任务,而且还可以调用Future接口方法获取任务结果或者取消任务。
FutureTask还实现了RunnableFuture接口,它只有一个run方法,相当于合并了Runnable和Future接口。
下面详细介绍FutureTask中的方法。

1.构造方法

FutureTask提供了两个构造方法:

	public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
	public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

第二个构造方法将Runnable转换为Callable对象,并且该Callable对象的运行结果是入参result。
FutureTask提供了属性state来表示任务状态,state的定义如下:

    private volatile int state;

该属性的取值如下:

    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

从构造方法中可以看到创建FutureTask对象时,state初始化为NEW,任务执行完成,状态切换为COMPLETING,执行完成包括了正常执行结束和抛出异常结束,如果正常结束,接着将state切换为NORMAL,如果抛出异常结束,状态切换为EXCEPTIONAL;如果任务运行过程中调用了cancel(false)方法,状态可以从NEW切换为CANCELLED,如果调用的是cancel(true)方法,那么状态将从NEW切换为INTERRUPTING,之后发生线程中断,状态接着变为INTERRUPTED
上述状态中,NORMALEXCEPTIONALCANCELLEDINTERRUPTED都是FutureTask的终态,变为终态后,FutureTask的状态不会再发生变化。
上述7个状态可能的变换方式如下:

     NEW -> COMPLETING -> NORMAL
     NEW -> COMPLETING -> EXCEPTIONAL
     NEW -> CANCELLED
     NEW -> INTERRUPTING -> INTERRUPTED

2、run()

当启动异步线程后,异步线程执行run()方法。

    public void run() {
    	//在任务执行结束之前,且没有调用cancel()方法,FutureTask的状态始终为NEW
    	//如果状态不为NEW,说明任务已经取消或者执行结束。
    	//FutureTask使用属性runner记录当前执行任务的线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                	//调用Callable实现类中的方法,也就是执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;//任务执行失败,记录状态为false
                    setException(ex);
                }
                if (ran)
                	//任务执行成功,记录执行结果,并修改FutureTask状态
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
            	//如果任务执行过程中,调用了cancel(true)方法,则进入下面这个方法
                handlePossibleCancellationInterrupt(s);
        }
    }
    //任务执行过程中,也就是调用Callable.call()方法时抛出异常,则进入setException()方法
    protected void setException(Throwable t) {
    	//下面首先修改状态为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        	//outcome记录FutureTask的执行结果,如果抛出异常,则使用outcome记录异常对象
            outcome = t;
            //修改状态为EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //如果有其他线程正在等待当前任务的执行结果,则唤醒这些线程
            //在介绍get()方法时,再介绍该方法
            finishCompletion();
        }
    }
    //任务执行成功,进入set()方法
    protected void set(V v) {
    	//修改状态为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;//使用outcome记录任务的执行结果
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    //如果任务执行过程中调用过cancel(true)方法,则进入下面这个方法
    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
        	//如果任务状态变为了INTERRUPTING,下面会发起线程中断,将状态变为INTERRUPTED,
        	//在状态变为INTERRUPTED之前,当前线程一直处于自旋等待状态
        	//这里自旋是为了等待线程中断的发生,中断可以认为是与调用者之间的一种通讯方式,
        	//这里要确保中断先发生,之后再是run()方法退出。
            while (state == INTERRUPTING)//自旋等待
                Thread.yield(); 
    }

这里总结一下run()方法:

  1. 检查FutureTask的状态是否是NEW
  2. 调用Callable的call()方法;
  3. 如果call()发生异常,使用outcome属性记录异常对象,并设置状态为EXCEPTIONAL,同时唤醒等待任务结果的线程;
  4. 如果call()正常结束,得到任务结果,使用outcome属性记录任务结果,并设置状态为NORMAL,同时唤醒等待任务结果的线程;
  5. 如果执行call()过程中,调用了cancel(true)方法,则自旋等待,直到状态变为INTERRUPTED

3、get()

FutureTask对get方法提供了重载,一个不带参数,一个带有超时时间。不带参数的,如果任务还没执行完成,则阻塞当前线程直到任务执行结束;带有超时时间的,如果到了指定时间任务还没执行完成,则抛出TimeoutException异常。
下面先来看一下不带参数的get():

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //小于COMPLETING,表示任务还在执行过程中或者还没开始指定
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);//awaitDone用于阻塞当前线程
        //report()可以根据任务状态,返回不同的结果对象
        return report(s);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        	//检查当前等待任务结果的线程是否发生中断
            if (Thread.interrupted()) {
                removeWaiter(q);//将当前节点和所有其他已经发生中断的节点从等待队列中删除
                throw new InterruptedException();
            }
            int s = state;
            if (s > COMPLETING) {
            	//如果任务已经执行结束,或者调用了cancel(),则直接返回
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) 
            	//如果正在记录任务运行结果,则让出CPU时间片
                Thread.yield();
            else if (q == null)
            	//如果任务正在运行中或者还没开始运行,则创建一个WaitNode对象
            	//之后进入下一个循环,将该对象入等待队列
                q = new WaitNode();
            else if (!queued)
            	//将代表当前等待线程的WaitNode对象加入队列中,放入队列头
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
            	//代表当前等待线程的WaitNode对象加入队列后,检查是否指定了超时时间,
            	//如果指定了,则阻塞当前线程指定的时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);//如果已经超时,则将当前节点从等待队列中删除
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
            	//如果没有指定超时时间,则阻塞当前线程
                LockSupport.park(this);
        }
    }
    //如果当前任务状态是NORMAL,表示任务正常结束,返回任务结果
    //如果当前任务开始执行前或者执行过程中,调用了cacel()方法,则抛出CancellationException异常
    //如果任务执行过程中抛出了异常,则使用ExecutionException封装异常,并抛出
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

不带参数的get()方法首先检查任务状态,如果还在运行中或者还没开始,则进入awaitDone()方法。
awaitDone()里面再次检查任务状态,如果任务还在运行中或者还没开始,则创建一个WaitNode对象,WaitNode对象代表了一个正在等待结果的线程,下面是该类的定义:

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

这样每一个需要等待的线程都创建一个WaitNode对象,然后使用WaitNode的next属性将这些对象连接起来,形成一个单向队列,然后使用FutureTask的waiters属性记录该队列的头。入队之后,就可以调用LockSupport.park()阻塞线程了。
有了这个单向队列,当任务执行完后,就可以通过队列找到所有的等待线程了,之后依次唤醒这些等待线程。还记得介绍run()方法时,有一个finishCompletion()方法,这个方法的作用就是唤醒队列里面的等待线程:

    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒等待线程
                    }
                    WaitNode next = q.next;//找到队列的下一个节点
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();//空方法,子类可以加入一些逻辑
        callable = null;        // to reduce footprint
    }

4、cancel()

cancel()可以取消任务或者发起线程中断。该方法的入参不同,产生的效果也不同:

  • 如果入参是false,则修改任务状态为CANCELLED,如果这时任务还没开始,则该任务永远都不会再执行,如果已经开始执行了,调用cancel(false)并不会阻塞任务的执行。
  • 如果入参是true,则修改任务状态为INTERRUPTING,并对正在执行任务的线程发起中断,之后将任务状态修改为INTERRUPTED

无论入参是true还是false,在方法的最后都会调用finishCompletion()方法唤醒所有的等待线程。

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();//如果任务已经开始执行,则发起中断
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();//唤醒所有的等待线程
        }
        return true;
    }

最后看一下ThreadPoolExecutor.submit()方法的原理。
ThreadPoolExecutor.submit()可以接受Callable或者Runnable作为入参,无论入参是Callable还是Runnable,submit()都会将入参转换为FutureTask对象,然后启动线程池中的线程执行其run()方法。submit()还会将该FutureTask对象返回给调用方,这样调用方可以通过该对象对任务的运行进行控制,并且获得任务结果。

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
上一篇:创建多线程的方式三:实现Callable接口。


下一篇:Callable接口实现多线程