【JUC源码解析】ScheduledThreadPoolExecutor

简介

它是一个线程池执行器(ThreadPoolExecutor),在给定的延迟(delay)后执行。在多线程或者对灵活性有要求的环境下,要优于java.util.Timer。

提交的任务在执行之前支持取消,默认情况下,在延迟到来之前,不会自动从队列中删除,但可以设置,使其立刻从队列中移除。

有两种模式,固定频率(scheduleAtFixedRate)和固定延迟(scheduleWithFixedDelay),不管哪种模式,同一个任务不会被叠加执行,即便是不同的线程执行同一个任务。

继承ThreadPoolExecutor,维护一个固定大小的线程池和一个*延迟队列(delay queue)。

ScheduledFutureTask,用来描述要执行的任务,DelayedWorkQueue,则是装在这些任务的delay queue.

固定频率

一个任务,从第一次开始执行的时间点开始,每隔一定的时间执行一次,如果执行的时间大于间隔时间,则要等这次执行结束,再执行下一次。

【JUC源码解析】ScheduledThreadPoolExecutor

如上图所示,蓝色表示任务执行,白色表示间隔时间。

固定延迟

一个任务,每一次执行结束之后,延迟一定的时间,执行下一次。

【JUC源码解析】ScheduledThreadPoolExecutor

如上图所示,蓝色表示任务执行,白色表示间隔时间。

源码分析

属性

     private volatile boolean continueExistingPeriodicTasksAfterShutdown; // shut down之后,是否取消period任务

     private volatile boolean executeExistingDelayedTasksAfterShutdown = true; // shut down之后,是否取消non-period任务

     private volatile boolean removeOnCancel = false; // cancel后,是否从队列里移除此任务

     private static final AtomicLong sequencer = new AtomicLong(); // 给任务编号

ScheduledFutureTask

属性

         private final long sequenceNumber; // 序列编号

         private long time; // 执行时间

         private final long period; // 周期,正值:固定频率;负值:固定延迟;0:不重复执行

         RunnableScheduledFuture<V> outerTask = this; // 实际任务

         int heapIndex; // 堆索引

构造方法

         ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0; // 不重复执行
this.sequenceNumber = sequencer.getAndIncrement();
} ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
} ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

关键方法

getDelay(TimeUnit)

         public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}

compareTo(Delayed other)

         public int compareTo(Delayed other) { // 根据延迟比较元素,在延迟队列中,延迟越小越靠前,延迟最小的在队首,最先出队被执行
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

setNextRunTime()

         private void setNextRunTime() { // 设置下一次执行时间
long p = period;
if (p > 0) // 固定频率,从第一次时间点,每次加period
time += p;
else
time = triggerTime(-p); // 固定延迟,每次执行结束后,加period作为下一次执行时间
}

cancel(boolean mayInterruptIfRunning)

         public boolean cancel(boolean mayInterruptIfRunning) { // 取消任务
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}

run()

         public void run() { // 执行任务
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run(); // 单次执行
else if (ScheduledFutureTask.super.runAndReset()) { // 周期执行,runAndReset
setNextRunTime(); // 设置下次执行时间
reExecutePeriodic(outerTask); // 重新加入队列
}
}

构造方法

     public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
} public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
} public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
} public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

关键方法

scheduleAtFixedRate

     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // 固定频率
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
unit.toNanos(period)); // period 大于 0
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

scheduleWithFixedDelay

     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 固定延迟
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
unit.toNanos(-delay)); // -delay 小于 0
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

delayedExecute(RunnableScheduledFuture<?> task)

     private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 如果线程池已经shut down,则拒绝任务
reject(task);
else {
super.getQueue().add(task); // 否则,添加任务到延迟队列
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false); // 根据run-after-shutdown参数,决定是否取任务
else
ensurePrestart(); // 保证线程启动
}
}

reExecutePeriodic(RunnableScheduledFuture<?> task)

     void reExecutePeriodic(RunnableScheduledFuture<?> task) { // 周期性任务重新入队,策略同delayedExecute
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

ensurePrestart()

     void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

该方法在ThreadPoolExecutor类,保证线程池中至少有一个活动线程。

triggerTime()

     long triggerTime(long delay) { // 返回延迟动作的触发时间
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
} private long overflowFree(long delay) { // 处理溢出情况
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}

DelayedWorkQueue

属性

         private static final int INITIAL_CAPACITY = 16; // 初始容量
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 堆,充当优先级队列
private final ReentrantLock lock = new ReentrantLock(); // 可重入锁
private int size = 0;
private Thread leader = null; // 领导者线程
private final Condition available = lock.newCondition(); // 条件队列

关键方法

以下这些方法的解释可参考前两篇文章,【JUC源码解析】DelayQueue【JUC源码解析】PriorityBlockingQueue

siftUp

         private void siftUp(int k, RunnableScheduledFuture<?> key) { // 向上调整,同
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

siftDown

         private void siftDown(int k, RunnableScheduledFuture<?> key) { // 向下调整
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}

grow

         private void grow() { // 扩容
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}

offer

         public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

take

         public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

行文至此结束。

尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_stpe.html

上一篇:java垃圾回收机制GC


下一篇:MFC之CToolTipCtrl按钮提示(消息捕获和消息传递)