ThreadPoolExecutor线程池详解

文章目录

一、基本概念

ThreadPoolExecutor类实现了ExecutorService接口和Executor接口,可以设置线程池corePoolSize,最大线程池大小,BlockingQueue,AliveTime,拒绝策略等。

常用构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}
参数 含义
corePoolSize 线程池维护线程的最少数量
maximumPoolSize 线程池维护线程的最大数量
keepAliveTime 线程池维护线程所允许的空闲时间
unit 线程所允许的空闲时间单位
workQueue 线程池所使用的缓冲队列,属于BlockingQueue的具体实例
handler 线程池对拒绝任务的处理策略

二、缓冲队列BlockingQueue

类型 说明
ArrayBlockingQueue 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序
LinkedBlockingQueue 一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool()使用了这个队列
SynchronousQueue 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue 一个具有优先级的无限阻塞队列

三、饱和策略RejectedExecutionHandler

类型 说明
AbortPolicy 任务队列满后,新的任务将被丢弃,并抛出异常
CallerRunsPolicy 只用调用者所在线程来运行任务
DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy 不处理,直接丢弃任务
也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务

四、线程池的工作方式

1、如果此时线程池中的数量小于corePoolSize,即便线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

2、如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

3、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,则创建新的线程来处理被添加的任务。

4、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler所指定的策略来处理此任务。

处理任务的优先级为:
核心线程corePoolSize --> 任务队列workQueue --> 最大线程maximumPoolSize --> 使用handler饱和策略;

五、代码演示

核心线程数:3,最大线程数:5,缓冲队列:10,饱和策略:AbortPolicy

package com.wzl.executor;

/**
 * 工作类
 */
public class Worker implements Runnable {

    int id;

    public Worker(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        System.out.println("Index: " + id + " is running , Thread Name:" + Thread.currentThread().getName());
        try {
            Thread.sleep(6000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
package com.wzl.executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {

    private static final Logger log = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                5,
                1 * 60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.AbortPolicy()//饱和策略,AbortPolicy:任务队列满后,新的任务将被丢弃,并抛出异常
        );

        for (int i = 0; i < 100; i++) {
            try {
                threadPoolExecutor.execute(new Worker(i));
            } catch (Exception e) {
                log.error("发生异常,当前index:" + i, e);
                break;
            }
        }

        threadPoolExecutor.shutdown();
    }

}

运行结果 & 解答

  • Index 0、1、2 交给corePoolSize,正在执行
  • Index 3 ~ 12 进入缓冲队列(size == 10)进行等待
  • Index 13、14 交给maximumPoolSize扩展的2个线程进行处理,正在执行
  • Index 15 基于饱和策略,进行处理,这里是拒绝任务,抛出异常
  • Index 3 ~ 12 进入缓冲队列(size == 10)进行等待,有空闲线程后进行处理

ThreadPoolExecutor线程池详解

上一篇:多线程 JDK线程池详解


下一篇:线程池