线程池 ThreadPool

一、线程池

在使用C++的经历中,经常使用多线程(计算密集型),也经常会思考要如何对多线程控制,但没有采用过线程池思想的实现。
在java并发的学习过程中,了解了Java并发组件J.U.C(java.util.concurrent),包含5个包,executor就是线程池的实现类


image

二、excutor

Executor

TheadPoolExecutor 抽象类

TheadPoolExecutor构造函数

构造函数

ThreadPoolExecutor(int corePoolSize, \
                  int maximumPoolSize, \
                  long keepAliveTime,
                  unit,
                  BlockingDeque<Runnable> workQueue)
  • corePoolSize:线程池中所保存的线程数
  • maximumPoolSize: 池中允许的最大线程数
  • keepAliveTime :超时时间
  • unit :keepAliveTime的时间单位
  • workQueue :保存execute方法提交的Runnable任务
    有一大段关于线程数和队列的描述(线程池规则),先引用一下:

(一) 下面都假设任务队列没有大小限制:

  1. 如果线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中
  2. 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
  3. 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
  4. 如果线程数量>核心线程数,并且>最大线程数,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
  5. 如果线程数量>核心线程数,并且>最大线程数,当任务队列是SynchronousQueue的时候,会因为线程池拒绝添加任务而抛出异常。

(二)任务队列大小有限时

  1. 当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
  2. SynchronousQueue没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。

三、通常我们会用一个封装更高的工具类Executors,返回ThreadPoolExecutor对象

  ExecutorService executorService = Executors.newCachedThreadPool();
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

说明

  • 对corePoolsize的数量不限制,
  • 新任务启动新线程
  • 60秒回收
    容易产生高的系统负载。

另外三个常用的ThreadPool类

  1. 指定线程池中线程数的实现方式。
ExecutorService executorService = Executors.newFixedThreadPool(3);
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  1. 线程池只有一个线程,实现按顺序进行。
     public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  1. 可调度的线程
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

四、CachedThreadPool结合信号量限制负载

public class AtomicExample1 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}
上一篇:Tensorflow 迁移学习 识别中国军网、中国军视网Logo水印


下一篇:三路快排算法-求中位数问题(4)