深入浅出Java(Android )线程池ThreadPoolExecutor

前言

关于线程池 
在Java/Android开发中,设计到并发的请求,那基本上是离不开线程池了。用线程池的好处:

  • 1、减少线程频繁创建、销毁的开销;
  • 2、好控制并发量,降低OOM的可能,至于原因文中会说;
  • 3、提高程序的响应速度,因为可以省去部分创建的过程;

要不要深度学习线程池

  • 对于服务端的同学来说应该会比较重视这一块,因为需要做高并发;而移动端的同学可能比较容易忽略这一块。有些人觉得平时也用不到,移动端没有那么大并发量,或者说第三方框架中已经完成了,比如 OkHtttp ; 其实只能说有这种想法的同学还没有遇到大一点的项目或者说没有太多多线程优化的经验。 如果你真的遇到了这种项目瓶颈,你连线程池的运行原理都不知道,那又如何解决项目问题呢?

  • 如果你要寻求一份中高级开发工程师的工作,那线程池是基本是必问题目之一,而且还要有一定深度。

如何深度学习线程池 
这也是我们今天的重点,本文将从下面几点带大家快速掌握线程池的要点:

  • 1、从API使用到原码解析,基于JDK1.8版本;
  • 2、从源码阅读(深入)中总结出(浅出)线程池工作原理;
  • 3、对应用场景的分析以及异常处理

预览

深入浅出Java(Android )线程池ThreadPoolExecutor

线程池类图.png

先对线程池的部分核心类/接口做个简介,大家有个印象就好。

Executor接口

public interface Executor {

    /**
     * 就一个方法,用来执行线程任务的,类似于Thread的start()方法
     */
    void execute(Runnable command);
}

由于Executor是一个接口,所以 execute 是由具体的实现类来完成的,调用这个方法,可能会出现如下情况:

  • 1.创建一个新线程并立即启动;
  • 2.复用线程池中空闲的线程来执行任务;
  • 3.进入一个阻塞队列中排队;
  • 4.抛出异常/拒绝接收该任务,这个要看具体的拒绝策略,默认抛出异常。

ExecutorService接口 
继承自Executor接口,我们常用的很多方法就是在这个接口中定义的。主要涉及到: 提交任务 、 关闭线程 、 获取结果 。

public interface ExecutorService extends Executor {

    /**
     * 关闭线程池,新提交的任务会被拒绝,但是已经提交的任务会继续执行
     */
    void shutdown();

    /**
     * 关闭线程池,新提交的任务会被拒绝,并且尝试关闭正在执行的任务
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池是否已关闭
    */
    boolean isShutdown();

    /**
     * 如果调用了shutdown或者shutdownNow之后,所有的任务都结束了,那么返回true,否则返回false
     */
    boolean isTerminated();

    /**
     * 当调用shutdown 或 shutdownNow之后,再调用这个方法,会
     *等待所有的任务执行完成,直到超时(超过timeout)或者说当前的线程被中断了
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    /**
     * 提交一个Runnable 任务
     */
    Future<?> submit(Runnable task);

    /**
     * 执行所有任务,返回 Future 类型的一个 list
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}

AbstractExecutorService 
抽象类,实现了 ExecutorService 接口。主要封装了通过submit方式提交任务的一些操作。

注意: 不需要获取结果,可以用 execute 方法;需要获取结果(FutureTask)用 submit 方法。

由于篇幅有限,本文只针对 execute 方式做讲解,想了解 submit 方式的同学可以参考 深度解读 java 线程池设计思想及源码实现

Executors 
这是大多数人最常用的一个类,实质上就是一个工具类。可以快速的构建一个线程池对象,常见的操作有如下:

/**
     * 创建一个固定大小的线程池,而且全是核心线程,
     * 会一直存活,除非特别设置了核心线程的超时时间
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

   /**
     * 创建了一个没有大小限制的线程池,全是非核心线程;如果线程
     * 空闲的时间超过60s就会被移除
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

   /**
     * 这个线程池只有1个唯一的核心线程
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   /**
     * 创建一个定长的线程池,可以执行周期性的任务
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

可以看出这几种方式最后都是通过 ThreadPoolExecutor 来实现的,所以下面就来研究一下今天的主角 ThreadPoolExecutor ,等理解了这个类,也就可以掌握线程池等工作原理,甚至可以根据自己的策略来自定义线程池。

ThreadPoolExecutor(关键类)

继承与抽象方法 AbstractExecutorService ,也就间接实现了 ExecutorService 、 Executor等接口。

从构造方法谈起

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize: 
    核心线程数量,所谓核心线程就是一直保留在线程池中,及时处于空闲状态也不会销毁等线程,除非手动调用 allowCoreThreadTimeOut 才可以在超时销毁。
  • maximumPoolSize: 
    线程池允许创建的最大线程数量。
  • keepAliveTime: 
    线程池中除了有核心线程之外,还有非核心线程,非核心线程处于空闲的时候会在一定时间范围内被关闭,而这个空闲的时间就是keepAliveTime。
  • unit: 
    keepAliveTime 的时间单位,比如秒、分、时等
  • workQueue: 
    保存待执行任务的阻塞队列。如果一个任务进入线程池之后,如果核心线程满了的话,就会先尝试添加到队列中,当然未必添加成功,而且队列也有多种实现,具体的后面再说,先简单理解为排队即可。
  • threadFactory: 
    如果没有设置的话,使用默认的ThreadFactory来创建线程;当然你也可以通过ThreadFactory自己创建线程,比如设置线程名称,优先级等
  • handler: 
    当达到线程池的最大容量时的拒绝策略。当线程池饱和,继续提交任务,需要一种策略来处理该任务。线程池提供了4种策略:

    AbortPolicy :直接抛出异常,这是默认策略;

    CallerRunsPolicy :用调用者所在的线程来执行任务;

    DiscardOldestPolicy :丢弃阻塞队列中靠最前的任务,并执行当前任务;

    DiscardPolicy :直接丢弃任务;

一些重要属性和方法

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

这里很关键,一定要认真看,后面分析任务执行 execute() 方法就需要用到这些基础。

Integer.SIZE =32 ,这代表了java中,int最大是32位,所以 COUNT_BITS 等于29; CAPACITY 等于1*2^29 -1,用它来表示线程池的最大容量是足够了的。

从线程池的生命周期来看,线程池有5种状态:

RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED

关于状态转换

  • RUNNING -> SHUTDOWN : 
    当调用shutdown()方法后,会发生这个状态转换;
  • (RUNNING or SHUTDOWN) -> STOP : 
    当调用 shutdownNow() 后,会发生这个状态转换;
  • SHUTDOWN -> TIDYING : 
    当队列和线程池都变成空的时候,会发生这个状态转换;
  • STOP -> TIDYING : 
    当线程池是空的时候,会发生这个状态转换;
  • TIDYING -> TERMINATED : 
    当terminated() 方法结束后,会发生这个状态转换。
深入浅出Java(Android )线程池ThreadPoolExecutor

线程状态转换.png

关于状态转换就讲完了,特别是前2个状态转换,更是常用。还有一个关键的属性 ctl 需要讲一下,初学者可能不太好理解,需要一点计算机基础。

首先 ctl 是一个AtomicInteger类型的对象,它其实是对int的包装,可以在多线程并发的情况下保证原子性,它传入的参数就是它表示的值。这里是通过 ctlOf() 方法来计算的。

在计算之前先补充2个小知识点:

1、 <<:是移位运算符,具体俩说是左移;右移用>>表示。左移的意思是将一个二进制数向左边移动1位,那么左移1位就等于这个数 2,左移n位的话就是 2^n;右移的话就是除以;

2、 由于10进制数有正负之分,所以转换成二进制数的时候,需要在最高位加上0/1来表示正负,正数用0表示;负数用1来表示。

3、原码:加上符号为之后的二进制数;反码:正数的反码是其本身,负数的反码:符号位不变,其余各位取反;补码:正数的补码就是其本身;负数的补码:即在反码的基础上+1。

4、针对二进制数的&、|、~。与(&)运算:相同位0,不同位1;或(|)运算:只要有1个为1就是1,否则为0;非(~)运算:取反,1变0,0变1。

首先 RUNNING = -1 << COUNT_BITS; 其中-1的二进制数1001,那转换成补码就是1111,然后左移29位就变成 1110 0000 0000 0000 0000 0000 0000 0000 ,因为int最多32位,所以高位的1没了;然后再和0做或运算,所以结果还是它本身,所以ctl初始值为 1110 0000 0000 0000 0000 0000 0000 0000 ,其中高3位存放线程状态,后面29位存放线程数量。

还有几个方法

runStateOf
workerCountOf
ctlOf

execute(关键方法)

这是线程池的关键方法,用来提交任务的。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //表示 “线程池状态” 和 “线程数量” 的整数
        int c = ctl.get();
        /*
         * 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务;
         * 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker()如果返回true表示添加成功,线程池会执行这个任务,那么本方法可以结束了,返回 false 代表线程池不允许提交任务,那么就会执行后面的方法。
             */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

      //程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败

        /*
         * 如果当前线程池是运行状态,会把任务添加到队列
         */
        if (isRunning(c) && workQueue.offer(command)) {
            /*
            *这里的逻辑比较有意思,又重新检查了线程状态和数量;
            *如果线程不处于 RUNNING 状态,就会移除刚才添加到队列中的任务;
            *如果线程池还是 RUNNING 状态,并且线程数为 0,那么开启新的线程;
            * addWorker(null, false)参数分析:
            * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
            * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
         
            /
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

 //程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了

         /*
          * 这时,再调用addWorker方法去创建线程,
          * 会把线程池的线程 数量的上限设置为maximum;
          * 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
          */
        else if (!addWorker(command, false))
            reject(command);
    }

为什么当任务添加到队列后,内部还执行了那么复杂的判断?

因为担心任务提交到队列中了,但是线程池却关闭了。

当执行 execute 方法提交一个任务的时候,如果线程池一直处于RUNNING状态,那流程如下:

  • 1、当工作线程数量 < 核心线程数量,会尝试创建一个核心线程并提交任务;
  • 2、当工作线程数量 >= 核心线程数量,如果阻塞队列没有满,则把任务添加到队列中;如果队列满了,则尝试启动一个新的非核心线程来提交任务;
  • 3、当工作线程数量 > maximumPoolSize,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

注意:

addWorker(null, false); 也是创建一个线程,但并没有传入任务,因为任务已经被添加到 workQueue 中了,当 worker 在执行的时候,会直接从 workQueue 中获取任务。在 workerCountOf(recheck) == 0 时执行 addWorker(null, false); 也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

关于Worker类和addWorker方法

addWorker() 是尝试在线程池中创建一个线程并执行任务, firstTask 表示作为新创建的线程的第一个任务, core 参数为true的时候,会用核心线程数做创建线程的边界;如果为false,会用最大线程数 maximumPoolSize 做为边界。如果 addWorker() 返回true,表示创建线程成功

private boolean addWorker(Runnable firstTask, boolean core) {}
深入浅出Java(Android )线程池ThreadPoolExecutor

任务执行流程.png

业务场景(分析2种常用的线程池)

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定大小的线程池。最大线程数与核心线程数相等,keepAliveTime的设置无效,因为核心线程默认不会销毁,阻塞队列为 LinkedBlockingQueue ,它是*队列。

工作流程:

  • 提交任务
  • 如果线程数小于核心线程数,创建核心线程并执行任务
  • 如果线程数大于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
  • 如果线程执行完任务,去阻塞队列取任务,继续执行。

虽然线程数量是固定的,但是由于使用了*队列LinkedBlockingQueue,如果线程的并发量比较大,任务的执行时间比较长,那还是可能会OOM的。 适用于CPU密集型的任务,也就是那种长期的任务。

newCachedThreadPool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,所有线程空闲时间 为 60 秒,任务队列采用 SynchronousQueue。

用它去处理那种并发量很大的任务就不合适,由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。 适用于那种任务可以快速完成的任务。

总结

线程池的内容其实是很多的,绝不是1,2篇文章就能讲完的。本文也主要是针对提交任务之后线程池的工作原理以及线程状态变化来做讲解。核心要义如下:

1、Executors这个工具类下创建的几种线程池的工作原理。

newFixedThreadPool、newCachedThreadPool等,需要注意每一个的优缺点和使用场景。

2、ThreadPoolExecutor这个类的构造方法和一些关键成员属性

Executors创建的多种线程池都是通过它的构造方法来实现的,读者需要熟悉它的参数的意义,这样的话,就可以自定义满足个性化需求的线程池。在文中列举出的一些成员属性也很重要,后面对线程池的各种操作离不开它们。

3、理解深刻线程池中的线程创建时机

主要是那个 execute() 方法和 addWorker() 方法,主要是根据线程池状态、当前线程数、核心线程数、队列大小、线程池最大线程数来结合来判断。

4、拒绝策略

添加任务到线程池,不一定会被接受。主要看一下哪些情况会执行 reject(command) 方法;还有几种不同的拒绝策略,默认是抛异常。

5、异常处理

如果某个任务执行出现异常,那么执行任务的线程会被关闭。

深入浅出Java(Android )线程池ThreadPoolExecutor

上一篇:ASP.NET Core AutoWrapper 自定义响应输出


下一篇:四、创建、删除、复制、移动目录