ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService 

基于多线程,任务间不会互相影响

内部使用延迟队列,本身基于等待/唤醒机制

创建:

1、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)创建线程池,参数即池中保存的线程数,返回新创建的线程池

2、public static ScheduledExecutorService newSingleThreadScheduledExecutor()单线程,顺序执行任务,新增任务在队列中排队等待,看到名字应该也可以guess这个是和单线程池结合

方法

/**
 * 创建并执行在给定延迟后启用的一次性操作
 *
 * @param command 要执行的任务 
 * @param delay 从现在开始延迟执行的时间 
 * @param unit 延时参数的时间单位 
 * @return 表示任务等待完成,并且其的ScheduledFuture get()方法将返回 null完成后 , 调度之后可通过Future.get()阻塞直至任务执行完毕
 * @throws RejectedExecutionException 如果任务无法安排执行 
 * @throws NullPointerException 如果命令为空 
 */
1. public ScheduledFuture<?> schedule(Runnable command,
                                      long delay, TimeUnit unit);

/**
 * 创建并执行在给定延迟后启用的ScheduledFuture,只执行一次
 *
 * @param callable 执行的功能 
 * @param delay 从现在开始延迟执行的时间 
 * @param unit 延迟参数的时间单位 
 * @param <V> the 可调用结果的类型 
 * @return一个可用于提取结果或取消的ScheduledFuture:调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果
 * @throws RejectedExecutionException 如果该任务无法安排执行 
 * @throws NullPointerException 如果callable为空 
 */
2. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                          long delay, TimeUnit unit);

/**
 * **带延迟时间的调度,循环执行,固定频率**
 * 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在initialDelay后开始执行,然后在 
 * initialDelay+period后执行,接着在initialDelay + 2 * period后执行,依此类推,应该注意到了下次执行任务的时间与执行过程所花费时间无关。 
 * 如果任务的执行遇到异常,则后续的执行被抑制。 否则,任务将仅通过取消或终止执行人终止。
 * 如果任务执行时间比其周期长,则后续执行可能会迟到,但不会同时执行。 
 *
 * @param command 要执行的任务 
 * @param initialDelay 首次执行的延迟时间
 * @param period 连续执行之间的周期
 * @param unit initialDelay和period参数的时间单位 
 * @return 一个ScheduledFuture代表待完成的任务,其 get()方法将在取消时抛出异常 
 * @throws RejectedExecutionException 如果任务无法安排执行 
 * @throws NullPointerException 如果命令为空 
 * @throws IllegalArgumentException 如果period小于或等于零 
 */
3. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                 long initialDelay,
                                                 long period,
                                                 TimeUnit unit);

/**
 * **带延迟时间的调度,循环执行,固定延迟**
 * 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
*  如果任务的执行遇到异常,则后续的执行被抑制。 否则,任务将仅通过取消或终止执行人终止。 
 *
 * @param command 要执行的任务 
 * @param initialDelay 首次执行的延迟时间
 * @param delay 一次执行终止和下一次执行开始之间的延迟
 * @param unit initialDelay和delay参数的时间单位
 * @return 表示挂起任务完成的ScheduledFuture,并且其get()方法在取消后将抛出异常
 * @throws RejectedExecutionException 如果任务不能安排执行 
 * @throws NullPointerException 如果command为null
 * @throws IllegalArgumentException 如果delay小于等于0
 */
4. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                    long initialDelay,
                                                    long delay,
                                                    TimeUnit unit);

上面的源码已经说了,scheduleAtFixedRate是(任务执行的开始时间的)固定频率,scheduleWithFixedDelay是(任务执行的结束时间的)固定延迟

源码

线程池

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    /** ThreadPoolExecutor构造方法,阻塞队列DelayedWorkQueue **/
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

scheduleWithFixedDelay()

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    // 1. 入参校验,包括空指针、数字范围
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 2. 将Runnable包装成`RunnableScheduledFuture`
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 3. 延迟执行`RunnableScheduledFuture`
    delayedExecute(t);
    return t;
}

delayedExecute()

/**
* 延迟执行
**/
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 1. 线程池运行状态判断
    if (isShutdown())
        reject(task);
    else {
        // 2. 将任务添加到队列
        super.getQueue().add(task);
        // 3. 如果任务添加到队列之后,线程池状态变为非运行状态,
        // 需要将任务从队列移除,同时通过任务的`cancel()`方法来取消任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        // 4. 如果任务添加到队列之后,线程池状态是运行状态,需要提前启动线程
        else
            ensurePrestart();
    }
}

ensurePrestart()

/**
 * Same as prestartCoreThread except arranges that at least one
 * thread is started even if corePoolSize is 0.
 */
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // 1. 当前线程数未达到核心线程数,则创建核心线程
    if (wc < corePoolSize)
        addWorker(null, true);
    // 2. 当前线程数已达到核心线程数,则创建非核心线程,
    // 2.1 不会将任务放到阻塞队列中,这一点是和普通线程池是不相同的
    else if (wc == 0)
        addWorker(null, false);
}

 

注意

当执行时间大于指定的间隔,不会开辟新的线程并发执行任务,而是等待任务执行完毕再执行

异常会中终止所有任务的执行,要处理异常

 

https://www.jianshu.com/p/a4608ac35277

https://www.jianshu.com/p/aeb391e4edb0

https://blog.csdn.net/wangmx1993328/article/details/80840598

https://blog.csdn.net/tsyj810883979/article/details/8481621?utm_source=distribute.pc_relevant.none-task

ScheduledExecutorService

ScheduledExecutorServiceScheduledExecutorService 没有大海的星辰没有灵魂 发布了431 篇原创文章 · 获赞 155 · 访问量 44万+ 他的留言板 关注
上一篇:Spark中将RDD转换成DataFrame的两种方法


下一篇:SparkStreaming