提交任务到线程池中执行两种方式,一种是通过java.lang.Runnable的实现类提交,一种是通过java.util.concurrent.Callable<V>的实现类提交,这种是有返回值返回的,那返回值是如何实现的?
平常开发中,如果某个变量的值需要等待其他线程的计算结果,那我们会怎么做?一般的做法是通过在主线程定义一个变量,将这个变量传入其他线程中,把执行结果赋值到这个变量中,主线程等该线程执行完毕,就可以通过我们定义的变量获取到计算结果。
我想有返回值的任务应该是类似的实现方式,通过查看其实现逻辑后发现,也是类似的做法,是通过一个变量outcome来实现的。
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
下面简单整理下实现逻辑。
假设我们通过java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable<T>)提交任务,具体的实现逻辑在java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>),如代码片段所示。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
- 判断任务是否为空,为空则抛出空指针异常,否则进行下一步
- 将任务包装成FutureTask对象,具体可以查看newTaskFor方法
- 将任务提交到线程池中执行,这是一个是异步方法,立即返回
- 返回Future对象
获取结果的时候,通过调用Future的get方法获取,实际运行中调用的是java.util.concurrent.FutureTask#get()方法,再通过java.util.concurrent.FutureTask#report方法获取outcome变量再返回。
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
大概流程如上面所示。另一个问题是线程的执行结果是如何赋值到outcome变量的?
通过java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable<T>)方法发现,任务通过execute这个方法执行,我用的是ThreadPoolExecutor这个具体的实现类,最终会调用这个类里的execute方法。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这个是线程池中提交任务的逻辑,假设线程池能够执行这一任务,也就是addWorker方法被调用成功,我们提交的任务会被包装成Worker对象,当Worker对象的成员变量thread(通过worker对象构建的线程)被启动时,Worker对象的run方法最终会被执行
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
runWorker最终会调用java.util.concurrent.FutureTask#run方法,该方法里面通过java.util.concurrent.FutureTask#set将线程执行结果赋值给outcome变量,并改变当前的状态,后续get方法将会获取到线程的执行结果。
参考文档