线程池使用小结

既然线程池是日常工作非常常见的知识且使用过程中需要对此有着充分的认知,所以今天就总结一下线程池的常见知识点。

线程池使用小结

如上图:阿里巴巴 Java 开发手册中对于线程池的创建有着明确的规范。


简单的例子

如下使用 ThreadPoolExecutor 实现了自定义线程池完成 Callable 的任务:

class ImpCallable implements Callable<String> {
    // 核心线程数
    private static final int CORE_POOL_SIZE=2;
    // 最大线程数
    private static final int MAX_POOL_SIZE=4;
    // 线程数大于 corePoolSize 线程持续时间
    private static final int KEEP_ALIVE_TIME=1;
    // 阻塞队列的大小
    private static final int QUEUE_CAPACITY=5;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;
    // 自定义线程名
    private static final String THREAD_NAME = "my-self-thread-pool-%d";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                UNIT,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new BasicThreadFactory.Builder().namingPattern(THREAD_NAME).build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        ImpCallable task = new ImpCallable();
        List<Future<String>> futureList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // 提交任务到线程池
            Future<String> future = executor.submit(task);
            // 任务结果 future 加入结果队列
            futureList.add(future);
        }

        for (Future<String> fut : futureList) {
            try {
                // 取出结果
                System.out.println(new Date() + "--" + fut.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executor.shutdown();

        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("All threads Finished");
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(2000L);
        return Thread.currentThread().getName();
    }
}


阻塞队列

用来保存等待被执行的任务的阻塞队列,Java 中提供了如下阻塞队列:

  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,其构造必须指定大小。对象按 FIFO 排序。
  2. LinkedBlockingQuene:基于链表结构的阻塞队列,大小不固定,若不指定大小,其大小有Integer.MAX_VALUE 来决定。对象按 FIFO 排序。吞吐量通常要高于 ArrayBlockingQuene
  3. SynchronousQuene:特殊的 BlockingQueue,对其的操作必须是放和取交替完成。
  4. priorityBlockingQuene:类似于 LinkedBlockingQueue,对象的排序由对象的自然顺序或者构造函数的 Comparator 决定。


自定义线程名

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。

关于这一点非常有必要,方便快速定位问题以及监控线程池。


拒绝策略

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

  1. AbortPolicy 直接抛出异常,默认策略,可以及时发现线程池的瓶颈。
  2. CallerRunsPolicy 使用调用者所在的线程来执行任务,新任务不会丢失,采用谁提交谁负责的策略,有效控制线程池压力。
  3. DiscardOldestPolicy 丢弃阻塞队列中靠最前的任务,并执行当前任务,该策略存在丢失任务的风险,不建议使用
  4. DiscardPolicy 直接丢弃任务,该策略简单粗暴以保证系统可以为主要目标,不建议使用

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略。如记录日志或持久化存储不能处理的任务。


线程池大小

关于线程池线程大小可以根据实际业务场景具体设置。

推荐个适用面比较广的公式( N 为 CPU 核心数):

  • CPU 密集型任务 N+1 。
  • IO 密集型任务 2N 。


线程池状态

线程池使用小结

如上图线程池的状态分为五种,分别对应 Java 中五个 int 字段:

private static final int RUNNING = -536870912;
private static final int SHUTDOWN = 0;
private static final int STOP = 536870912;
private static final int TIDYING = 1073741824;
private static final int TERMINATED = 1610612736;
  • RUNNING 线程创建成功初始化状态,能够接收新任务,以及对已添加的任务进行处理。
  • SHUTDOWN 不接收新任务,但能处理已添加的任务。
  • STOP 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • TIDYING 当所有的任务已终止队列任务也为空线程池会变为 TIDYING 状态。
  • TERMINATED 线程池彻底终止,由 TIDYING 状态变成 TERMINATED 状态。


监控线程池

我们可以通过第三方组件监控线程池的运行状态比如 SpringBoot 中的 Actuator 。


除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API 监测线程池状态。如下图我们可以轻松获得线程池的各项参数,结合邮件实时监测线程池健康状况。


线程池使用小结



上一篇:查询优化器内核剖析第四篇:从一个实例看执行计划


下一篇:简单整理几个常用Linux命令及使用案例