1. 线程池优点
- 线程复用、控制最大并发数、管理线程
- 降低资源的消耗
- 提高响应速度
- 方便管理
2. Executors创建线程池的方法
Executors创建线程池实例常用的几种方法
-
new SingleThreadExecutor()
创建单个线程的执行程序。 -
new CachedThreadPool()
容量大小可变化。 -
new FixedThreadPool(int nThreads)
容量大小固定 -
new ScheduThreadPool()
创建一个定时任务线程池
线程池关闭的方法pool.shutdown()
3. 为什么线程池不允许使用Executors去创建?
阿里巴巴java开发手册有这么一句话:“线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险”
因为Integer.MAX_VALUE非常大(2的32次方-1),可以认为是可以无限创建线程的,在服务器资源有限的情况下容易引起OOM异常
4. 使用new ThreadPoolExecutor 创建线程池
使用 new ThreadPoolExecutor()创建一个想线程池,其中默认使用
线程工厂 Executors.defaultThreadFactory()
、
拒绝策略 new ThreadPoolExecutor.AbortPolicy()
(拒绝策略会抛出异常 RejectedExecutionException
)
5. ThreadPoolExecutor 的源码(重点)
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
5.1 七大参数(重点在这里)
- 核心线程数 :
corePoolSize
池中要保留的线程数,即使它们处于空闲状态,除非设置了 {@code
allowCoreThreadTimeOut} - 最大线程数:
maximumPoolSize
池中允许的最大线程数 - 空闲存活时间:
keepAliveTime
当线程数较大时与核心相比,这是多余空闲线程在终止之前等待新任务的最长时间。 - 超时单位:
unit
{@code keepAliveTime} 参数的时间单位 - 阻塞队列:
workQueue
用于在执行任务之前保存任务的队列。该队列将仅保存由 {@code execute} 方法提交的
{@code Runnable} 任务。 - 线程工厂 :
threadFactory
执行程序创建新线程时使用的工厂 - 拒接策略:
handler
执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量 @throws
IllegalArgumentException 如果以下情况之一成立: {corePoolSize
(核心线程数) < 0} {keepAliveTime
(空闲存活时间)< 0} {maximumPoolSize
(最大线程数)<= 0}
{maximumPoolSize
(最大线程数)<corePoolSize
(核心线程数)} @throws
NullPointerException if {@code workQueue} 或 {@code threadFactory}或
{@code handler} 为空
理解举例子:
场景为银行业务
人进入银行----》任务进入线程池
窗口 —》线程
办业务的人 ----》任务
等候区—》阻塞队列
参数理解
核心线程数:默认一直会打开的窗口(其他窗口可能有问题关闭)
最大线程数:全部的窗口,一般不会全部打开,只有人满了 并且 等候区也满了(对应 核心线程、阻塞队列 都满的情况),才会开启最大线程数。
拒绝策略:当 全部窗口 也全被占用了,这时候再来人,就会被拒绝(对应再有新任务就会被拒绝)
空闲存活时间:当 窗口一直空闲没人,超过设置的时间就会关闭
6. ThreadPoolExecutor 的举例
public static void main(String[] args) {
/**
* 使用原生的方式创建 线程池
*/
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <= 8; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() );
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
/*
核心线程 2
最大线程 5
阻塞队列 3
====测试1====
执行任务 3
输出:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1,只开启了核心线程1,2
====测试2====
执行任务 5
输出:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2,只开启了核心线程1,2
====测试3====
执行任务 6
输出:
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3 除了开启了核心线程,还另外开了一个线程3
pool-1-thread-1
====测试4====
执行任务 8
输出:
pool-1-thread-2
pool-1-thread-5
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-4
pool-1-thread-2
除了开启了核心线程,还另外开了三个线程3、4、5
====测试4====
执行任务 9
输出:
pool-1-thread-3
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-5,超出最大线程数8,(阻塞队列 + 最大线程数),共 8 个输出,最后一个任务被拒绝,抛出异常.RejectedExecutionException
*/
}
7. 四种拒绝策略(重点)
当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
RejectedExecutionHandler 是拒绝策略的接口,有四个实现类
-
AbortPolicy
: 默认拒绝策略,抛出异常 -
DiscardPolicy
:不会抛出异常,会丢掉任务 -
DiscardOldestPolicy
:当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入 -
CallerRunsPolicy
:从哪个线程来的,哪个线程处理,即main线程处理
总结:
四种拒绝策略是相互独立无关的,一般直接使用 ExecutorService 的时候,都是使用的默认的 defaultHandler
,也即 AbortPolicy
策略。
8. 四种拒绝策略的例子
public static void main(String[] args) {
/**
* 拒绝策略
*/
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 1; i <= 9; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() );
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
/*
核心线程 2
最大线程 5
阻塞队列 3
====测试1(拒绝策略是AbortPolicy)====
执行任务 9
输出:
pool-1-thread-1
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
pool-1-thread-5
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
超出最大线程数8,(阻塞队列 + 最大线程数),共 8 个输出,
最后一个任务被拒绝,抛出异常.RejectedExecutionException
====测试2(拒绝策略是DiscardPolicy)====
执行任务 9
输出:
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-5
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
超出最大线程数8,(阻塞队列 + 最大线程数),共 8 个输出,
不会抛出异常,会丢掉任务(上面就丢了一个任务)
====测试3(DiscardOldestPolicy)====
执行任务 9
输出:
pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-5
超出最大线程数8,(阻塞队列 + 最大线程数),共 8 个输出,
只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
====测试4(CallerRunsPolicy)====
执行任务 9
输出:
main
pool-1-thread-4
pool-1-thread-2
pool-1-thread-3
pool-1-thread-3
pool-1-thread-1
pool-1-thread-2
pool-1-thread-4
pool-1-thread-5
超出最大线程数8,(阻塞队列 + 最大线程数),共 9 个输出,
由main线程+阻塞队列 + 最大线程数执行
*/
}
9.最大线程数应该怎么设置?
获取CPU核心数:
System.out.println("查看本机核心数:"+Runtime.getRuntime().availableProcessors());
两种方式:
- CPU密集型:参考值可以设为核心数+1。和本机核心数保持一致,CPU性能最高。
《Java并发编程实践》这么说:
计算密集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作。
所以 Ncpu+1 是一个经验值。
- IO密集型:参考值可以设为2倍的核心数。根据程序中大型IO耗时线程,保证大于等于。
问题一:
假如一个程序平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么最佳的线程数应该是?
最佳线程数目 = (线程等待时间/线程CPU时间 + 1)* CPU数目
根据上面这个公式估算得到最佳的线程数:((0.5+1.5)/0.5)*8=32个线程。
问题二:
假如在一个请求中,计算操作需要5ms,DB操作需要100ms,对于一台8个CPU的服务器,总共耗时100+5=105ms,而其中只有5ms是用于计算操作的,CPU利用率为5/(100+5)。使用线程池是为了尽量提高CPU的利用率,减少对CPU资源的浪费,假设以100%的CPU利用率来说,要达到100%的CPU利用率,又应该设置多少个线程呢?
根据上面这个公式估算得到最佳的线程数:(100/5+1)*8=168 个线程。
10.在Springboot中使用自定义线程池(实战重点)
第一步:配置线程池
package com.lin.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync //开启多线程
public class ThreadPoolConfig {
@Bean("taskExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(5);
// 设置最大线程数
executor.setMaxPoolSize(20);
//配置队列大小
executor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程空闲时间,当超过核心线程之外的线程在空闲到达之后会被销毁(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("redisCountThread");
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
第二步:为@Async指定线程池
package com.lin.service;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class ThreadService {
//期望此操作在线程池执行 ,不会影响原有的主线程
@Async("taskExecutor") //要和线程池bean中的对应
try {
Thread.sleep(5000);
System.out.println("更新完成了....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
适应场景:
比如在写操作还需要读操作时,因为写操作是写锁,会阻塞其他的读操作,性能就会比较低,利用线程池处理写操作,就与主线程读操作没有关系了,提高了性能。