从实战到原理,线程池的各类使用场景整合(中)

线程池内部的源代码分析


我们在项目里使用线程池的时候,通常都会先创建一个具体实现Bean来定义线程池,例如:


@Bean
public ExecutorService emailTaskPool() {
    return new ThreadPoolExecutor(2, 4,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
}


ThreadPoolExecutor的父类是AbstractExecutorService,然后AbstractExecutorService的顶层接口是:ExecutorService。


就例如发送邮件接口而言,当线程池触发了submit函数的时候,实际上会调用到父类AbstractExecutorService对象的java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后进入到ThreadPoolExecutor#execute部分。


@Override
public void sendEmail(EmailDTO emailDTO) {
    emailTaskPool.submit(() -> {
        try {
            System.out.printf("sending email .... emailDto is %s \n", emailDTO);
            Thread.sleep(1000);
            System.out.println("sended success");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}


java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 源代码位置:


/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


这里面你会看到返回的是一个future对象供调用方判断线程池内部的函数到底是否有完全执行成功。因此如果有时候如果需要判断线程池执行任务的结果话,可以这样操作:


Future future = emailTaskPool.submit(() -> {
          try {
              System.out.printf("sending email .... emailDto is %s \n", emailDTO);
              Thread.sleep(1000);
              System.out.println("sended success");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      });
      //todo something
      future.get();
}


在jdk8源代码中,提交任务的执行逻辑部分如下所示:新增线程任务的时候代码:


public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //工作线程数小于核心线程的时候,可以填写worker线程
        if (workerCountOf(c) < corePoolSize) {
              //新增工作线程的时候会加锁
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态正常,切任务放入就绪队列正常
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //如果当前线程池处于关闭状态,则抛出拒绝异常
                reject(command);
            //如果工作线程数超过了核心线程数,那么就需要考虑新增工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增的工作线程已经达到了最大线程数限制的条件下,需要触发拒绝策略的抛出
        else if (!addWorker(command, false))
            reject(command);
    }


通过深入阅读工作线程主要存放在了一个hashset集合当中, 添加工作线程部分的逻辑代

码如下所示:


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //确保当前线程池没有进入到一个销毁状态中
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              // 如果传入的core属性是false,则这里需要比对maximumPoolSize参数
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                //通过cas操作去增加线程池的工作线程数亩
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       //真正需要指定的任务是firstTask,它会被注入到worker对象当中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        //加入了锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个hashset集合,会往里面新增工作线程    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //worker本身是一个线程,但是worker对象内部还有一个线程的参数,
                //这个t才是真正的任务内容
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果worker线程创建好了,但是内部的真正任务还没有启动,此时突然整个
        //线程池的状态被关闭了,那么这时候workerStarted就会为false,然后将
        //工作线程的数目做自减调整。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


进过理解之后,整体执行的逻辑以及先后顺序如下图所示:


从实战到原理,线程池的各类使用场景整合(中)


首先判断线程池内部的现场是否都有任务需要执行。如果不是,则使用一个空闲的工作线程用于任务执行。否则会判断当前的堵塞队列是否已经满了,如果没有满则往队列里面投递任务,等待工作线程去处理。


如果堵塞队列已经满了,此时会判断工作线程数是否大于最大线程数,如果没有,则继续创建工作线程,如果已经达到则根据饱和策略去判断是果断抛出异常还是其他方式来进行处理。

上一篇:Java架构师之面试题


下一篇:Linux下C编程,子进程创建函数fork() 执行解析