1.ThreadPoolExecutor概述
《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险;
Executors 返回线程池对象的弊端如下:
-
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
-
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。
创建线程池主要是 ThreadPoolExecutor 类来完成。
1.1 ThreadPoolExecutor
类的构造方法
ThreadPoolExecutor
类的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ThreadPoolExecutor中3 个最重要的参数:
(1) corePoolSize :核心线程数,定义了最小可以同时运行的线程数量。
(2) maximumPoolSize: 线程不够用时能够创建的最大线程数。
(3) workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
(4) keepAliveTime:当线程池中的线程数量大于 `corePoolSize` 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
(5) unit : keepAliveTime 参数的时间单位。
1.2 线程池的工作过程
(1)线程池刚创建时,里面没有一个线程,任务队列作为参数传入。此时,即使任务队列中有任务,线程池也不会马上执行他们。
(2)当调用 execute() 方法添加一个任务时,线程池会做如下判断:
-
如果正在运行的线程数 < corePoolSize,那么创建线程执行任务;
-
如果正在运行的线程数 >= corePoolSize,那么将任务放入任务队列;
-
如果队列满了,并且正在运行的线程数 < maximumPoolSize,那么将创建非核心线程执行任务;
-
如果队列满了,并且正在运行的线程数 >= maximumPoolSize,那么线程池会抛出异常
RejectExecutionExcaption
(3)当一个线程完成任务后,在从队列中取出一个任务来执行
(4)如果一个线程空闲超过一定时间(keepAlivTime),线程池会判断,如果正在运行的线程数 > corePoolSize,则回收该线程。线程池在任务执行完后,线程数会维持在 corePoolSize 的大小。
2.ThreadPoolExecutor使用案例
ThreadPoolController.java
/** * @Author lucky * @Date 2022/3/1 16:48 */ @Slf4j @RestController @RequestMapping("/testThreadPool") public class ThreadPoolController { private static final int CORE_POOL_SIZE=500; private static final int MAX_POOL_SIZE=600; private static final int QUEUE_CAPACITY=600; private static final Long KEEP_ALIVE_TIME=1L; @PostMapping("/uploadFileInner") public void uploadFileInner(@RequestParam int threadNum){ //01 使用ThreadPoolExecutor创建线程池 ThreadPoolExecutor executor=new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY)); //02 创建指定threadNum个数的线程,利用CountDownLatch模拟并发 final CountDownLatch start=new CountDownLatch(threadNum); for (int i = 0; i <threadNum ; i++) { ConcurrentUploadThread concurrentUploadThread=new ConcurrentUploadThread(start); executor.execute(concurrentUploadThread); } //03 关闭线程池 executor.shutdown(); } }
ConcurrentUploadThread.java
/** * @Author lucky * @Date 2022/3/1 17:22 */ @Slf4j public class ConcurrentUploadThread implements Runnable { private final CountDownLatch startSignal; public ConcurrentUploadThread(CountDownLatch startSignal){ this.startSignal=startSignal; } @Override public void run() { startSignal.countDown(); log.info(Thread.currentThread().getName()+",prepare at:"+System.currentTimeMillis()); try { startSignal.await(); doTask(); } catch (InterruptedException e) { e.printStackTrace(); } } private void doTask() { try { Random random=new Random(); int temp = random.nextInt(500); Thread.sleep(2000+temp); log.info(Thread.currentThread().getName()+"..............."); } catch (InterruptedException e) { e.printStackTrace(); } } }
Postman测试:
参考文献:
https://blog.csdn.net/qielanyu_/article/details/115614762