线程池:业务代码常见的问题
在程序中,我们会使用各种池优化缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定策略调整池中缓存的对象数量,实现动态伸缩。
由于线程的创建比较昂贵,随意、没有控制地创建大量线程会造成性能问题,因此短平快的任务一般优先考虑使用线程池来处理,而不是直接创建线程
1. 线程池的声明需要手动进行
Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池。《阿里 巴巴 Java开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor来创建线程池。这一条规则的背后,是大量血淋淋的生产事故,最典 型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。
阿里巴巴文档:
测试 OOM问题 :
- 来初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提
交任务,每个任务都会创建一个比较大的字符串然后休眠一小时
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Slf4j
public class threadPooltest extends RecordLogApplicationTest {
/**
* 测试 OOM问题
*/
@Test
public void threadPooltest() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(100000);
System.out.println("开始执行");
for (int i = 0; i < 100000000; i++) {
executorService.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a").collect(Collectors.joining("")) + UUID.randomUUID().toString();
System.out.println("等待一小时开始");
try {
TimeUnit.HOURS.sleep(1);
} catch (Exception e) {
log.info(payload);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
}
}
结果:java.lang.OutOfMemoryError 错误
首先我们看下 newFixedThreadPool 方法的源码,发现,线程池的工作队列直接 new 了一个 LinkedBlockingQueue,
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
点进去查看 LinkedBlockingQueue构造方法 是一个 Integer.MAX_VALUE长度的队列,可以认为是*的
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
虽然 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是*但。如果任务较多并且执行较慢但话,队列可能会快速积压,撑爆内存导致OOM
测试newCachedThreadPool
如果我们把 newFixedThreadPool 改成 newCachedThreadPool方法来获取线程池。程序运行不久后,同样会看到 OOM 异常
java.lang.OutOfMemoryError: unable to create new native thread
源码:
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
这种线程池的最大线程数是 Integer.MAX_VALUE ,认为是没有上限的,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。由于我们的任务需要一小时才能完成,大量的任务进来后会创建大量的线程,我们知道线程是分配一定的内存空间做为线程栈,比如 1MB,因此无限创建线程必然会导致OOM
我们不建议使用 Executors 提供的两种快捷的线程池,原因如下:
- 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合要求,一般都需要设置有界的工作队列和 可控的线程数。
- 任何时候,都于根伟自定义线程池指定有意思的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量CPU 、线程执行出现异常等问题时,我们往往会抓取线程栈,此时,有意义的线程名称,就可以方便我们定位问题。
总结 线程池工作行为:
- 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
- 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
- 如果队列已经满了,则在总线程数不大于maximumPoolSize的前提下,则创建新的线程
- 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
- 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
2.确认线程池是否在复用
- 在生产环境中,监控一直报警当前使用线程数太多,一会又将下来,但是当前用户访问量也不是很大
通过代码排查 发现项目中使用了 Executors.newCachedThreadPool(); 创建线程池使用,我们知道newCachedThreadPool 会在需要时创建必要多的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程,如果业务操作量较大,的确有可能一下子开启几千个线程
源码发现
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 它的核心线程数是0,而最大线程数 Integer的最大值,一般来说机器都没那么大内存给它不断使用,而 keepAliveTime 是60秒,也就是在 60秒之后所有的线程都是可以回收的,采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
所以我们在使用线程池要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:
: 1. 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队
列。
: 2. 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理
由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过
多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做
缓冲。