通过java.util.concurrent.Callable实现类获取线程池中任务的执行结果浅析

提交任务到线程池中执行两种方式,一种是通过java.lang.Runnable的实现类提交,一种是通过java.util.concurrent.Callable<V>的实现类提交,这种是有返回值返回的,那返回值是如何实现的?

平常开发中,如果某个变量的值需要等待其他线程的计算结果,那我们会怎么做?一般的做法是通过在主线程定义一个变量,将这个变量传入其他线程中,把执行结果赋值到这个变量中,主线程等该线程执行完毕,就可以通过我们定义的变量获取到计算结果。

我想有返回值的任务应该是类似的实现方式,通过查看其实现逻辑后发现,也是类似的做法,是通过一个变量outcome来实现的。

    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes

下面简单整理下实现逻辑。

假设我们通过java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable<T>)提交任务,具体的实现逻辑在java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>),如代码片段所示。

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
  1. 判断任务是否为空,为空则抛出空指针异常,否则进行下一步
  2. 将任务包装成FutureTask对象,具体可以查看newTaskFor方法
  3. 将任务提交到线程池中执行,这是一个是异步方法,立即返回
  4. 返回Future对象

获取结果的时候,通过调用Future的get方法获取,实际运行中调用的是java.util.concurrent.FutureTask#get()方法,再通过java.util.concurrent.FutureTask#report方法获取outcome变量再返回。

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
   /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

大概流程如上面所示。另一个问题是线程的执行结果是如何赋值到outcome变量的?

通过java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable<T>)方法发现,任务通过execute这个方法执行,我用的是ThreadPoolExecutor这个具体的实现类,最终会调用这个类里的execute方法。

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这个是线程池中提交任务的逻辑,假设线程池能够执行这一任务,也就是addWorker方法被调用成功,我们提交的任务会被包装成Worker对象,当Worker对象的成员变量thread(通过worker对象构建的线程)被启动时,Worker对象的run方法最终会被执行

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

runWorker最终会调用java.util.concurrent.FutureTask#run方法,该方法里面通过java.util.concurrent.FutureTask#set将线程执行结果赋值给outcome变量,并改变当前的状态,后续get方法将会获取到线程的执行结果。

 

参考文档

上一篇:java – 如何确保提交给ThreadPoolExecutor然后取消的FutureTask的垃圾收集?


下一篇:谈谈 Callable 任务是怎么运行的?它的执行结果又是怎么获取的?