线程池核心源码分析

ThreadPoolExecutor线程池源码

一、基础知识

java 中的& 和 | 运算

& 是所有的2进制位数“与”出的最终结果,“与”的规则是两者都为1时才得1,否则就得0

| 是所有的2进制位数“或”出的最终结果,“或”的规则是两者之一有一个1就得1,否则就得0
是所有的2进制位数“非”出的最终结果,如果位为0,结果是1,如果位为1,结果是0.

~(A) = -(A+1)

位运算

例如:1 << 2 向左移2位

0001 -> 0100 = 4

Integer.SIZE 为何是32?

Integer 值的大小范围为 -231~231-1,int类型数据占4字节,1一个 字节8个bit位。

-1的二进制表示

先取1的原码:00000000 00000000 00000000 00000001

得反码:     11111111 11111111 11111111 11111110

得补码:     11111111 11111111 11111111 11111111

二、线程池使用

newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
/** 
参数:
corePoolSize : 核心线程数
maximumPoolSize:最大线程数
keepAliveTime:非核心线程数最大的空闲时间
TimeUnit:时间单位
workQueue:工作队列
    1. 同步队列 SynchronousQueue 长度为0
    2. 有界队列 ArrayBlockingQueue 长度自己指定
    3. *队列 LinkedBlockingQueue *
threadFactory:线程工厂,用来创建线程池中的线程的,可以自己实现这个线程工厂。
handler:拒绝策略,可以自己实现拒绝策略。
    jdk自身实现了4种:
   (1)ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出 RejectedExecutionException 异常。默认
   (2)ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用线程执行该任务。
   (3)ThreadPoolExecutor.DiscardOldestPolicy : 抛弃队列最前面的任务,然后重新尝试执行任务。
   (4)ThreadPoolExecutor.DiscardPolicy,丢弃任务,不抛出异常。
   注:当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。
**/

几种常用写法(入口调用不同方法,底层都是调用的ThreadPoolExecutor的execute方法)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadTest {

    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 100, 60,
                TimeUnit.MICROSECONDS, new SynchronousQueue<>(), new MyThreadFactory(), new MyRejectedExecutionHandler());
        try {

//            for (int i = 1; i < 100; i++) {
//                threadPoolExecutor.execute(new MyTask(i));
//            }
//
//            for (int i = 1; i < 100; i++) {
//                threadPoolExecutor.submit(new MyCallable(i));
//            }

            List<MyCallable> tasks = new ArrayList<MyCallable>();
            for (int i = 1; i <= 105; i++) {
                tasks.add(new MyCallable(i));
            }
            threadPoolExecutor.invokeAll(tasks);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}

class MyTask extends Thread {

    private int i;

    public MyTask(int i) {
        this.i = i;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "=" + i);
    }
}

class MyCallable implements Callable<Object> {

    private int i;

    public MyCallable(int i) {
        this.i = i;
    }

    @Override
    public Object call() throws Exception {
        System.out.println("Callable" + Thread.currentThread().getName() + "=" + i);
        return i;
    }
}

// 自定义线程工厂
class MyThreadFactory implements ThreadFactory{

    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final Long stackSize;

    MyThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = "myPool" +
                poolNumber.getAndIncrement() +
                "-thread-";
        stackSize = 1024L;
    }

    /**
     * 创建线程时指定线程所在组,线程预期栈大小,优先级,是否为守护线程,名字等。
     *
     * @param r
     * @return
     */
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                stackSize);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

// 自定义任务拒绝策略
class MyRejectedExecutionHandler implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("任务被拒绝");
    }
}

任务添加顺序:

核心线程–》工作队列–》非核心线程。见源码解析

任务执行顺序:

核心线程–》非核心线程–》工作队列。见源码解析

三、源码分析

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  // ctl = running
private static final int COUNT_BITS = Integer.SIZE - 3; //COUNT_BITS = 29; 
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //0010 - 1= 2^29-1

// runState is stored in the high-order bits 线程池运行状态存储在高3位,剩下的低29位用来记录当前工作线程数量,高明之处就是一个int类型的值,去记录线程池中几个维度的状态和数据

// 能接受新任务以及处理已添加的任务
private static final int RUNNING    = -1 << COUNT_BITS; // 1110 

// 不接受新任务,可以处理已经添加的任务,调用shutdown方法时
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0000

// 不接受新任务,不处理已经添加的任务,并且中断正在处理的任务 调用shutdownnow方法时 
private static final int STOP       =  1 << COUNT_BITS; // 0010

// 所有的任务已经终止,ctl记录的“任务数量” 为0 ,ctl负责记录线程池的运行状态与活动线程数量
private static final int TIDYING    =  2 << COUNT_BITS; // 0100

// 线程池彻底终止,则线程池转变为terminated 状态
private static final int TERMINATED =  3 << COUNT_BITS; // 0110

// Packing and unpacking ctl

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    // 获取ctl的值,初始值是running的值
    int c = ctl.get();
    // 获取到工作线程的数量 和 核心线程数进行比较,小于核心线程数,则增加ctl的值+1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 线程池是不是running状态,同时往队列里面添加任务是否成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 
        int recheck = ctl.get();
        // 再次判断是不是running状态,不是的话,直接移除任务
        if (! isRunning(recheck) && remove(command))
            // 拒绝任务
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 核心线程已满 队列已满,就走下面
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // 除了加入的队列的任务之外,其他的任务进来都会把ctl这个值加1,代表线程池的工作线程数加1
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
   
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 池中线程启动的地方
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 这里体现了线程池中线程的复用,这里就有一个问题,这里和调addWorker的地方是异步的,所以这里可能是核心线程触发的,但是会读到后续添加到队列中的任务 6
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 我们传递的任务被执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; ******
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
     	// 是否异常结束的
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            // 减少线程池中工作线程数量
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 非原子操作,固加锁
            completedTaskCount += w.completedTasks;
            // 从线程池中移除当前工作线程
            workers.remove(w);
        } finally {
            // 解锁
            mainLock.unlock();
        }
	
     	// 尝试中断线程池中的线程,假如当前线程池不处于running状态
        tryTerminate();
		
     	// 获取ctl值
        int c = ctl.get();
     	// 判断线程池的状态是不是小于stop的 就是 running,shutdown ,假如上一步将线程池的状态改为了TIDYING 或者 TERMINATED 状态下面就不会走了
        if (runStateLessThan(c, STOP)) {
            // 很显然这里是true
            if (!completedAbruptly) {
                // min = corePoolSize 一般不会0,看自己设置的是多大
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 假如为0,同时工作队列不是空的,那么min = 1;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果线程池的工作线程 大于或等于了 min (我觉得就可以理解为核心线程数) 直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
           
            addWorker(null, false); // ?
        }
    }

// 这个方法 只要return null, 这个工作线程就会死掉,同时线程池中的工作线程数就减一
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
             // The pool is stopped. The pool is shutdown and the queue is empty.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 核心线程是不是开启了超时设置   工作线程是不是大于核心线程 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
		   // There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize).
            if ((wc > maximumPoolSize || (timed && timedOut))  // 7
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 1 timed 为true 时,一般来说就是工作线程大于了核心线程,非核心线程就需要就等待最长空闲时间,2.timed为false时,只有核心线程,然后就一直wait 主线程往队列里面添加任务,一有任务就开始处理,直到shutdown
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); // 8
                // poll 是等待这个时间,没有的话就返回null,take 是 阻塞
                if (r != null)
                    return r;
                // 当r为null时,代表非核心线程超时了,再次循环,7的地方,就直接返回null了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
// 由于每次都是核心线程最开始执行,非核心线程后开始执行,所以当池中只有核心线程时,timed 就是 false , 当池中有非核心线程时,timed 就是 true ,所以上面 8处就是来处理空闲的非核心线程的

总结:我们传递给线程池的任务task,最后在worker(线程池中的线程)的run方法中被执行,而且是以task.run()的方法进行直接调用的。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable

疑问:

  1. 核心线程和非核心线程都是用ctl这个值来维护的,

    是的,都是由ctl这个值来维护。

  2. 线程池如何区分池中的线程是核心的还是非核心的呢?没有看到worker携带相关标识

    不用区分,在没有设置核心线程超时的情况时,非核心线程是优先被清理的。

  3. workers的数量何时减少,我们是设置了一个空闲等待时间,啥时候触发?

    由于每次都是核心线程最开始执行,非核心线程后开始执行,所以当池中只有核心线程时,timed 就是 false , 当池中有非核心线程时,timed 就是 true ,所以上面 8处就是来处理空闲的非核心线程的

  4. 什么时候从队列中获取任务并执行的?

    当工作线程(包括核心非核心)执行完首次分配的任务后,都会从队列中获取任务

  5. 线程的复用在哪里体现了?

    第6处

  6. 为何设置核心线程数可以超时时,当核心线程空闲时间超过空闲时间时,线程池会自动退出?

    因为线程池的运行状态 也是由内部的工作线程维持的,当线程池中没有工作线程是,自然就停止了。

源码中所用设计模式:

  1. 策略模式:线程工厂和拒绝策略;
  2. 适配器模式:public class ThreadPoolExecutor extends abstract class AbstractExecutorService implements ExecutorService 。抽象类中实现了接口部分方法,具体类中也实现了接口的部分方法,典型的接口适配器模式。
上一篇:【Jmeter】Address already in use : connect &&Permission denied: connect 解决方案


下一篇:MySQL数据库快速入门与应用实战(阶段一)