ThreadPoolExecutor源码阅读

一、Executors创建线程池的工厂方法

Executors是线程池的工具类,它提供了四种创建线程池的工厂方法:

  1. newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行;
  2. newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中;
  3. newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60
    秒后自动回收;
  4. newScheduledThreadPool:创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。

  上面4种有些是因为*队列容易引发OOM,有些是因为没有最大线程数的限制资料浪费,这些都有风险,所以阿里建议使用ThreadPoolExecutor的方式手动创建线程池。

二、ThreadPoolExecutor构造参数

  • corePoolSize:线程池核心线程个数;
  • maximunPoolSize:线程池最大线程数量。
  • keeyAliveTime:空闲线程的最大存活时间。注意这里说的是非核心线程!
  • TimeUnit,存活时间的时间单位。
  • workQueue:用于保存等待执行的任务的阻塞队列。比如基于数组的有界 ArrayBlockingQueue,基于链表的*LinkedBlockingQueue,最多只有一个元素的同步队列 SynchronousQueue,优先队列PriorityBlockingQueue 等。
  • ThreadFactory:创建线程的工厂。可以自定义个线程工厂指定线程池的名称。
  • RejectedExecutionHandler:饱和策略,当队列满了并且线程个数达到 maximunPoolSize 后采取的策略,比如AbortPolicy(抛出异常),CallerRunsPolicy(使用调用者所在线程来运行任务),DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务),DiscardPolicy(默默丢弃,不抛出异常)。

三、ThreadPoolExecutor执行流程图

ThreadPoolExecutor源码阅读

四、ThreadPoolExecutor执行源码

1、线程池的状态

  ThreadPoolExecutor 成员变量 ctl 是个 Integer 的原子变量用来记录线程池状态 和 线程池中线程个数,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程个数。

//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

线程池状态:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态 和 线程池中线程 个数的查询和操作,后面线程池执行的时候会用到。

// 通过二进制的与运算获取高三位 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 通过二进制的与运算获取低29位 线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }

//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }

2、ThreadPoolExecutor#execute

public void execute(Runnable command) {

    //(1) 如果任务为null,则抛出NPE异常
    if (command == null)
        throw new NullPointerException();

    //(2)获取当前线程池的状态+线程个数变量的组合值
    int c = ctl.get();

    //(3)当前线程池线程个数是否小于corePoolSize,小于则开启新线程运行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    //(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {

        //(4.1)二次检查
        int recheck = ctl.get();
        //(4.2)如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

        //(4.3)否者如果当前线程池线程空,则添加一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //(5)如果队列满了,则新增线程,新增失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

接下来看新增线程的 addWorkder 方法的源码,如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //(6) 若线程池状态为 STOP,TIDYING,TERMINATED,或者队列没有任务时则不添加worker!
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //(7)循环cas增加线程个数
        for (;;) {
            int wc = workerCountOf(c);

            //(7.1)如果线程个数超限则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //(7.2)cas增加线程个数,同时只有一个线程成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //(7.3)cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //(8)cas增加线程个数,接下来则创建worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //(8.1)创建worker
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {

            //(8.2)加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。
            mainLock.lock();
            try {

                //(8.3)重新检查线程池状态,为了避免在获取锁前调用了shutdown接口
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //(8.4)添加任务
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //(8.5)添加成功则启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
    	// (8.6)如果worker启动失败,则从HashSet剔除当前worker,并递减实际工作线程数。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

  如上代码主要分两部分,第一部分的双重循环目的是通过 cas 操作增加线程池线程数,第二部分主要是并发安全的把任务添加到 workers 里面,并且启动任务执行。

接着我们再看看runWorker方法,代码如下:

final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	w.unlock(); //(9)status设置为0,允许中断
	boolean completedAbruptly = true;
	try {
	   //(10)
		while (task != null || (task = getTask()) != null) {

			 //(10.1)
			w.lock();
		   ...
			try {
				//(10.2)任务执行前干一些事情
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					task.run();//(10.3)执行任务(执行具体的任务)
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					//(10.4)任务执行完毕后干一些事情
					afterExecute(task, thrown);
				}
			} finally {
				task = null;
				//(10.5)统计当前worker完成了多少个任务
				w.completedTasks++;
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {

		//(11)执行清工作
		processWorkerExit(w, completedAbruptly);
	}
}

  上面在执行具体任务期间加锁,是为了避免任务运行期间,其他线程调用了 shutdown 或者 shutdownNow 命令关闭了线程池。

3、Worker

先看下Worker的继承关系:
ThreadPoolExecutor源码阅读
由上图可知,Worker继承了AQS和Runnable接口,而Worker重写或复用了AQS的锁相关的方法。

再看下Worker类里2个成员变量:

final Thread thread;

thread属性是Worker维护的线程,每个Worker对象一个,这个就是用来执行任务用的线程,也就是说,Worker对象的数量也就代表了线程池中活动线程的数量。

Runnable firstTask;

  Worker对象的初始任务,多数情况下每个Worker对象创建时都伴随一个初始任务,是这个Worker对象优先会执行的任务,当执行完firstTask后,Worker对象还会从线程池中继续获取任务并执行。
  由Worker里的两个成员变量可知,Worker是对线程和任务的封装,task任务就是交由Worker里的thread来执行的!

上一篇:ES6学习笔记01 -- 暂时性死区 ( temporal dead zone )


下一篇:Java线程池