线程池

线程池

线程池

  • 线程池的重要性
    • 重用线程,避免线程创建与销毁带来的性能开销。
    • 控制线程池的最大并发数,避免大量线程之间因为互相抢占系统资源而导致的阻塞现象。
    • 能够对线程进行简单管理,并且提供定时执行以及制定间隔循环执行等功能。
  • 线程池的好处

    • 加快响应速度,消除了线程创建带来的延迟
    • 合理利用CPU和内存,达到一个资源占用与效率之间的平衡。
    • 统一管理
  • 线程池的适用场合

    • 服务器接收到大量请求时,使用线程池技术是非常合适的,可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
    • 实际开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。

创建与停止线程池

  • 线程池构造函数主要参数

    • int corePoolSize:核心线程数,线程池在完成初始化以后,默认情况下,线程池中没有线程,线程池会等待有任务到来时,再创建新的线程去完成任务。

    • int maximumPoolSize:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这就是最大量maximumPoolSize。

    • long keepAliveTime:存活时间,如果线程池当前的线程数多于corePoolSize,那么多于的线程空闲时间超过keepAliveTime,他们就会被终止。这种机制可以在线程池占用资源的过程中减少冗余消耗。默认情况下是只有多于corePoolSize的线程会被回收,除非是修改了allowCoreThreadTimeOut属性设置true,那么keepAliveTime也会同样作用于核心线程。

  • BlockingQueue workQueue:任务存储队列,3中常见的队列类型,(1)直接交接:SynchronousQueue,(2)*队列:LinkedBlockingQueue,(3)有界的队列:ArrayBlockingQueue。

  • ThreadFactory threadFactory:当线程池需要新的线程时,会使用线程工厂来生成新的线程。默认使用 Executor.defaultThreadFacdtory(),创建出来的线程都在统一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否守护线程等等,只是一般用默认的就足够了。

  • RejectedExecutionHandler handler:由于线程池无法接受提交的任务而拒绝策略

所以线程的添加规则是:

  1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来执行新任务。
  2. 如果线程数等于或者大于corePoolSize但是少于maximumPoolSize,就将任务放入队列workQueue
  3. 如果队列已满,并且线程数小于maximumPoolSize,就创建一个新的线程来执行任务。
  4. 如果队列已经满了,并且线程数大于或等于maximumPoolSize,就拒绝这个任务。

从而得出线程池中增减线程的特点

  • 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
    • 线程池希望保持较少数的线程数,并且只有在复杂变得很大时,才增加它。
    • 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
    • 是只有在队列填满时才创建多余corePoolSize的线程,所以如果使用的是*队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
  • 手动创建与自动创建

    手动创建更好,因为可以更加明确线程池的运行规则,避免资源耗尽的风险。相反,自动创建线程池,直接调用JDK封装好的构造函数,可能会带来一些问题。

    • newFixedThreadPool

      由于传进去的LinkedBlockingQueue是没有容量上限的,所以当传入的请求越来越多,并且无法及时处理完毕的时候,就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM

    • newSingleThreadPool

      单线程的线程池,它只会用唯一的线程来执行任务。这个和FixedThreadPool的原理基本相同,只是把核心线程数和最大线程数都设置为了1,所以也会导致同样的问题,也就是请求堆积的时候,可能会占用大量的内存。

    • CachedThreadPool

      可缓存线程;*线程池,具有自动回收多余线程的功能。它的弊端在于第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至直接导致OOM。

    • ScheduleThreadPool

      它支持定时以及周期性任务执行的线程池。

    所以线程池的创建,最好的方式应该是手动创建,根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,想给线程取什么名字等等。

  • 线程池中线程数量的合适设定

    • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。

    • 耗时IO型(读写数据库、文件、网络读写等等):最佳线程数一般会大于CPU核心数很多被,以JVM线程监控显示频繁情况为依据,保证线程空闲可以衔接上,参考BrainGoetz推荐的计算方法:

      线程数=CPU核心数*(1+平均等待时间/平均工作时间)

  • 停止线程池的正确方法

    1. shutdown:不会马上停止,已经提交的任务会执行完,新的任务无法再被提交了。
    2. isShutdown:返回是否shutdow,即是线程池中的任务还在执行。
    3. isTerminated:它可返回整个线程池是不是已经完全停止了。
    4. awaitTermination:测试一段时间内线程池是不是会完全停止,起到的作用是检测。这个方法在返回之前是阻塞的,只有当线程池中所有任务都执行完毕,或者等待时间到了,以及等待过程中被中断了抛出异常。
    5. shutdownNow:马上停止线程池,用interrupt信号去通知正在执行的线程停止,而队列当中等待的任务则直接返回runableList,可以用于再以后通过其他方式执行。

常见线程池的特点和用法

  • newFixedThreadPool
  • newSingleThreadPool
  • CachedThreadPool
  • ScheduleThreadPool
  • 以及JDK1.8加入的WorkingStealingPool:这个线程池和其他的有很大的不同。加入的任务不是普通任务,而且是有子任务的情况下会适合这个场景,比如说二叉树遍历、矩阵的处理。使用这个线程池有一定的窃取能力,每一个线程之间是会合作的。(使用场景有限)

FixedThreadPool与SingleThreadPool的Queue是LinkedBlockingQueue

他们的线程数量已经固定了,添加的任务数量无法估计,所以是用了一个可以存储无限多任务的队列来帮助存储任务。

CachedThreadPool使用的Queue是SynchronousQueue?

SynchronousQueue内部实际上是不存储的,但是在CachedThreadPool这个线程池的情况下也根本不需要队列来存储,会直接启动新的线程,效率更高些,也不需要一个队列去中转了。

ScheduleThreadPool使用的是延迟队列DelayedWorkQueue。

拒绝任务

  • 拒绝时机
    1. Executor关闭时,提交新任务会被决绝
    2. 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。
  • 4种拒绝策略
    • AbortPolicy:直接抛出一个异常
    • DiscardPolicy:默默地把任务丢弃
    • DiscardOldestPolicy:丢弃队列中最老的任务,以便腾出空间存储添加的新任务。
    • CallerRunsPolicy:让提交任务的这个线程去执行任务。这种方式避免了任务损失,也是一种负反馈策略,给了线程池缓冲的时间。

钩子方法

  • 在有特殊用途或者需要在每个任务执行前后进行一些日志或者统计,等等。

线程池实现原理

线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等,这么多和线程池相关的类,大家都是什么关系?

首先它们的继承实现关系依次是这样的:Executor <-ExecutorService <- AbstractExecutorService <- ThreadPoolExecutor

线程池实现任务复用的原理:

相同线程执行不同任务。源码的分析如下:

executorService.execute(new Task());

public interface Executor {
    void execute(Runnable command);
}


class ThreadPoolExecutor {
    //其它省略...
    
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     
        //ctl记录了线程池状态和线程数
        int c = ctl.get();
        //首先检查当前线程熟练是不是小于核心线程数量
        if (workerCountOf(c) < corePoolSize) {
            //如果不够,就创建新的线程,command就是新的任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //检查是不是running状态,如果是就继续放到工作队列中
        if (isRunning(c) && workQueue.offer(command)) {
            //由于在此期间线程可能会被终止了,所以再进行一次判断
            int recheck = ctl.get();
            //如果现在不是正在运行,就将这个任务删除,并且使用拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果线程已经减少至0,就需要创建新的线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //能够执行到这里,说明线程已经停止,
        //或者队列已经放不进去了线程数也大于核心线程数了
        //就要增加线程,直到达到最大线程数量。
        else if (!addWorker(command, false))
            reject(command);//如果已经不能再增加了,就拒绝
    }
    
    
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //这个Worker,就是主要的工作相关的类
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    
    //这个方法反应了线程的复用
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //获取到Runable类型的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //只要这个任务不为空,就去执行,getTask()就是从阻塞队列中去取出任务
            //while循环意味着整个worker不会停止,不停的执行任务,取出新的任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        //执行Runable的run方法
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}
  • 线程池管理器
  • 工作线程
  • 任务队列
  • 任务接口(Task)

线程池的五种状态:

  • RUNNING:接受新任务并且处理排队任务
  • SHUTDOWN:不接受新任务,但处理排队任务
  • STOP:不接受新任务,也不处理排队任务
  • TIDYING:中文是整洁,理解了中文就容易理解这个状态,所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并且将运行terminate()钩子方法。
  • TERMINATED:terminate()方法运行完成。

使用线程池的注意点

  • 避免任务的堆积
  • 避免线程数过度增加
  • 排查线程泄露:线程执行完毕却不能回收,往往是任务逻辑有问题,导致线程结束不了。
上一篇:Gitlab的SSL证书更新


下一篇:gitlab的安装以及汉化