ThreadPool线程池的使用与学习
说明:部分摘自博客:https://www.cnblogs.com/dafanjoy/p/9729358.html
关于线程池:
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式;如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
说明:Executors返回的线程池对象的弊端如下:
1) FixedThreadPool和SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2) CachedThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
线程池的构造方法
通过线程池的构造方法可以知道,创建一个线程池都包含那些元素;
public ThreadPoolExecutor(int corePoolSize, //指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中 int maximumPoolSize, //指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量 long keepAliveTime, //当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁 TimeUnit unit, //keepAliveTime的单位 BlockingQueue<Runnable> workQueue, //任务队列 ThreadFactory threadFactory, //线程工厂 RejectedExecutionHandler handler //拒绝策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
代码测试
package com.linwei.thread; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author: Linwei * @date 2021/6/10 * @Description: */ public class ThreadPoolDemo { private static ExecutorService pool; public static void main(String[] args){ pool = new ThreadPoolExecutor(1, 3, 1000, TimeUnit.MILLISECONDS, /** 任务队列类型1:直接提交队列:它没有容量,达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略 */ // new SynchronousQueue<Runnable>(), /** 任务队列类型2:有界任务队列:队列未满情况下线程数将一直维持在corePoolSize以下,队列已满则会以maximumPoolSize为最大线程数上限, * 提交的线程数到达最大负荷时(5+3=8)执行拒绝策略; */ new ArrayBlockingQueue<Runnable>(5), /** 任务队列类型3:*任务队列:线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量, * 也就是说在这种情况下maximumPoolSize这个参数是无效的,使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制, * 不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。 */ // new LinkedBlockingQueue<Runnable>(), /** * 任务队列类型4:优先级任务队列(*):PriorityBlockingQueue它其实是一个特殊的*队列,它其中无论添加了多少个任务, * 线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务, * 而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。 */ // new PriorityBlockingQueue<Runnable>(), // Executors.defaultThreadFactory(), // 线程工厂:默认工厂 new MyThreadFactory("Linwei"), // 自定义线程工厂 new ThreadPoolExecutor.AbortPolicy()// 拒绝策略:该策略会直接抛出异常,阻止系统正常工作; ); for (int i=0;i<8;i++){ pool.execute(new MyThread()); // pool.execute(new MyCompareThread((i))); // 配合优先级队列策略测试 } pool.shutdown(); // 不发生拒绝策略情况下,不执行该语句进程一致挂起 } } // 普通线程类 class MyThread implements Runnable{ @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-->当前工作线程: " + Thread.currentThread().getName()); } } // 带优先级的线程类 class MyCompareThread implements Runnable,Comparable<MyCompareThread>{ // 定义一个标识优先级的属性 private int priority; public MyCompareThread(int priority) { this.priority = priority; } public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } /** * 当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越大优先级越高; * 如果想要值越小优先级越高,把大于号换成小于号即可; * @param o * @return */ @Override public int compareTo(MyCompareThread o) { return this.priority > o.priority ? -1 : 1; } @Override public void run() { //让线程阻塞,使后续任务进入缓存队列 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName()); } } // 自定义线程工厂 class MyThreadFactory implements ThreadFactory{ private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(1); public MyThreadFactory(String group) { this.namePrefix = "From MyThreadFactory's " + group + "-worker-"; } @Override public Thread newThread(Runnable task) { String name = namePrefix + nextId.getAndIncrement(); System.out.println("-->工厂创建新线程:"+name); Thread thread = new Thread(task,name); return thread; } }
关于任务队列
- 直接提交队列SynchronousQueue<Runnable>():当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。
- 有界任务队列ArrayBlockingQueue<Runnable>(5):队列未满情况下线程数将一直维持在corePoolSize以下,队列已满则会以maximumPoolSize为最大线程数上限,提交的线程数到达最大负荷时(5+3=8)执行拒绝策略;
- *队列LinkedBlockingQueue<Runnable>():线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
- 优先级队列(*)PriorityBlockingQueue<Runnable>():PriorityBlockingQueue它其实是一个特殊的*队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
关于拒绝策略
一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:
1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;
4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
自定义线程工厂
创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
// 自定义线程工厂 class MyThreadFactory implements ThreadFactory{ private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(1); public MyThreadFactory(String group) { this.namePrefix = "From MyThreadFactory's " + group + "-worker-"; } @Override public Thread newThread(Runnable task) { String name = namePrefix + nextId.getAndIncrement(); System.out.println("-->工厂创建新线程:"+name); Thread thread = new Thread(task,name); return thread; } }