Java线程池架构2-多线程调度器(ScheduledThreadPoolExecutor)

在前面介绍了java的多线程的基本原理信息:《Java线程池架构原理和源码解析(ThreadPoolExecutor)》,本文对这个java本身的线程池的调度器做一个简单扩展,如果还没读过上一篇文章,建议读一下,因为这是调度器的核心组件部分。

 

我们如果要用java默认的线程池来做调度器,一种选择就是Timer和TimerTask的结合,在以前的文章:《Timer与TimerTask的真正原理&使用介绍》中有明确的说明:一个Timer为一个单独的线程,虽然一个Timer可以调度多个TimerTask,但是对于一个Timer来讲是串行的,至于细节请参看对应的那篇文章的内容,本文介绍的多线程调度器,也就是定时任务,基于多线程调度完成,当然你可以为了完成多线程使用多个Timer,只是这些Timer的管理需要你来完成,不是一个框架体系,而ScheduleThreadPoolExecutor提供了这个功能,所以我们第一要搞清楚是如何使用调度器的,其次是需要知道它的内部原理是什么,也就是知其然,再知其所以然

首先如果我们要创建一个基于java本身的调度池通常的方法是:

Executors.newScheduledThreadPool(int);

当有重载方法,我们最常用的是这个就从这个,看下定义:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

其实内部是new了一个实例化对象出来,并传入大小,此时就跟踪到ScheduledThreadPoolExecutor的构造方法中:

public ScheduledThreadPoolExecutor(int corePoolSize) {

        super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS,

              new DelayedWorkQueue());

}

你会发现调用了super,而super你跟踪进去会发现,是ThreadPoolExecutor中,那么ScheduledThreadPoolExecutorThreadPoolExecutor有何区别,就是本文要说得重点了,首先我们留下个引子,你发现在定义队列的时候,不再是上文中提到的LinkedBlockingQueue,而是DelayedWorkQueue,那么细节上我们接下来就是要讲解的重点,既然他们又继承关系,其实搞懂了不同点,就搞懂了共同点,而且有这样的关系大多数应当是共同点,不同点的猜测:这个是要实现任务调度,任务调度不是立即的,需要延迟和定期做等情况,那么是如何实现的呢?


这就是我们需要思考的了,通过源码考察,我们发现,他们都有execute方法,只是ScheduledThreadPoolExecutor将源码进行了重写,并且还有以下四个调度器的方法:

public ScheduledFuture<?> schedule(Runnable command,
				       long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
					   long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
						  long initialDelay,
						  long period,
						  TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
						     long initialDelay,
						     long delay,
						     TimeUnit unit);

那么这四个方法有什么区别呢?其实第一个和第二个区别不大,一个是Runnable、一个是Callable,内部包装后是一样的效果;所以把头两个方法几乎当成一种调度,那么三种情况分别是:

1、 进行一次延迟调度:延迟delay这么长时间,单位为:TimeUnit传入的的一个基本单位,例如:TimeUnit.SECONDS属于提供好的枚举信息;(适合于方法1和方法2)。

2、 多次调度,每次依照上一次预计调度时间进行调度,例如:延迟2s开始,5s一次,那么就是2、7、12、17,如果中间由于某种原因导致线程不够用,没有得到调度机会,那么接下来计算的时间会优先计算进去,因为他的排序会被排在前面,有点类似Timer中的:scheduleAtFixedRate方法,只是这里是多线程的,它的方法名也叫:scheduleAtFixedRate,所以这个是比较好记忆的(适合方法3)

3、 多次调度,每次按照上一次实际执行的时间进行计算下一次时间,同上,如果在7秒没有被得到调度,而是第9s才得到调度,那么计算下一次调度时间就不是12秒,而是9+5=14s,如果再次延迟,就会延迟一个周期以上,也就会出现少调用的情况(适合于方法3);

4、 最后补充execute方法是一次调度,期望被立即调度,时间为空:

public void execute(Runnable command) {

       if (command == null)

           throw new NullPointerException();

       schedule(command, 0, TimeUnit.NANOSECONDS);

    }

 

我们简单看看scheduleAtFixedRate、scheduleWithFixedDelay对下面的分析会更加有用途:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime(initialDelay, unit),
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Boolean>(command,
                                             null,
                                             triggerTime(initialDelay, unit),
                                             unit.toNanos(-delay)));
        delayedExecute(t);
        return t;
}

你是否发现,两段源码唯一的区别就是在unit.toNanos(int)这唯一一个地方,scheduleAtFixedRate里面是直接传入值,而scheduleWithFixedDelay里面是取了相反数,也就是假如我们都传入正数,scheduleWithFixedDelay其实就取反了,没有任何区别,你是否联想到前面文章介绍Timer中类似的处理手段通过正负数区分时间间隔方法,为0代表仅仅调度一次,其实在这里同样是这样的,他们也同样有一个问题就是,如果你传递负数,方法的功能正好是相反的。

而你会发现,不论是那个schedule方法里头,都会创建一个ScheduledFutureTask类的实例,此类究竟是何方神圣呢,我们来看看。

ScheduledFutureTask的类(ScheduleThreadPoolExecutor的私有的内部类)来进行调度,那么可以看看内部做了什么操作,如下:

        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a periodic action with given nano time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger.
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }


最核心的几个参数正好对应了调度的延迟的构造方法,这些参数如何用起来的?那么它还提供了什么方法呢?

        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

        /**
         * 返回是否为片段,也就是多次调度
         *
         */
        public boolean isPeriodic() {
            return period != 0;
        }

这里发现了,他们可以运行,且判定时间的方法是getDelay方法我们知道了。

对比时间的方法是:compareTo,传入了参数类型为:Delayed类型,不难猜测出,ScheduledFutureTaskDelayed有某种继承关系,没错,ScheduledFutureTask实现了Delayed的接口,只是它是间接实现的;并且Delayed接口继承了Comparable接口,这个接口可用来干什么?看过我前面写的一篇文章关于中文和对象排序的应该知道,这个是用来自定义对比和排序的,我们的调度任务是一个对象,所以需要排序才行,接下来我们回溯到开始定义的代码中,找一个实际调用的代码来看看它是如何启动到run方法的?如何排序的?如何调用延迟的?就是我们下文中会提到的,而这里我们先提出问题,后文我们再来说明这些问题。


我们先来看下run方法的一些定义。

          /**
           * 时间片类型任务执行
           */
         private void runPeriodic() {
            //运行对应的程序,这个是具体的程序
            boolean ok = ScheduledFutureTask.super.runAndReset();
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)//规定时间间隔算出下一次时间
                    time += p;
                else//用当前时间算出下一次时间,负负得正
                    time = triggerTime(-p);
                //计算下一次时间,并资深再次放入等待队列中
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            else if (down)
                interruptIdleWorkers();
        }

        /**
         * 是否为逐片段执行,如果不是,则调用父亲类的run方法
         */
        public void run() {
            if (isPeriodic())//周期任务
                runPeriodic();
            else//只执行一次的任务
                ScheduledFutureTask.super.run();
        }

可以看到run方法首先通过isPeriod()判定是否为时间片,判定的依据就是我们说的时间片是否“不为零”,如果不是周期任务,就直接运行一次,如果是周期任务,则除了运行还会计算下一次执行的时间,并将其再次放入等待队列,这里对应到scheduleAtFixedRate、scheduleWithFixedDelay这两个方法一正一负,在这里得到判定,并且将为负数的取反回来,负负得正,java就是这么干的,呵呵,所以不要认为什么是不可能的,只要好用什么都是可以的,然后计算的时间一个是基于标准的time加上一个时间片,一个是根据当前时间计算一个时间片,在上文中我们已经明确说明了两者的区别。



以:schedule方法为例:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
	   			       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}




其实这个方法内部创建的就是一个我们刚才提到的:ScheduledFutureTask,外面又包装了下叫做RunnableScheduledFuture,也就是适配了下而已,呵呵,代码里面就是一个return操作,java这样做的目的是方便子类去扩展。

 

关键是delayedExecute(t)方法中做了什么?看名称是延迟执行的意思,难道java的线程可以延迟执行,那所有的任务线程都在运行状态?

 

它的源码是这样的:

    private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();

        super.getQueue().add(command);
    }

我们主要关心prestartCoreThread()和super.getQueue().add(command),因为如果系统关闭,这些讨论都没有意义的,我们分别叫他们第二小段代码和第三小段代码。

第二个部分如果线程数小于核心线程数设置,那么就调用一个prestartCoreThread(),看方法名应该是:预先启动一个核心线程的意思,先看完第三个部分,再跟踪进去看源码。

第三个部分很明了,就是调用super.getQueue().add(command);也就是说直接将任务放入一个队列中,其实super是什么?super就是我们上一篇文章所提到的ThreadPoolExecutor,那么这个Queue就是上一篇文章中提到的等待队列,也就是任何schedule任务首先放入等待队列,然后等待被调度的。


    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}

这个代码是否似曾相似,没错,这个你在上一篇文章介绍ThreadPoolExecutor的时候就见到过,说明不论是ThreadPoolExecutor还是ScheduleThreadPoolExecutor他们的Thread都是由一个Worker来处理的(上一篇文章有介绍),而这个Worker处理的基本机制就是将当前任务执行后,不断从线程等待队列中获取数据,然后用以执行,直到队列为空为止。

那么他们的区别在哪里呢?延迟是如何实现的呢?和我们上面介绍的ScheduledFutureTask又有何关系呢?

 

那么我们回过头来看看ScheduleThreadPool的定义是如何的。

public ScheduledThreadPoolExecutor(int corePoolSize) {

        super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS,

              new DelayedWorkQueue());

}

发现它和ThreadPoolExecutor有个定义上很大的区别就是,ThreadPoolExecutor用的是LinkedBlockingQueue(当然可以修改),它用的是DelayedWeorkQueue,而这个DelayedWorkQueue里面你会发现它仅仅是对java.util.concurrent.DelayedQueue类一个简单访问包装,这个队列就是等待队列,可以看到任务是被直接放到等待队列中的,所以取数据必然从这里获取,而这个延迟的队列有何神奇之处呢,它又是如何实现的呢,我们从什么地方下手去看这个DelayWorkQueue?

 

我们还是回头看看Worker里面的run方法(上一篇文章中已经讲过):

       public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

这里面要调用等待队列就是getTask()方法:

Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
            } catch (InterruptedException ie) {
            }
        }
}

你会发现,如果没有设置超时,默认只会通过workQueue.take()方法获取数据,那么我们就看take方法,而增加到队列里面的方法自然看offer相关的方法。接下来我们来看下DelayQueue这个队列的take方法:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();//等待信号,线程一直挂在哪里
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);//最左等delay的时间段
                    } else {
                        E x = q.poll();//可以运行,取出一个
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll();
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
}


这里的for就是要找到数据为止,否则就等着,而这个“q”和“available”是什么呢?

private transient final Condition available = lock.newCondition();

private final PriorityQueue<E> q = new PriorityQueue<E>();

怎么里面还有一层队列,不用怕,从这里你貌似看出点名称意味了,就是它是优先级队列,而对于任务调度来讲,优先级的方式就是时间,我们用这中猜测来继续深入源码。

 

上面首先获取这个队列的第一个元素,若为空,就等待一个available发出的信号,我们可以猜测到这个offer的时候会发出的信号,一会来验证即可;若不为空,则通过getDelay方法来获取时间信息,这个getDelay方法就用上了我们开始说的ScheduledFutureTask了,如果是时间大于0,则也进入等待,因为还没开始执行,等待也是“available”发出信号,但是有一个最长时间,为什么还要等这个信号,是因为有可能进来一个新的任务,比这个等待的任务还要先执行,所以要等这个信号;而最多等这么长时间,就是因为如果这段时间没任务进来肯定就是它执行了。然后就返回的这个值,被Worker(上面有提到)拿到后调用其run()方法进行运行。

 

那么写入队列在那里?他们是如何排序的?

我们看看队列的写入方法是这样的:
public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
}

队列也是首先取出第一个(后面会用来和当前任务做比较),而这里“q”是上面提到的“PriorityQueue”,看来offer的关键还在它的里面,我们看看调用过程:

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);//主要是这条代码很关键
        return true;
}
private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
        //我们默认走这里,因为DelayQueue定义它的时候默认没有给定义comparator
            siftUpComparable(k, x);    
}
/*
可以发现这个方法是将任务按照compareTo对比后,放在队列的合适位置,但是它肯定不是绝对顺序的,这一点和Timer的内部排序机制类似。
*/
private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
}

你是否发现,compareTo也用上了,就是我们前面描述一大堆的:ScheduledFutureTask类中的一个方法,那么run方法也用上了,这个过程貌似完整了。

 

我们再来理一下思路:

1、调用的Thread的包装,由在ThreadPoolExecutor中的Worker调用你传入的Runnable的run方法,变成了Worker调用Runnable的run方法,由它来处理时间片的信息调用你传入的线程。

2、ScheduledFutureTask类在整个过程中提供了基础参考的方法,其中最为关键的就是实现了接口Comparable,实现内部的compareTo方法,也实现了Delayed接口中的getDelay方法用以判定时间(当然Delayed接口本身也是继承于Comparable,我们不要纠结于细节概念就好)。

3、等待队列由在ThreadPoolExecutor中默认使用的LinkedBlockingQueue换成了DelayQueue(它是被DelayWorkQueue包装了一下子,没多大区别),而DelayQueue主要提供了一个信号量“available”来作为写入和读取的信号控制开关,通过另一个优先级队列“PriorityQueue”来控制实际的队列顺序,他们的顺序就是基于上面提到的ScheduledFutureTask类中的compareTo方法,而是否运行也是基于getDelay方法来实现的。

4、ScheduledFutureTask类的run方法会判定是否为时间片信息,如果为时间片,在执行完对应的方法后,开始计算下一次执行时间(注意判定时间片大于0,小于0,分别代表的是以当前执行完的时间为准计算下一次时间还是以当前时间为准),这个在前面有提到。

5、它是支持多线程的,和Timer的机制最大的区别就在于多个线程会最征用这个队列,队里的排序方式和Timer有很多相似之处,并非完全有序,而是通过位移动来尽量找到合适的位置,有点类似贪心的算法,呵呵。







上一篇:predis连接redis sentinel的问题处理


下一篇:少个人保护?我来!——阿里云在ICANN第3届GDD峰会纪实