线程池(二)--ThreadPoolExecutor
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解释
- corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务将会被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
- maximumPoolSize:线程池中允许的最大线程数,如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于
maximumPoolSize - keepAliveTime:线程池维护线程所允许的空闲时间,当线程池中的线程数量大于corePoolSize的时候,这时候没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,知道等待的时间超过了keepAliveTime;
- unit:KeepAliveTime的单位
- workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下几种阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的*阻塞队列; - threadFactory:他是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
- handler:线程池饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1.AbortPolicy:直接抛出异常,默认策略。
2.calleRunsPolicy:用调用者所在的线程来执行任务。
3.DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并职执行当前任务
4.DiscardPolicy:直接丢弃任务;
上面4种策略都是ThreadPoolExecutor的内部类。同时也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
API
public void execute(Runnable command)
Future<?> submit(Runnable task);
这两个API都是用来做任务提交,第一个方法提交任务无返回值,第二个方法任务执行完有返回值。
线程池监控:
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量
线程池原理图
当有多个任务提交到线程池中时,会先使用核心线程,当核心线程满载以后任务进入阻塞队列中,核心线程执行任务完成后会从阻塞队列中拿取任务执行,当核心线程和阻塞队列任务都满载就会使用非核心线程执行任务
,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,就会使用饱和策略。
ThreadPoolExecutor提交线程例子
public class Test implements Runnable {
Integer index;
public Test(Integer index) {
this.index = index;
}
@SneakyThrows
@Override
public void run() {
System.out.println(Thread.currentThread().toString()+" index:"+index+" time:"+System.currentTimeMillis()+"执行开始");
Thread.sleep(10000);
}
}
public class Testcase {
private static final int QUEUE_CAPACITY = 2;
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 6, 1000, TimeUnit.DAYS, new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<13;i++){
threadPoolExecutor.execute(new Test(i));
}
threadPoolExecutor.shutdown();
}
}
执行结果
Thread[pool-1-thread-3,5,main] index:2 time:1637030677612执行开始
Thread[pool-1-thread-2,5,main] index:1 time:1637030677612执行开始
Thread[pool-1-thread-4,5,main] index:5 time:1637030677612执行开始
Thread[main,5,main] index:8 time:1637030677612执行开始
Thread[pool-1-thread-1,5,main] index:0 time:1637030677612执行开始
Thread[pool-1-thread-5,5,main] index:6 time:1637030677612执行开始
Thread[pool-1-thread-6,5,main] index:7 time:1637030677612执行开始
Thread[main,5,main] index:9 time:1637030687627执行开始
Thread[pool-1-thread-6,5,main] index:3 time:1637030687627执行开始
Thread[pool-1-thread-4,5,main] index:4 time:1637030687627执行开始
Thread[main,5,main] index:12 time:1637030697638执行开始
Thread[pool-1-thread-5,5,main] index:11 time:1637030697638执行开始
Thread[pool-1-thread-1,5,main] index:10 time:1637030697638执行开始
以上创建了一个核心线程为3个,非核心线程为3个,阻塞队列长度为2,饱和策略为calleRunsPolicy的线程池,并同时提交了13个任务,从结果可以看出当线程池任务饱和后,会调用当前主线程处理任务。