Java 线程池
前言:啊里巴巴开发手册2021)
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool
和 SingleThreadPool
: 允许的请求队列长度为 Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool
: 允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致 OOM。
java自带的线程池工具类(不推荐使用)
newCachedThreadPool
源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建newCacheThreadPool
线程池:
ExecutorService executorService = Executors.newCachedThreadPool();
Executors.newCachedThreadPool()
创建出来的newCachedThreadPool
实际上是创建一个ThreadPoolExecutor
(线程池执行器)类
ThreadPoolExecutor
参数说明:
- corePoolSize:核心线程数
- maxnumPoolSize:最大线程数
- keepAliveTime:线程的最大生命周期,这里的生命周期有两个约束条件:==一:该参数针对的是超过corePoolSize数量的线程;二:处于非运行状态的线程。==即非核心线程数存活时间(非核心线程数 = 最大线程数 - 核心线程数)。
- unit:时间单位
- workQueue:任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。
其中,Interger.MAX_VALUE = 231 - 1,即线程池中核心线程为 0 个,非核心线程最大为 231 - 1(而不是初始化就有这么多),空闲的非核心线程生命周期为60s。任务队列为SynchronousQueue,这个队列是无法插入任务的,一有任务立即执行,所以newCachedThreadPool比较适合任务量大但耗时少的任务。
SynchronousQueue特点
- SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
- 因为没有容量,所以对应的peek、contains、clear、isEmpty…等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
- SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
- SynchronousQueue内部采用了无锁实现(CAS)
执行过程
一旦有任务 ,newCachedThreadPool就会不断创建非核心线程来执行任务,执行任务结束的线程会重新回到线程池等待复用。
优缺点
优点
- 效率快
缺点
- 比较消耗cpu
- 可能存在呢内存泄露(oom)问题,当线程处理业务耗时较长时,对于新的任务,线程池会不断创建新的非核心线程来处理任务,最后导致内存溢出
newFixThreadPool
源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建newFixedThreadPool
线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(n); // 参数核心n代表线程数量,也代表最大线程数量
newFixedThreadPool的核心线程数量和最大线程数量为传入的n
,也就是说非核心线程数量为 0
,非运行状态的非核心线程存活时间是0 ms
,任务队列是LinkedBlockingQueue(链表实现的有界队列,默认构造方法的队列长度是Integer.MXA_VALUE
)
执行过程
newFixedThreadPool线程池执行任务时,如果线程池内存在空闲的核心线程,则用核心线程执行任务,如果核心线程都不空闲,此时还有任务产生,则将任务存储到LinkedBlockingQueue队列中,等待核心线程执行任务后,再从队列中取出任务执行
优缺点
- 执行速度适中
- 当任务过多的时候,可能存在oom问题
newSingleThreadPool
源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建newSingleThreadPool
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
newSingleThreadPool的核心线程数量和最大线程数量为传入的1
,也就是说非核心线程数量为 0
,非运行状态的非核心线程存活时间是0 ms
,任务队列是LinkedBlockingQueue(链表实现的有界队列,默认构造方法的队列长度是Integer.MXA_VALUE
)
执行过程
newSingleThreadPool线程池执行任务时,只有唯一的核心线程在执行任务,核心线程执行任务结束,再从队列中取出任务执行
优缺点
- 速度最慢
- 当任务过多的时候,可能存在oom问题
周期性线程池 newScheduledThreadPool
常用于需要延迟执行或周期循环执行任务的场景
源码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
构造方法
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
-
threadFactory
:执行器创建新线程时使用的工
或者
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
周期执行:scheduleAtFixedRate()
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
- command:任务线程
- initialDelay:初始执行延迟时间
- period:执行周期
- unit:时间单位,如果任务执行时间大于执行周期,则任务执行完成后立马开始执行下一轮任务。
demo
package com.zhan.threadPool;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Demo {
public static void main(String[] args) {
System.out.println("时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date()));
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try{
Thread.sleep(1000L);
System.out.println("时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 5, 2, TimeUnit.SECONDS);
}
}
设置任务时长大于执行周期
Thread.sleep(3000L);
延迟执行:schedule()
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
demo
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
try{
//Thread.sleep(3000L);
System.out.println("时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date()));
} catch (Exception e) {
e.printStackTrace();
}
}
}, 2, TimeUnit.SECONDS);
}
执行后线程不会结束
延迟执行并返回结果:submit()
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
执行后将执行结果放入result,并作为返回值返回
线程池执行过程
提交优先级:核心线程池 > 队列 > 非核心线程池
执行优先级:核心线程池 > 非核心线程池 > 队列
线程异常策略
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务