线程池源码解析2.工作原理与内部结构

线程池核心参数以及工作原理参考第六节

线程池简单原理见1.7


ThreadPoolExecutor内部结构

1.核心属性之ctl

    /*
     * 线程池核心属性之一 ctl。
     * 高三位表示当前线程池运行状态,低29位表示当前线程池中所拥有的线程数量。
     * 是一个原子类 AtomicInteger。
     */

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

	/*
	 *  Integer.SIZE = 32
	 *  32 - 3 = 29 表示低29位用来存放当前线程数量的位
	 */
    private static final int COUNT_BITS = Integer.SIZE - 3;

	/*
	 * 表示低29位能表示的最大的线程数 就是 1 << 29 - 1 (大概是5亿多)
	 * CAPACITY = 000 11111111111111111111111111111
	 */
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
	
	/*
	 * 下面的表示线程池的5种状态
	 *   状态从上到下依次递增。
	 */
    //111 00000000000000000000000000000 (转换为二进制) 转换成10进制是一个负数
    private static final int RUNNING    = -1 << COUNT_BITS;

	//000 00000000000000000000000000000 
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

	//001 00000000000000000000000000000 
    private static final int STOP       =  1 << COUNT_BITS;

	//010 00000000000000000000000000000 
    private static final int TIDYING    =  2 << COUNT_BITS;

	//011 00000000000000000000000000000 
    private static final int TERMINATED =  3 << COUNT_BITS;
	
	
	/*
	 *   获取当前线程池的运行状态。
	 *   ~CAPACITY = 111 00000000000000000000000000000
	 *   因为要进行一个&运算,而~CAPACITY的值是固定的,根据这个值并且我们知道ctl的高三位
	 *   表示线程池的运行状态,所以进行&运算后就能获取到ctl的高三位的状态,即线程池的状态。
	 */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    

	/*
	 *  获取当前线程池的线程数量。
	 *   CAPACITY = 000 11111111111111111111111111111
	 *   这个值跟ctl进行&运算,取出ctl的低29位的值,即表示获取线程池中的线程数量。
	 */
	private static int workerCountOf(int c)  { return c & CAPACITY; }
	
	/* 
	 *  用在重置当前线程池ctl值时会用到 
	 *  rs 表示线程池状态, wc表示当前线程池中worker(线程)数量
	 *   |表示的就是不进位加法 表示的就是通过rs 和 wc重新构建一个ctl。  
	 */
    private static int ctlOf(int rs, int wc) { return rs | wc; }
	
	
	/*
	 * 表示当前线程池ctl所表示的状态是否小于某个状态s
	 *  RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
	 */
	private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
	
	/*
	 * 表示当前线程池ctl所表示的状态是否大于等于某个状态s。
	 *
	 */
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
	
	/*
	 *  判断线程池是否处于RUNNING状态
	 *  小于SHUTDOWN的状态一定是RUNNING状态 
	 */
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

2.常用简单方法

  	/*  
  	 *  使用CAS的方式让 ctl值+1,成功返回true失败返回false。
  	 *  即尝试添加一个线程。(Worker实际上就是工作者线程)
  	 */

	private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /*
     *  使用CAS的方式让 ctl值-1,成功返回true失败返回false。
  	 *  即尝试干掉一个线程。(Worker实际上就是工作者线程)
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
	  * 将ctl的值-1, 这个方法一定成功,使用的是 自旋 + CAS的方式保证。
      */
    private void decrementWorkerCount() {
        do {} while (!compareAndDecrementWorkerCount(ctl.get()));
    }

3.核心成员属性


    /*
     *  增加worker(线程) 减少worker 时需要持有mainLock,修改线程池运行状态时也需要。
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    //线程池中真正存到 worker(Thread)的地方 工作者集合。
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /*
     * 当外部线程调用awaitTermination()方法时,外部线程会阻塞等待当前线程池状态为Termination为止
     *  
     * 底层类似AQS的原理,等待就是将当前线程封装成一个Node,然后进入Condition的等待队列	中,当线程池状态变为termination时,
     * 会通过调用termination.signalAll()方法会将这	些线程全部唤醒,进入到阻塞队列中(AQS),继续去争抢锁(每次只有头节点可以获得锁)
     */
    private final Condition termination = mainLock.newCondition();

    //记录线程池生命周期内,线程数的最大值
    private int largestPoolSize;

    //记录线程池所完成的任务总数,当一个worker退出时,会将worker完成的任务累加到这个属性中
    private long completedTaskCount;
	
	/* 
 	 *  线程池7大核心参数之一:任务队列:BlockingQueue(阻塞队列)是一个接口。
  	 *  当线程池中的正在工作的线程达到核心线程数时,这时再提交的任务会直接放到workQueue中。 
  	 *  常用的实现类有基于数组的阻塞队列  ArrayBlockingQueue
  	 *  		  基于链表的阻塞队列    LinkedBlockingQueue 
 	 */
    private final BlockingQueue<Runnable> workQueue;


    /*
     * 线程池的7大参数之一,线程的创建工厂,是一个接口
     * 一般不推荐使用默认的实现类DefaultThreadFactory。
     */
    private volatile ThreadFactory threadFactory;

    /*
     *  线程池7大核心参数之一,拒绝策略,是一个接口,有四种实现,默认是直接丢弃并抛出异常。
	 *	DiscardOldestPolicy   --->丢弃队列中最老(最先入队)的任务
	 *  AbortPolicy           --->直接丢弃新来的任务 抛出异常 (默认的)
	 *	CallerRunsPolicy      --->直接调用run方法,相当于同步方法
	 *	DiscardPolicy         --->直接丢弃新来的任务 不抛出异常
     */
    private volatile RejectedExecutionHandler handler;

    
	/*
	 * 线程池7大核心参数之一: 空闲线程存活时间
	 * 当allowCoreThreadTimeOut为false时,只有当非核心线程空闲时间达到指定时间时才会被	 * 回收。
	 * 当allowCoreThreadTimeOut为true时,线程池内所有的线程到达指定的时间均会被回收。
	 *  此参数常常和 TimeUnit一起使用,指定超时时间的单位(也是线程池的7大核心参数之一) 
	 */
    private volatile long keepAliveTime;
	
	
	/*
	 * 线程池7大核心参数之一: 核心线程数
	 */
    private volatile int corePoolSize;

    /*
     * 线程池7大核心参数之一: 最大线程数
     */
    private volatile int maximumPoolSize;

    //控制线程池内核心线程空闲时间达到指定时间时能否被回收
    private volatile boolean allowCoreThreadTimeOut;

4.核心内部类Worker

private final class Worker
        extends AbstractQueuedSynchronizer //是AQS的子类
        implements Runnable  //实现了Runnable接口
    {
     	/*
     	 *  Worker采用了AQS的 独占 模式
     	 *  独占模式:两个重要属性 state 和 ExclusiveOwnerThread
     	 *  state: 0时表示表示未被占用,> 0时表示被占用 < 0时表示初始状态。
     	 *  ExclusiveOwnerThread表示抢占到锁的线程。
     	 */		
        private static final long serialVersionUID = 6138294804551838833L;

        //worker内部封装的工作线程
        final Thread thread;
        
    	//假设firstTask不为NULL,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到队列中去获取下一个任务。	
        Runnable firstTask;
    
    
        //记录当前worker所完成的任务数量
        volatile long completedTasks;

        /*
         * 构造器 传来的Runnable任务可以为NULL,firstTask为NULL的线程启动后会去队列中		  *	获取任务
         */
        Worker(Runnable firstTask) {
            //设置AQS独占模式为初始化中状态,这个时候不能被抢占锁
            setState(-1); 
            //为内部的firstTask赋值
            this.firstTask = firstTask;
            
            /*
             * 使用线程工厂创建了一个线程,并且将当前worker指定为Runnable,也就是说当				 * thread启动的时候会议worker.run为入口
             */
            this.thread = getThreadFactory().newThread(this);
        }

        /*
         * 当worker启动时,会执行run()方法。当前的这个Worker就是一个任务(Runnable)
         * 底层调用runWorker()直接将this传入了。
         */
        public void run() {
            //直接将当前对象传入进行执行。
            runWorker(this);
        }

		/*
		 * 判断当前worker的锁是否被占用
		 *  state为0 表示为被占用
		 *  state为1 表示被占用
		 */
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
		
    	//尝试去占用worker的独占锁 
        protected boolean tryAcquire(int unused) {
            //CAS的方式,将state设置为1
            if (compareAndSetState(0, 1)) {
                //CAS成功,则将exclusiveOwnerThread设置为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
		
    	/*  尝试释放锁 
         *  外部不会直接调用这个方法,这个方法时AQS内调用的
         *  外部调用unlock时,unlock -> AQS.release ->tryRelease (模板方法模式)
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
		
    	//加锁,加锁失败时会阻塞当前线程。(类似ReentarntLock)
        public void lock()        { acquire(1); }
    
    	/*
         *   尝试去加锁,如果锁是未被持有状态,那么加锁成功后会返回true
    	 *	 否则 不会阻塞当前线程会返回false。
    	 */ 
        public boolean tryLock()  { return tryAcquire(1); }
    
    	//启动worker之前会先调用unlock(),强制将独占线程置为NULL,将state变为0.
        public void unlock()      { release(1); }
    
    	//返回当前worker的lock被占用
        public boolean isLocked() { return isHeldExclusively(); }
			
   	
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

5.构造方法

	/*
	 *  七个参数的构造方法。传入七大核心参数,为内部属性赋值。
	 */  	
	public ThreadPoolExecutor(int corePoolSize, //核心线程数
                              int maximumPoolSize, //最大线程数
                              long keepAliveTime, //空闲线程存活时间
                              TimeUnit unit, //时间单位
                              BlockingQueue<Runnable> workQueue, //阻塞队列
                              ThreadFactory threadFactory,  //线程工厂
                              RejectedExecutionHandler handler) { //拒绝策略
      	//判断参数是否合法
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
       
       //workQueue threadFactory handler不能为NULL。
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
      	//为属性赋值
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
上一篇:线程池_ThreadPoolExecutor原理分析


下一篇:KingbaseES 如何开启并进入数据库