Executors
The Executors utility class in java.util.concurrent provides the following factory methods for scheduled executors:
/**
 * Creates a single-threaded executor that can schedule commands
 * to run after a given delay, or to execute periodically.
 * (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.) Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newScheduledThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created scheduled executor
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

/**
 * Creates a single-threaded executor that can schedule commands
 * to run after a given delay, or to execute periodically. (Note
 * however that if this single thread terminates due to a failure
 * during execution prior to shutdown, a new one will take its
 * place if needed to execute subsequent tasks.) Tasks are
 * guaranteed to execute sequentially, and no more than one task
 * will be active at any given time. Unlike the otherwise
 * equivalent {@code newScheduledThreadPool(1, threadFactory)}
 * the returned executor is guaranteed not to be reconfigurable to
 * use additional threads.
 *
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created scheduled executor
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
Analysis:
The first two methods in the snippet above each construct a DelegatedScheduledExecutorService instance, which is defined as follows:
/**
 * A wrapper class that exposes only the ScheduledExecutorService
 * methods of a ScheduledExecutorService implementation.
 */
private static class DelegatedScheduledExecutorService
        extends DelegatedExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService e;

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        e = executor;
    }

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return e.schedule(command, delay, unit);
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return e.schedule(callable, delay, unit);
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}
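Analysis:
As its definition shows, DelegatedScheduledExecutorService forwards every scheduling call to the concrete ScheduledExecutorService it wraps, namely the ScheduledThreadPoolExecutor instance constructed in the factory methods above.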
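One practical consequence of the wrapper is that callers cannot downcast the single-threaded executor to ScheduledThreadPoolExecutor and resize it. The following is a minimal sketch of that difference; the class name WrapperDemo and the helper isReconfigurable are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class WrapperDemo {
    // True if a caller could downcast the executor and reach tuning
    // methods such as setCorePoolSize. (Hypothetical helper.)
    static boolean isReconfigurable(ScheduledExecutorService executor) {
        return executor instanceof ScheduledThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ScheduledExecutorService wrapped = Executors.newSingleThreadScheduledExecutor();
        ScheduledExecutorService plain = Executors.newScheduledThreadPool(1);
        try {
            // The wrapper hides the underlying ScheduledThreadPoolExecutor.
            System.out.println("newSingleThreadScheduledExecutor: " + isReconfigurable(wrapped));
            // The plain pool is returned directly and can be downcast.
            System.out.println("newScheduledThreadPool(1): " + isReconfigurable(plain));
        } finally {
            wrapped.shutdown();
            plain.shutdown();
        }
    }
}
```

The first line should report false and the second true, which matches the Javadoc statement that the single-threaded variant is "guaranteed not to be reconfigurable".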
ScheduledExecutorService
The class diagram of ScheduledExecutorService is as follows:
ScheduledThreadPoolExecutor
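The class diagram of ScheduledThreadPoolExecutor is as follows:
Analysis:
As the diagram shows, ScheduledThreadPoolExecutor extends ThreadPoolExecutor.
DelayedWorkQueue in the diagram is the pool's work queue. Like DelayQueue, it is a delay queue; its declaration is as follows: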
/**
 * Specialized delay queue. To mesh with TPE declarations, this
 * class must be declared as a BlockingQueue<Runnable> even though
 * it can only hold RunnableScheduledFutures.
 */
static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
ScheduledFutureTask in the diagram is a task that can return a result. It extends FutureTask and is declared as follows:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
For details on ScheduledFutureTask, see the later sections of this article.
Constructors
The constructors of ScheduledThreadPoolExecutor are defined as follows:
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), handler);
}

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 * {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
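Analysis:
These constructors differ only in whether a ThreadFactory and a RejectedExecutionHandler are specified; all of them ultimately call a constructor of the parent class ThreadPoolExecutor to create the thread pool.
The schedule method
The submit and execute methods of ScheduledThreadPoolExecutor all call schedule, as the code below shows: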
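The values passed to the parent constructor can be observed through the public getters inherited from ThreadPoolExecutor. The sketch below is illustrative only; ConstructorDemo and poolSizes are hypothetical names.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class ConstructorDemo {
    // Returns {corePoolSize, maximumPoolSize} of a freshly built pool.
    // (Hypothetical helper for illustration.)
    static int[] poolSizes(int corePoolSize) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(corePoolSize);
        try {
            return new int[] { pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes(2);
        System.out.println("core = " + sizes[0]);
        // The constructors above always pass Integer.MAX_VALUE as the maximum.
        System.out.println("max is Integer.MAX_VALUE: " + (sizes[1] == Integer.MAX_VALUE));
    }
}
```

Because DelayedWorkQueue is unbounded, the pool does not grow beyond corePoolSize threads in practice, so the large maximum has no useful effect.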
/**
 * Executes {@code command} with zero required delay.
 * This has effect equivalent to
 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 * Note that inspections of the queue and of the list returned by
 * {@code shutdownNow} will access the zero-delayed
 * {@link ScheduledFuture}, not the {@code command} itself.
 *
 * <p>A consequence of the use of {@code ScheduledFuture} objects is
 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 * called with a null second {@code Throwable} argument, even if the
 * {@code command} terminated abruptly. Instead, the {@code Throwable}
 * thrown by such a task can be obtained via {@link Future#get}.
 *
 * @throws RejectedExecutionException at discretion of
 * {@code RejectedExecutionHandler}, if the task
 * cannot be accepted for execution because the
 * executor has been shut down
 * @throws NullPointerException {@inheritDoc}
 */
public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}

// Override AbstractExecutorService methods

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    return schedule(task, 0, NANOSECONDS);
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}
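Analysis:
Note that none of these methods lets the caller specify a delay or a period; each task is scheduled with a delay of zero.
The schedule methods are defined as follows: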
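Since submit simply delegates to schedule with a zero delay, the Future it returns is in fact a ScheduledFuture. A small sketch to check this, assuming the default decorateTask is in use; SubmitDemo and its methods are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class SubmitDemo {
    // Submits a Callable and reports whether the returned Future
    // is actually a ScheduledFuture.
    static boolean submitReturnsScheduledFuture() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            Future<String> future = pool.submit(() -> "done");
            return future instanceof ScheduledFuture;
        } finally {
            pool.shutdown();
        }
    }

    // Submits a Callable and waits for its result; the task runs
    // as soon as a worker thread is free because its delay is zero.
    static String submitAndGet() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            return pool.submit(() -> "done").get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsScheduledFuture());
        System.out.println(submitAndGet());
    }
}
```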
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<Void> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit),
                                      sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit),
                                   sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}
Analysis:
A task submitted through schedule runs once after the given delay; it is not repeated.
scheduleAtFixedRate and scheduleWithFixedDelay
To run a task periodically, use one of the following two methods:
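The one-shot behaviour can be checked by counting executions. The sketch below schedules a task, waits for it to finish, lingers for a while, and then reports how often the task ran; OneShotDemo, runCount, and the chosen delays are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OneShotDemo {
    // Schedules a one-shot task, waits for it, lingers, and returns
    // the number of times the task ran.
    static int runCount(long delayMillis, long lingerMillis) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            // An explicit Runnable avoids overload ambiguity with schedule(Callable, ...).
            Runnable task = runs::incrementAndGet;
            ScheduledFuture<?> future = pool.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            future.get();               // blocks until the single run has completed
            Thread.sleep(lingerMillis); // give a hypothetical second run time to happen
            return runs.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runCount(50, 200));
    }
}
```

The count stays at 1 regardless of how long the caller lingers, because a ScheduledFutureTask with period 0 is never put back into the queue.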
/**
 * Submits a periodic action that becomes enabled first after the
 * given initial delay, and subsequently with the given period;
 * that is, executions will commence after
 * {@code initialDelay}, then {@code initialDelay + period}, then
 * {@code initialDelay + 2 * period}, and so on.
 *
 * <p>The sequence of task executions continues indefinitely until
 * one of the following exceptional completions occur:
 * <ul>
 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
 * via the returned future.
 * <li>Method {@link #shutdown} is called and the {@linkplain
 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
 * whether to continue after shutdown} is not set true, or method
 * {@link #shutdownNow} is called; also resulting in task
 * cancellation.
 * <li>An execution of the task throws an exception. In this case
 * calling {@link Future#get() get} on the returned future will throw
 * {@link ExecutionException}, holding the exception as its cause.
 * </ul>
 * Subsequent executions are suppressed. Subsequent calls to
 * {@link Future#isDone isDone()} on the returned future will
 * return {@code true}.
 *
 * <p>If any execution of this task takes longer than its period, then
 * subsequent executions may start late, but will not concurrently
 * execute.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 * @throws IllegalArgumentException {@inheritDoc}
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0L)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

/**
 * Submits a periodic action that becomes enabled first after the
 * given initial delay, and subsequently with the given delay
 * between the termination of one execution and the commencement of
 * the next.
 *
 * <p>The sequence of task executions continues indefinitely until
 * one of the following exceptional completions occur:
 * <ul>
 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
 * via the returned future.
 * <li>Method {@link #shutdown} is called and the {@linkplain
 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
 * whether to continue after shutdown} is not set true, or method
 * {@link #shutdownNow} is called; also resulting in task
 * cancellation.
 * <li>An execution of the task throws an exception. In this case
 * calling {@link Future#get() get} on the returned future will throw
 * {@link ExecutionException}, holding the exception as its cause.
 * </ul>
 * Subsequent executions are suppressed. Subsequent calls to
 * {@link Future#isDone isDone()} on the returned future will
 * return {@code true}.
 *
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 * @throws IllegalArgumentException {@inheritDoc}
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0L)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      -unit.toNanos(delay),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
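The Javadoc above states that an exception thrown by a periodic task suppresses all subsequent executions. This is an easy source of silently dying timers, so here is a small sketch that demonstrates it; FailingPeriodicDemo and runsBeforeSuppression are hypothetical names, and the 10 ms period and 100 ms wait are arbitrary.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FailingPeriodicDemo {
    // Schedules a periodic task that throws on every run and returns
    // how many times it actually executed.
    static int runsBeforeSuppression() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            Runnable task = () -> {
                runs.incrementAndGet();
                throw new IllegalStateException("boom");
            };
            ScheduledFuture<?> future =
                pool.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(); // returns only by throwing once the task has failed
            } catch (ExecutionException expected) {
                // expected.getCause() is the IllegalStateException thrown by the task
            }
            Thread.sleep(100); // ten periods pass, but no further run happens
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runsBeforeSuppression());
    }
}
```

A common defensive pattern is to catch and log exceptions inside the Runnable itself so that one failure does not end the whole schedule.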
ScheduledFutureTask
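Because ScheduledFutureTask is a delayed task, it implements long getDelay(TimeUnit unit) and int compareTo(Delayed other).
getDelay returns how much time remains before the task is due to run.
compareTo is used when an element is added to the delay queue: as the queue builds or adjusts its internal heap, it compares the element with the others so that the task due soonest sits at the head of the queue.
Constructors
The constructors of ScheduledFutureTask are as follows: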
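Both methods are reachable through the ScheduledFuture returned by schedule, so the ordering can be observed from outside. In the sketch below a 10-second task is scheduled before a 1-second task, yet the 1-second task compares as smaller and ends up at the head of the queue. DelayOrderDemo and soonestTaskIsAtHead are hypothetical names, and the sketch assumes the default decorateTask, which returns the queued task itself.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayOrderDemo {
    // Schedules a slow task first and a fast task second, then checks
    // that the fast one is ordered first and sits at the queue head.
    static boolean soonestTaskIsAtHead() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            Runnable noop = () -> { };
            ScheduledFuture<?> slow = pool.schedule(noop, 10, TimeUnit.SECONDS);
            ScheduledFuture<?> fast = pool.schedule(noop, 1, TimeUnit.SECONDS);

            boolean ordered = fast.compareTo(slow) < 0;                      // earlier trigger time wins
            boolean bounded = fast.getDelay(TimeUnit.MILLISECONDS) <= 1000;  // remaining delay only shrinks
            boolean atHead = pool.getQueue().peek() == fast;                 // heap root is the soonest task
            return ordered && bounded && atHead;
        } finally {
            pool.shutdownNow(); // discard the pending tasks
        }
    }

    public static void main(String[] args) {
        System.out.println(soonestTaskIsAtHead());
    }
}
```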
/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 */
ScheduledFutureTask(Runnable r, V result, long triggerTime,
                    long sequenceNumber) {
    super(r, result);
    this.time = triggerTime;
    this.period = 0;
    this.sequenceNumber = sequenceNumber;
}

/**
 * Creates a periodic action with given nanoTime-based initial
 * trigger time and period.
 */
ScheduledFutureTask(Runnable r, V result, long triggerTime,
                    long period, long sequenceNumber) {
    super(r, result);
    this.time = triggerTime;
    this.period = period;
    this.sequenceNumber = sequenceNumber;
}

/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 */
ScheduledFutureTask(Callable<V> callable, long triggerTime,
                    long sequenceNumber) {
    super(callable);
    this.time = triggerTime;
    this.period = 0;
    this.sequenceNumber = sequenceNumber;
}
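Analysis:
The first two constructors in this snippet call the FutureTask(Runnable, V) constructor of the parent class, which converts the Runnable argument into a Callable. The relevant FutureTask code is as follows: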
/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Callable}.
 *
 * @param callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
}

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Runnable}, and arrange that {@code get} will return the
 * given result on successful completion.
 *
 * @param runnable the runnable task
 * @param result the result to return on successful completion. If
 * you don't need a particular result, consider using
 * constructions of the form:
 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 * @throws NullPointerException if the runnable is null
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
}
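The conversion itself is done by Executors.callable(runnable, result), which is public and can be tried directly. A brief sketch, where CallableAdapterDemo and adaptAndCall are hypothetical names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class CallableAdapterDemo {
    // Wraps the Runnable in a Callable that runs it and then
    // returns the supplied result.
    static String adaptAndCall(Runnable runnable, String result) {
        Callable<String> callable = Executors.callable(runnable, result);
        try {
            return callable.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String value = adaptAndCall(() -> System.out.println("running"), "ok");
        System.out.println(value);
    }
}
```

The adapter runs the Runnable first and returns the fixed result afterwards, which is why the schedule(Runnable, ...) overloads pass null as the result and yield a ScheduledFuture whose get returns null.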
Task type
ScheduledFutureTask has a field named period that indicates the type of the task. It is defined as follows:
/**
 * Period for repeating tasks, in nanoseconds.
 * A positive value indicates fixed-rate execution.
 * A negative value indicates fixed-delay execution.
 * A value of 0 indicates a non-repeating (one-shot) task.
 */
private final long period;
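The sign of period decides how the next trigger time is derived. The sketch below is a simplified model of that rule, not the JDK code itself: it assumes that a fixed-rate task adds the period to its previous trigger time, while a fixed-delay task adds the delay to the time at which the run finished, and it leaves out the overflow handling the real implementation performs. NextRunTimeModel and nextRunTime are hypothetical names.

```java
public class NextRunTimeModel {
    // Simplified model: computes the next trigger time in nanoseconds.
    //   period > 0 : fixed rate, measured from the previous trigger time
    //   period < 0 : fixed delay, measured from "now" (end of the run)
    //   period == 0: one-shot, never rescheduled
    static long nextRunTime(long previousTriggerNanos, long nowNanos, long period) {
        if (period == 0) {
            throw new IllegalArgumentException("one-shot tasks are not rescheduled");
        }
        return period > 0
            ? previousTriggerNanos + period
            : nowNanos + (-period);
    }

    public static void main(String[] args) {
        // A task that was due at t=1000 and finished at t=1700.
        System.out.println(nextRunTime(1000, 1700, 500));   // fixed rate
        System.out.println(nextRunTime(1000, 1700, -500));  // fixed delay
    }
}
```

With these inputs the fixed-rate result (1500) already lies in the past, so the task would run again immediately, whereas the fixed-delay result (2200) always leaves the full delay after the previous run.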
The run method
The run method of ScheduledFutureTask is defined as follows:
/**
 * Overrides FutureTask version so as to reset/requeue if periodic.
 */
public void run() {
    if (!canRunInCurrentRunState(this))
        cancel(false);
    else if (!isPeriodic())
        super.run();
    else if (super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
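Because a periodic task goes through runAndReset rather than run, a successful execution does not complete the future; it stays pending until it is cancelled or fails. The following sketch observes this from the caller's side; PeriodicRunDemo and doneBeforeAndAfterCancel are hypothetical names, and the 10 ms period is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PeriodicRunDemo {
    // Returns {isDone after three successful runs, isDone after cancel}.
    static boolean[] doneBeforeAndAfterCancel() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            Runnable task = threeRuns::countDown;
            ScheduledFuture<?> future =
                pool.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);

            threeRuns.await();                            // the task has run at least three times
            boolean doneWhileRunning = future.isDone();   // still pending: each run was reset

            future.cancel(false);                         // ends the periodic sequence
            return new boolean[] { doneWhileRunning, future.isDone() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = doneBeforeAndAfterCancel();
        System.out.println("done while running: " + result[0]);
        System.out.println("done after cancel:  " + result[1]);
    }
}
```

For the same reason, calling get on the future of a periodic task blocks until the task is cancelled or throws.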
Summary: