线程池
1.工具类实现
/**
* @title: ThreadUtil
* @Author bao
* @description
* @Date: 2021/12/2216:07
*/
public class ThreadUtil {
private static volatile ExecutorService executorService;
//获取单例对象
public static ExecutorService getInstance(){
if (executorService == null) {
synchronized (ThreadPoolManager.class) {
if (executorService == null) {
int cpuNum = Runtime.getRuntime().availableProcessors();// 获取处理器数量
int threadNum = cpuNum * 2;// 根据cpu数量,计算出合理的线程并发数
executorService = Executors.newFixedThreadPool(threadNum);
}
}
}
return executorService;
}
//ceshi
public static void main(String[] args) {
ExecutorService instance = ThreadUtil.getInstance();
long taskCount = 0;
for (int i = 0; i < 100; i++) {
int j = i;
ThreadUtil.getInstance().execute(()->{
System.out.println(j);
});
taskCount = ((ThreadPoolExecutor)instance).getActiveCount();
System.out.println("taskCount:"+taskCount);
}
}
}
线程池监控:
long activeCount = ((ThreadPoolExecutor)instance).getActiveCount();
-
taskCount 任务的数量
-
completedTaskCount 运行的过程中完成的任务数量
-
largestPoolSize 曾经创建过的最大的线程数量
-
getPoolSize 线程数量
-
getActiveCount 获取活动的线程数
-
扩展线程池:beforeExecute、afterExecute 在线程执行前,执行后做点什么
自定义线程池
工具类
/**
* @title: ThreadUtil
* @Author bao
* @description
* @Date: 2021/12/2215:39
*/
public class ThreadManager {
public static volatile ThreadPool instance;
// 获取单例的线程池对象
public static ThreadPool getInstance() {
if (instance == null) {
synchronized (ThreadPoolManager.class) {
if (instance == null) {
int cpuNum = Runtime.getRuntime().availableProcessors();// 获取处理器数量
int threadNum = cpuNum * 2;// 根据cpu数量,计算出合理的线程并发数
instance = new ThreadPool(threadNum, threadNum+1, Integer.MAX_VALUE);
}
}
}
return instance;
}
public static class ThreadPool {
private ThreadPoolExecutor mExecutor;
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
}
public void execute(Runnable runnable) {
if (runnable == null) {
return;
}
if (mExecutor == null) {
mExecutor = new ThreadPoolExecutor(corePoolSize,// 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 闲置线程存活时间
TimeUnit.MILLISECONDS,// 时间单位
new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE),// 线程队列
Executors.defaultThreadFactory(),// 线程工厂
new ThreadPoolExecutor.AbortPolicy() {// 队列已满,而且当前线程数已经超过最大线程数时的异常处理策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
super.rejectedExecution(r, e);
}
}
);
}
mExecutor.execute(runnable);
}
}
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
int j = i;
ThreadManager.getInstance().execute(()->{
System.out.println(j);
});
}
}
}
参数
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
-
corePoolSize:线程池的基本大小。 提前调用prestartAllCoreThreads(),会把所有的基本线程启动 。
-
workQueue: ⽤于保存等待执⾏的任务的阻塞队列。
-
ArrayBlockingQueue 基于数组实现的(先进先出)。
-
LinkedBlockingQueue 吞吐量要高于ArrayBlockingQueue。
-
SynchronousQueue 吞吐量要高于LinkedBlockingQueue 不存储元素的阻塞队列,得等一个线程做移除操作才能继续进行,要不会一直阻塞。
-
PriorityBlockingQueue 具有优先级的无限阻塞队列。
-
-
maximumPoolSize: 线程池允许创建的最⼤线程数。
-
threadFactory: ⽤于设置创建线程的工厂可以使用谷歌的开源方法。
-
handler: 饱和策略,阻塞队列和我们的线程的创建数都满了的时候就会饱和选择一个策略对新提交的策略进行处理。
-
AbortPolicy 直接抛出异常。
-
CallerRunsPolicy 只用调用者所在的线程来处理任务。
-
DiscardOldestPolicy 丢弃队列里最近的一个任务。
-
DiscardPolicy 直接丢弃。
-
⾃定义 自己定义一个处理方式。
-
-
keepAliveTime:线程池的⼯作线程空闲后,保持存活的时间。
-
unit:线程活动保持时间的单位。
线程数选择
-
CPU密集型 : N cpu + 1 配置尽可能小的线程,线程数要少一点,减少cpu频繁的上下文切换,提高cpu的利用率
-
IO 密集型 :2 * N cpu 需要配置尽可能多的线程,这样才能保证cpu能被充分的利用
-
混合型 :拆分成CPU密集型和IO密集型
-
N = Runtime.getRuntime().availableProcessors()