一、线程池
在使用C++的经历中,经常使用多线程(计算密集型),也经常会思考要如何对多线程控制,但没有采用过线程池思想的实现。
在java并发的学习过程中,了解了Java并发组件J.U.C(java.util.concurrent),包含5个包,executor就是线程池的实现类
二、excutor
TheadPoolExecutor 抽象类
构造函数
ThreadPoolExecutor(int corePoolSize, \
int maximumPoolSize, \
long keepAliveTime,
unit,
BlockingDeque<Runnable> workQueue)
- corePoolSize:线程池中所保存的线程数
- maximumPoolSize: 池中允许的最大线程数
- keepAliveTime :超时时间
- unit :keepAliveTime的时间单位
-
workQueue :保存execute方法提交的Runnable任务
有一大段关于线程数和队列的描述(线程池规则),先引用一下:
(一) 下面都假设任务队列没有大小限制:
- 如果线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中
- 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
- 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
- 如果线程数量>核心线程数,并且>最大线程数,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
- 如果线程数量>核心线程数,并且>最大线程数,当任务队列是SynchronousQueue的时候,会因为线程池拒绝添加任务而抛出异常。
(二)任务队列大小有限时
- 当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
- SynchronousQueue没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。
三、通常我们会用一个封装更高的工具类Executors,返回ThreadPoolExecutor对象
ExecutorService executorService = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
说明
- 对corePoolsize的数量不限制,
- 新任务启动新线程
- 60秒回收
容易产生高的系统负载。
另外三个常用的ThreadPool类
- 指定线程池中线程数的实现方式。
ExecutorService executorService = Executors.newFixedThreadPool(3);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 线程池只有一个线程,实现按顺序进行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 可调度的线程
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
四、CachedThreadPool结合信号量限制负载
public class AtomicExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}