线程池详解
JDK类图
Executor , ExecutorService , Executors 区别
Executor | ExecutorService |
---|---|
Executor 是 Java 线程池的核心接口,用来并发执行提交的任务 | ExecutorService 是 Executor 接口的扩展,提供了异步执行和关闭线程池的方法 |
提供execute()方法用来提交任务 | 提供submit()方法用来提交任务 |
execute()方法无返回值 | submit()方法返回Future对象,可用来获取任务执行结果 |
不能取消任务 | 可以通过Future.cancel()取消pending中的任务 |
没有提供和关闭线程池有关的方法 | 提供了关闭线程池的方法 |
JDK线程池执行逻辑
在实际开发中,我们最多使用的是ThreadPoolExecutor 自定义线程池,下面就其参数最多的构造方法简单说明:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
Executors 线程池配置方案及风险
Executors.newSingleThreadExecutor();//创建一个单核心的线程池
Executors.newScheduledThreadPool(10);//创建一个按照计划规定执行的线程池
Executors.newCachedThreadPool();//创建一个自动增长的线程池
Executors.newFixedThreadPool(4);//创建一个具有固定线程数的线程池
Executors.newWorkStealingPool();//创建一个具有抢占式操作的线程池
《阿里巴巴 Java开发手册》1.6并发处理
第3条规定:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
第4条规定:线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能让编写代码的攻城狮更加明确线程池的运行规则,规避资源耗尽(OOM)的风险
之所以会出现这样的规范,是因为jdk已经封装好的线程池存在潜在风险:
- FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE ,会堆积大量请求OOM
- CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量线程OOM
所以从系统安全角度出发,原则上都应该自己手动创建线程池
Executors 部分源码
1)SingleThreadExecutor线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
可以看到使用的是ThreadPoolExecutor,并且将工作队列LinkedBlockingQueue,而LinkedBlockingQueue是*队列。
LinkedBlockingQueue是其实也算是一个有界队列,只是队列的长度为Integer.MAX_VALUE
因此SingleThreadExecutor线程池使用了*队列,当持续产生任务,来不及执行的时候,会不停的放入到工作队列(阻塞队列),由于*,最终会导致OOM,因此不推荐
2)ScheduledThreadPool线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
其实DelayedWorkQueue采用的是数组实现,初始容量16,因此是一个有界队列。
虽然DelayedWorkQueue是有界队列,但由于最大线程数量设置成Integer.MAX_VALUE,CachedThreadPool线程池,可能因为阻塞队列满了,然后开启最大线程执行任务,而最大线程数量为Integer.MAX_VALUE,也可能会导致OOM异常。
3)CachedThreadPool线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
与ScheduledThreadPool线程池一样,SynchronousQueue虽然不是*队列(SynchronousQueue内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素),但最大线程池数量设置成Integer.MAX_VALUE,可能会导致OOM异常
4)FixedThreadPool线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
与SingleThreadExecutor线程池一样,LinkedBlockingQueue是*队列,很容易出现OOM异常
JDK线程池拒绝策略
常见的几种拒绝策略
所有拒绝策略都实现了接口 RejectedExecutionHandler
public interface RejectedExecutionHandler {
/**
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
这个接口只有一个 rejectedExecution 方法。
r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。
1)AbortPolicy
此为线程池默认的拒绝策略,直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2)CallerRunsPolicy
在调用者线程中,运行当前被丢弃的任务。
只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。
如果线程池已经被关闭,则直接丢弃该任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3)DiscardOledestPolicy
丢弃队列中最老的,然后再次尝试提交新任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
这里 e.getQueue() 是获得待执行的任务队列,也就是前面提到的待业队列。
因为是队列,所以先进先出,一个poll()方法就能直接把队列中最老的抛弃掉,再次尝试执行execute®。
4)DiscardPolicy
丢弃无法加载的任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
自定义拒绝策略
场景:如果我们在执行异步任务时,当线程池队列已满且达到最大线程数时,就会触发线程池的拒绝策略(AbortPolicy);此时被拒绝的任务就会无法被执行,影响业务结果;此时就需要自定义线程池拒绝策略,如果将任务冲入加入队列,待线程池有空闲资源时可再执行
demo:
public class CustomThreadPoolExecutor {
private ThreadPoolExecutor pool = null;
/**
* 线程池初始化方法
*
* corePoolSize 核心线程池大小----1 maximumPoolSize 最大线程池大小----3 keepAliveTime
* 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit TimeUnit
* keepAliveTime时间单位----TimeUnit.MINUTES workQueue 阻塞队列----new
* ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列 threadFactory 新建线程工厂----new
* CustomThreadFactory()====定制的线程工厂 rejectedExecutionHandler
* 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)), 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
pool = new ThreadPoolExecutor(10, 30, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10),
new CustomThreadFactory(), new CustomRejectedExecutionHandler());
}
public void destory() {
if (pool != null) {
pool.shutdownNow();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录异常
// 报警处理等
// try {
// executor.getQueue().put(r);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("error.............");
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for (int i = 1; i < 100; i++) {
System.out.println("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("running=====");
}
});
}
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意:41以后提交的任务就不能正常处理了,因为,execute中提交到任务队列是用的offer方法,如上面代码,这个方法是非阻塞的,所以就会交给CustomRejectedExecutionHandler
来处理,所以对于大数据量的任务来说,这种线程池,如果不设置队列长度会OOM,设置队列长度,会有任务得不到处理,接下来我们构建一个阻塞的自定义线程池
查看ThreadPoolExecutor 源码execute 方法实现
所以解决此问题,可以自定义拒绝策略
当提交任务被拒绝时,进入拒绝机制,我们实现拒绝方法,把任务重新用阻塞提交方法put提交,实现阻塞提交任务功能,防止队列过大,OOM,提交被拒绝方法在下面
总结:
1、用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用*队列,如果任务量非常大,要用有界队列,防止OOM
2、如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务
3、最大线程数一般设为2N+1最好,N是CPU核数
4、核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数
5、如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果