Concurrent包是jdk1.5所提供的一个针对高并发进行编程的包,也是多线程并发编程最喜欢问的,特别是从线程,线程池的使用做为最简单的切入点,先看个不太好理解的类图
接下来我们从头往下,一步步拆分开来研究:
Executor:多线程框架的主要接口,它底层就是一个execute执行方法。它代表着线程任务的执行,也是一切的开始,不管什么线程安全策略,多线程管理,定时,串行执行,都是建立在这个“执行”的基础之上,没有execute执行,线程也就等于无用。看它源码注释:
An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads. For example, rather than invoking new Thread(new(RunnableTask())).start() for each of a set of tasks, you might use:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
…
以前我们学多线程的时候,记不记得有种写法就是这样直接new Runnable多线程任务,但是这种线程创建方式是不规范,不安全的。
The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.
Memory consistency effects: Actions in a thread prior to submitting a Runnable object to an Executor happen-before its execution begins, perhaps in another thread.
ExecutorService是实现Executor接口的拥有更多扩展方法的一个接口。而ThreadPoolExecutor类提供了可扩展的线程池实现方式,Executors类则是为那些多线程的执行提供了便利的工厂模式(这一段话你就当它开始介绍它的儿子们多么有出息,下面会详细介绍)
ExecutorService:上面Executor就是它的父亲,注释里也介绍过它,它拥有更多的扩展方法,功能更强大,按照惯例,我们也是先看看它的自我介绍注释
An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.
An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService. The shutdown method will allow previously submitted tasks to execute before terminating, while the shutdownNow method prevents waiting tasks from starting and attempts to stop currently executing tasks. Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.
Method submit extends base method Executor.execute(Runnable) by creating and returning a Future that can be used to cancel execution and/or wait for completion. Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService can be used to write customized variants of these methods.)
The Executors class provides factory methods for the executor services provided in this package.
Method submit extends base method Executor.execute(Runnable) by creating and returning a Future that can be used to cancel execution and/or wait for completion. Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService can be used to write customized variants of these methods.)
The Executors class provides factory methods for the executor services provided in this package.
Here is a sketch of a network service in which threads in a thread pool service incoming requests. It uses the preconfigured Executors.newFixedThreadPool factory method:
它先提及了一下它的爹Executor,它说:Executor提供了最简单的一个或者多个异步任务执行和终止的方法。但是我提供了更丰富的玩法,例如两种不同的关闭任务方式,shutdown拒绝接收新任务,而且我还能在终止之前设置某些线程任务的提交执行,而shutdownNow就会尝试停止当前正在执行的任务,并且防止等待中的任务开始执行。终止时,执行者没有正在执行的任务,没有等待执行的任务,并且不能提交新任务。应关闭未使用的ExecutorService以允许回收其资源。
同时我还扩展了我爹的单一execute方法,我有多种submit提交任务的姿势,返回一个future,这个future可以让你跟踪任务执行状态,也可以用于取消任务执行或者等待任务执行。InvokeAll和invokeAny方法使用方式上有点类似,都是批量执行,然后等到最少一个或者全部任务都完成。
我还可以用来通过工厂模式创建不同类型的线程池(其实严格意义来说,它只是一个接口,创建线程池以及这堆方法它只是定义了而已,还需要一个实现类去真正做这些事情,下面会详细说)
听完它自我介绍,再简单看下源码中的方法:
方法和用途跟它自我介绍的差不多,再详细的用法可以参考网上资料,重点提交以下它怎么工厂模式创建线程池:
所谓工厂模式,就是根据你给过来的参数条件通过对应的生产流水线生产出对应的类给你,这里就必须提及一下常见的几种线程池,具体有什么作用,怎么用?
看到这里懂了吧,ExecutorService接口是所有类型线程池的爷爷,所以不管哪种类型创建,返回类型都是ExecutorService爷爷来接住,为什么是爷爷?不是爹呢?因为中间还隔着一个AbstractExecutorService;(另外一条定时器功能的分支ScheduledThreadPoolExecutor后面单独说)
下面开始介绍AbstractExecutorService是干什么用的,为什么要隔着一层Abstract抽象
AbstractExecutorService:根据上面的例子可以看出它属于ExecutorService的下一层,其实它是实现了ExecutorService接口的一个抽象类
Provides default implementations of ExecutorService execution methods. This class implements the submit, invokeAny and invokeAll methods using a RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package. For example, the implementation of submit(Runnable) creates an associated RunnableFuture that is executed and returned. Subclasses may override the newTaskFor methods to return RunnableFuture implementations other than FutureTask.
提供了ExecutorService接口的默认方法实现,这个类主要实现了submit, invokeAny ,invokeAll以及返回值类型FutureTask等方法,例如,submit(Runnable)的实现创建了一个关联的RunnableFuture,该RunnableFuture被执行并返回。子类可以重写newTaskFor方法以返回除FutureTask以外的RunnableFuture实现。
觉得拗口没关系,你就知道它是ExecutorService接口的一个抽象实现类,之所以是抽象的,是因为它也留有扩展空间提供给再下层的实现类去实现自己的逻辑。
(这里可以看到它实现了接口的方法)
ThreadPoolExecutor:是AbstractExecutorService的一个子类,主要作用是提供有参构造方法创建线程池,从根源上来说,它也是围绕着ExecutorService接口的功能做实现。
看它的源码注释:
The main pool control state, ctl, is an atomic integer packing two conceptual fields workerCount, indicating the effective number of threads runState, indicating whether running, shutting down etc In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable. If this is ever an issue in the future, the variable can be changed to be an AtomicLong, and the shift/mask constants below adjusted. But until the need arises, this code is a bit faster and simpler using an int. The workerCount is the number of workers that have been permitted to start and not permitted to stop. The value may be transiently different from the actual number of live threads, for example when a ThreadFactory fails to create a thread when asked, and when exiting threads are still performing bookkeeping before terminating. The user-visible pool size is reported as the current size of the workers set. The runState provides the main lifecycle control, taking on values: RUNNING: Accept new tasks and process queued tasks SHUTDOWN: Don’t accept new tasks, but process queued tasks STOP: Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method TERMINATED: terminated() has completed The numerical order among these values matters, to allow ordered comparisons. The runState monotonically increases over time, but need not hit each state. The transitions are: RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING When pool is empty TIDYING -> TERMINATED When the terminated() hook method has completed Threads waiting in awaitTermination() will return when the state reaches TERMINATED. Detecting the transition from SHUTDOWN to TIDYING is less straightforward than you’d like because the queue may become empty after non-empty and vice versa during SHUTDOWN state, but we can only terminate if, after seeing that it is empty, we see that workerCount is 0 (which sometimes entails a recheck – see below).
比较长,个人翻译浓缩一下,它主要负责线程池的创建和控制,然后劈里啪啦介绍了一堆几个常用的默认参数(最大线程数,核心线程数,等待队列等等),更详细的关于线程池的讲解请看:常见线程池用法
说到这一步,已经很接近我们常用的几种类型的线程池了,但是ThreadPoolExecutor只是一个总的线程池统一规范管理的类,要实现定制化功能的线程池,例如上面说的有边界线程池,可复用线程池,无边界线程池,串行线程池,定时线程池。这种定制化功能就需要再下一层子类去实现,那一块就是完完全全的线程池配置应用模块内容了,这里不做过多展开, 说到这里,关于AbstractExecutorService那条分支的关系一直撸到平时常用类型的线程池就看完了,但是不要忘记ExecutorService除了AbstractExecutorService,还有另外一条子类分支:ScheduledExecutorService,这一块比较特殊,所以到最后单独看看
ScheduledExecutorService
开始之前再看一下族谱,确定一下它的上下级类图关系
我们可以看到它们的父亲ExecutorService接口定义了很多线程执行和中断任务的方法,它的兄弟AbstractExecutorService实现了这些方法,往下发展了各种应用线程池,那么我们猜测一下为什么ScheduledExecutorService要独立出来?重点应该就是在Scheduled这个字眼上面,它的功能,更注重定时调度,定时执行。而不像它兄弟那样围绕着管理线程。带着这个猜测,去看源码注释:
An ExecutorService that can schedule commands to run after a given delay, or to execute periodically.
The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled.
Commands submitted using the Executor.execute(Runnable) and ExecutorService submit methods are scheduled with a requested delay of zero. Zero and negative delays (but not periods) are also allowed in schedule methods, and are treated as requests for immediate execution.
All schedule methods accept relative delays and periods as arguments, not absolute times or dates. It is a simple matter to transform an absolute time represented as a java.util.Date to the required form. For example, to schedule at a certain future date, you can use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS). Beware however that expiration of a relative delay need not coincide with the current Date at which the task is enabled due to network time synchronization protocols, clock drift, or other factors.
The Executors class provides convenient factory methods for the ScheduledExecutorService implementations provided in this package.
这是一款能够跑定时命令或者周期性执行任务的执行器服务。
定时方法在创建任务的时候配合这各种类型的定时参数和任务对象来取消或者检查任务的执行;scheduleAtFixedRate和scheduleWithFixedDelay方法创建并执行定期运行直到任务取消。
使用Executor.execute(Runnable)和ExecutorService submit方式提交的命令的调度任务延迟为零。在调度方法中也允许零延迟和负延迟(但不允许周期),也就是说它的兄弟AbstractExecutorService不支持定时和周期性执行任务,但是它可以。
Executors类为这个包中提供的ScheduledExecutorService实现提供了方便的工厂方法。
综上所述,我们猜测是对的,它真的往更灵活的定时器和周期性执行方向去发展,相反线程的管理(线程池)的功能反倒不在他的考虑范围。
而它也只是一个接口,如果要更深一步地实现这些定时执行周期执行方法,则需要借助它的实现类:ScheduleThreadPoolExecutor
这样分析,我们就不难理解为什么ScheduledThreadPoolExecutor需要继承ThreadPoolExecutor又要实现ScheduledExecutorService;因为它们一个提供了线程池的配置和限制,一个提供了定时和周期性任务执行功能。
注意区分Executor和Executors
这个容易模糊,Executor是Concurrent包的一个*接口,是一切线程管理接口/类的爹,爷,祖爷,祖宗
Executors是一个工厂类,主要用来创建不同类型的线程池。它很独立,就是一个工具包的感觉,要用到哪些类哪些接口,直接import进来用的,没有抽象实现继承之类的关系。
Executors的注解:
创建一个线程池,该线程池重用在共享无边界队列上运行的固定数量的线程。在任何时候,线程最多都是活动的处理任务。如果在所有线程都处于活动状态时提交其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关机前的执行过程中由于失败而终止,那么如果需要执行后续任务,将有一个新线程代替它。池中的线程将一直存在,直到它显式关闭。
参数:nThreads–池中的线程数
返回:新创建的线程池