引入
为什么使用线程池?
在连接数少的情况下,对于需要线程的地方我们只需要直接新建线程来处理就可以了,但是在并发量高的场景下,频繁的线程创建、销毁是非常消耗资源的,所以针对于这样的场景可以使用线程池,让一开始就创建好线程,在需要新连接进来需要线程时就从线程池中拿一条执行,完成后再将线程放回线程池,等到其他线程需要时再获取就可以了,这样可以有效提高系统整体的性能。
线程池的好处?适应场景?
好处:1、降低资源损耗 2、响应速度快 3、方便线程管理 4、提供定时执行、定期执行、并发数控制等功能。适用场景:并发量大,IO操作多,需要频繁创建线程的场景。
阻塞队列
阻塞队列是一个支持两个附加操作的队列,其本质还是一个队列,当内部存储的数据量超过当前阻塞队列的容量时,就会阻塞,停止接收新的数据;同样,如果内部存储的数据量变为0,那么也会阻塞,外界的数据请求也会不再接收。
种类
1、ArrayBlockingQueue:由数组结构组成的有界阻塞队列
2、LinkedBlockingQueue:由链表组成的有界(但大小默认值为Integer.MAX_Value)
3、PriorityBlockingQueue:支持优先级排序的*阻塞队列
4、DelayQueue:使用优先级队列实现的延迟*阻塞队列
5、SynchronizedQueue:不存储元素的阻塞队列,也即单个元素的队列
6、LinkedTransferQueue:由链表结构组成的*阻塞队列
7、LinkedBlockingDeque:由链表结构组成的双向阻塞队列
其中橘色的三种是常用的。其中 LinkedBlockingQueue 和 SynchronizedQueue 是两个极端,SynchronizedQueue 是没有容量的阻塞队列,而 LinkedBlockingQueue 在未指定容量时可以看作是容量无穷大的阻塞队列。
核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
检查是在有数据时返回队列的第一个数据,并不会从队列中移除该数据。内部使用 ReentrantLock 进行同步控制的。
抛出异常 |
当阻塞队列满时,再往队列里add插入元素会抛lllegalStateException:Queue full 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException |
特殊值 |
插入方法,成功true失败false 移除方法,成功返回出队列的元素,没有元素返回null |
阻塞 |
队列满时put,队列会一直阻塞直到put数据或者响应中断退出 队列为空时take,队列会一直阻塞直到队列可用 |
超时 | 当队列满时,队列会阻塞生产者线程一定时间,超时后限时后生产者线程会退出 |
线程池
线程池的核心接口是 ExecutorService,它定义了线程池的各个基本抽象方法。
执行机制
当新的线程请求进来时,会先判断核心线程数是否已满,如果未满则直接新建线程并执行,执行完将其放回线程池;
如果已满就再检查队列是否已满,如果没满就将当前线程请求加入阻塞队列,等待空闲线程分配;
如果已满就再检查线程池当前存在的线程数是否已达到规定的最大值,如果没有达到就创建线程执行;
如果达到就执行对应的饱和策略。
其中的名词下面会解释。
种类
ThreadPoolExecutor
首先看一下《阿里巴巴Java开发手册》中推荐的线程池创建,这也是线程池的最基本创建方式。那就是使用创建 ThreadPoolExecutor 对象来作为线程池,首先看一下它的构造器
可以看到它有四种构造函数,但前三种的实现本质还是使用第四种实现的,只不过使用的都是对应默认配置而已。
所以我们着重看一下第四个构造函数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
内容是对应属性的赋值,方法参数从左到右分别为核心线程数、线程池允许同时存在的最大线程数、线程的最大存活时间、最大存活时间的单位、阻塞队列、线程工厂、拒绝策略。
核心线程数、最大线程数、阻塞队列和拒绝策略就是上面执行机制中提到的。最大存活时间是指非核心线程在执行完代码后回到线程池,在经过最大存活时间后仍然没有任务分配给它,那么它就会被回收。核心线程则不会被回收,所以核心线程数就规定了线程池的最小线程数(当前线程池刚刚被创建时为0,直到有线程请求进来后才会开始创建线程)。线程工厂是指定创建线程的工厂,这样在创建线程可以更方便。拒绝策略是指在阻塞队列满以及线程池容纳的线程数也达到最大线程池后执行的策略。
拒绝策略包括以下几种:
1、ThreadPoolExecutor.AbortPolicy:新任务提交直接抛出异常,RejectedExecutionException(默认)
2、ThreadPoolExecutor.CallerRunsPolicy:即不抛弃任务也不抛出异常,而是将任务返回给调用者。不会在线程池中执行,而是在调用executor方法的线程中执行(也就是传进来的Runnble对象来创建线程启动运行),会降低新任务的提交速度,影响程序的整体性能。
3、ThreadPoolExecutor.DiscardPolicy:直接抛弃新提交的任务。
4、ThreadPoolExecutor.DiscardOldestPolicy:抛弃最早加入阻塞队列的请求。
需要注意的是这些拒绝策略其实是 ThreadPoolExecutor 的内部类。
关于核心线程数的设置,可以参考下面的配置
1、对于CPU密集型,也就是代码大部分操作都是CPU去执行计算处理的,不需要创建过多的线程,所以可以设置为 CPU核数+1
2、对于IO密集型,因为IO操作往往伴随着线程的线程的使用,所以应该设置大一些,所以可以设置为 CPU核数*2
线程池的状态
1、Running。运行中,线程池正常执行,当线程池被创建后就会进入 Running 状态。
2、ShutDown。关闭,不会再接受新的线程请求,但是还是会处理阻塞队列中的请求。当调用对象的 shutdown 方法后就会进入该状态。
3、Stop。停止,不会再接受新的线程请求,也不会再处理阻塞队列中的请求。当调用对象的 shutdownNow 方法后就会进入该状态。
4、Tidying。进入该状态会开始执行线程池的 terminated 方法。在 ShutDown 状态中阻塞队列为空,同时线程池中的工作线程数为0时就会进入该状态;在 Stop 状态中工作线程数为0就会进入该状态。
5、Terminated。终止。表示线程池正式停止工作。当在 Tidying 状态中执行完 terminated 方法后就会进入该状态。
常用方法
1、execute(Runnable):处理 Ruunable 类型线程请求
2、submit(Runnable)、submit(Callable<T>):处理 Runnable 或者 Callable 类型的线程请求。submit 方法实现就是将 Callable对象转成 FutureTask 类型对象再调用 execute 方法处理。
3、shutdown():进入 ShutDown 状态
4、shutdownNow():进入 Stop 状态
5、terminated():线程池停止前执行的方法,空方法,子类可以来重写自定义。
6、getTaskCount():获取线程池接收的任务总数
7、getCompletedTaskCount():获取线程池已完成的任务数
8、getPoolSize():获取线程池的线程数量
9、getActiveCount():获取线程池正在执行的线程数
其他封装好的线程池
对于直接创建 ThreadPoolExecutor 对象来实现线程池的创建,过程比较复杂,当然在实际开发中还是推荐这种方式,而在某些场景中则不需要定义这么规范的线程池,所以在 Executors 工具类中为我们封装了几种线程池,我们只需要调用方法就可以获取对应的线程池。
1、Executors.newSingletonThreadExecutor。方法源码如下。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
可以看到这个线程池就是一个单线程的线程池,只能存储一个线程,但是它的阻塞队列是 LinkedBlockingQueue,所以意味着阻塞队列的容量可以看作是无限大的。
2、Executors.newFixedThreadPool(int)。方法源码如下。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
这个线程池的最大线程数是传参值,核心线程数也是传参值,这意味着在工作线程执行完后回到线程池永远不会被回收,使用的阻塞队列也是 LinkedBlockingQueue。
3、Executors.newCachedThreadPool()。方法源码如下。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
这里的核心线程数是0,也就是线程池在空闲时会回收所有的线程,但是最大线程数是 Integer的最大范围,所以可以看作可以同时包括无限大的线程,并且使用的阻塞队列是 SynchronousQueue,所以当线程请求进来时总会立即创建线程执行。
4、Executors.newScheduledThreadPool(int)。方法源码如下。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
这个线程池的核心线程数是指定的参数,最大线程数同样是无限大,阻塞队列是 DelayedWorkQueue(),默认容量是16。
上面四种封装好的线程池都有缺陷,前两个因为阻塞队列是 LinkedBlockingQueue,所以在大量的线程请求进来时大部分会存储在阻塞队列中,最终撑爆堆空间,抛出OOM;而后两个因为允许的最大线程数是 Integer.MAX_VALUE,所以可以看作是无限大的,所以在大量的线程请求进来时也会因为创建过多的线程数而抛出OOM。所以这四种线程池需要慎用。