线程池的参数设定,为什么这样设定?有什么好处?
自定义线程池
package com.sunhui.thread.CompletableFuture.util;
/**
* @Description
* @ClassName ThreadPoolUtil
* @Author SunHui
* @Date 2021/9/24 9:47 上午
*/
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池管理的工具类,封装类
*
* @author SunHui
*/
public class ThreadPoolUtil {
// 通过ThreadPoolExecutor的代理类来对线程池的管理
private static volatile ThreadPollProxy mThreadPollProxy;
// 单列对象
public static ThreadPollProxy getThreadPollProxy() {
if (mThreadPollProxy == null) {
synchronized (ThreadPollProxy.class) {
if (mThreadPollProxy == null) {
mThreadPollProxy = new ThreadPollProxy(Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 4,
30);
}
}
}
return mThreadPollProxy;
}
// 通过ThreadPoolExecutor的代理类来对线程池的管理
public static class ThreadPollProxy {
public ThreadPoolExecutor poolExecutor;// 线程池执行者 ,java内部通过该api实现对线程池管理
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
public ThreadPollProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
poolExecutor = new ThreadPoolExecutor(
// 核心线程数量
corePoolSize,
// 最大线程数量
maximumPoolSize,
// 当线程空闲时,保持活跃的时间
keepAliveTime,
// 时间单元 ,毫秒级
TimeUnit.SECONDS,
// 线程任务队列
new LinkedBlockingQueue<>(20000),
// 创建线程的工厂
Executors.defaultThreadFactory(),
// 拒绝策略,直接丢弃
new ThreadPoolExecutor.AbortPolicy());
}
}
源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池作用
- 降低资源消耗;提高线程利用率,降低创建和销毁线程的消耗。
- 提高响应速度;任务来了,直接有线程可用可执行,而不需要先创建线程,在执行。
- 提高线程的可管理性;线程是稀缺资源,使用线程池可以统一分配调优监控。
线程池七大参数
- corePoolSize :代表核心线程数,也就是正常情况下创建的工作线程数,这些线程创建后并不会销毁,而是会常驻。当核心线程的数量等于线程池允许的核心线程最大数量的时候,如果有新任务来,不会再创建新的核心线程;
- maxinumPoolSize:代表的是最大线程数,它与核心线程数相对应,表示最大允许创建的线程数,比如任务较多,将核心线程数都用完了,仍然无法满足需求时,此时就会继续创建新的线程,但是线程池内线程总数不会超过最大线程数;
- keepAliveTime:表示超出核心线程数之外的线程的空闲时间,也就是说核心线程不会消除,但是超出核心线程数的部分线程如果空闲一定时间则会销毁,我们可以通过设置keepAliveTime来控制空闲时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个任务,还没来得及处理下一个任务,线程就被终止,而需要线程的时候又再次创建,刚创建完不久执行任务后,没多少时间又终止,线程反复切换会导致资源浪费(还可以设置allowCoreThreadTimeout = true这样就会让核心线程池中的线程有了存活的时间);
- unit:表示线程活动保持时间的单位:可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
- workQueue:用来保存等待执行任务的阻塞队列,假设我们现自核心线程都被使用了,还有任务进来则全部放入阻塞队列中,直到阻塞队列放满任务还再继续加入新任务则会创建新的新的线程;
- ThreadFactory:创建线程的工厂,用来生产线程去执行任务。我们可以选择使用默认的创建工厂,生产的线程在同一个组内,拥有相同的优先级,且都不是守护线程。当然我们也可以根据业务需要选择自定义线程工厂;
- Handler:任务拒绝策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了4种策略,下面会详细讲解下。
线程池处理流程
线程池的四大拒绝策略
测试代码:
// 创建200个任务,每个任务执行时间1秒钟
List<MyTask> tasks = IntStream.range(0, 200)
.mapToObj(i -> new MyTask(1))
.collect(toList());
// 创建自定义线程池
ThreadPoolUtil.ThreadPollProxy threadPollProxy = ThreadPoolUtil.getThreadPollProxy();
- AbortPolicy(默认策略)
public static ThreadPollProxy getThreadPollProxy() {
if (mThreadPollProxy == null) {
synchronized (ThreadPollProxy.class) {
if (mThreadPollProxy == null) {
mThreadPollProxy = new ThreadPollProxy(
3, 5, 3);
}
}
}
return mThreadPollProxy;
}
// 通过ThreadPoolExecutor的代理类来对线程池的管理
public static class ThreadPollProxy {
public ThreadPoolExecutor poolExecutor;// 线程池执行者 ,java内部通过该api实现对线程池管理
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
public ThreadPollProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
poolExecutor = new ThreadPoolExecutor(
// 核心线程数量
corePoolSize,
// 最大线程数量
maximumPoolSize,
// 当线程空闲时,保持活跃的时间
keepAliveTime,
// 时间单元 ,毫秒级
TimeUnit.SECONDS,
// 线程任务队列
new LinkedBlockingQueue<>(20),
// 创建线程的工厂
Executors.defaultThreadFactory(),
// 拒绝策略,直接丢弃
new ThreadPoolExecutor.AbortPolicy());
}
使用默认的 AbortPolicy策略 运行结果如下所示,直接抛出异常信息
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@3b0143d3 rejected from java.util.concurrent.ThreadPoolExecutor@731a74c[Running, pool size = 5, active threads = 5, queued tasks = 14, completed tasks = 13]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at com.sunhui.thread.CompletableFuture.CompletableFutureTest1.lambda$main$1(CompletableFutureTest1.java:50)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.sunhui.thread.CompletableFuture.CompletableFutureTest1.main(CompletableFutureTest1.java:51)
- CallerRunsPolicy 策略
我们发现最后一个任务竟然还是main线程帮我们执行了,因为CallerRunsPolicy策略就是,等线程池处理不了的任务,谁提交的任务,就给谁拿回去,让他自己执行,这里发现是main提交的任务,所以最终还是还给了main线程自己执行
pool-1-thread-5
pool-1-thread-4
main
pool-1-thread-3
pool-1-thread-4
pool-1-thread-4
- DiscardPolicy 策略
直接抛弃策略,异常也不会抛,什么都不做
pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
- DiscardOldestPolicy 策略
尝试找第一个线程帮忙执行,但是得要等线程1给执行完才可以执行,不过这个得在线程配置大一点情况下才可以看出,这里我也是试了很久感觉没啥效果
pool-1-thread-4
pool-1-thread-5
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
小结:当配置的抛弃策略什么都不做或者直接抛弃时,会很难发现问题,所以最好不要使用这种策略,使用默认的策略 AbortPolicy 即可。
线程池中阻塞队列的作用
- 一般的队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前任务了,阻塞队列通过阻塞可以保留住当前想要继续入队的任务。
阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。 - 阻塞队列自带的阻塞和唤醒功能,不需要额外处理,无任务时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用cpu资源。
为什么当核心线程数已满时,是先添加队列,而不是先创建临时线程
- 在创建新线程的时候,需要获取全局锁,这个时候其他线程就得被阻塞,影响了线程的整体执行效率。而将任务添加到队列缓冲,很好的避免了临时线程的创建销毁开销。
- 举例:一个企业里面有10个(core)正式员工的名额,最多招收10个正式工,要是任务超过了正式工的数量(task > core)的情况下,工厂领导(线程池)不是首先扩招工人,还是这10个正式工,允许任务稍微积压一下(放到队列中)。让10个正式工慢慢干,迟早会干完的,要是任务还在继续增加,超过了正式工的加班忍耐极限(队列已满),这时就不得不招收临时工了,要是加上临时工后,还是不能及时完成的任务,这时就会被领导拒绝(执行拒绝策略)
小结
- 当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。
- 当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。