线程池的参数设定,为什么这样设定?有什么好处?

线程池的参数设定,为什么这样设定?有什么好处?

自定义线程池

package com.sunhui.thread.CompletableFuture.util;

/**
 * @Description
 * @ClassName ThreadPoolUtil
 * @Author SunHui
 * @Date 2021/9/24 9:47 上午
 */

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 线程池管理的工具类,封装类
 *
 * @author SunHui
 */
public class ThreadPoolUtil {
    // 通过ThreadPoolExecutor的代理类来对线程池的管理
    private static volatile ThreadPollProxy mThreadPollProxy;

    // 单列对象
    public static ThreadPollProxy getThreadPollProxy() {
        if (mThreadPollProxy == null) {
            synchronized (ThreadPollProxy.class) {
                if (mThreadPollProxy == null) {
                    mThreadPollProxy = new ThreadPollProxy(Runtime.getRuntime().availableProcessors() * 2,
                            Runtime.getRuntime().availableProcessors() * 4,
                            30);
                }
            }
        }
        return mThreadPollProxy;
    }

    // 通过ThreadPoolExecutor的代理类来对线程池的管理
    public static class ThreadPollProxy {
        public ThreadPoolExecutor poolExecutor;// 线程池执行者 ,java内部通过该api实现对线程池管理
        private int corePoolSize;
        private int maximumPoolSize;
        private long keepAliveTime;

        public ThreadPollProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.keepAliveTime = keepAliveTime;
            poolExecutor = new ThreadPoolExecutor(
                    // 核心线程数量
                    corePoolSize,
                    // 最大线程数量
                    maximumPoolSize,
                    // 当线程空闲时,保持活跃的时间
                    keepAliveTime,
                    // 时间单元 ,毫秒级
                    TimeUnit.SECONDS,
                    // 线程任务队列
                    new LinkedBlockingQueue<>(20000),
                    // 创建线程的工厂
                    Executors.defaultThreadFactory(),
                    // 拒绝策略,直接丢弃
                    new ThreadPoolExecutor.AbortPolicy());
        }
}

源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池作用

  1. 降低资源消耗;提高线程利用率,降低创建和销毁线程的消耗。
  2. 提高响应速度;任务来了,直接有线程可用可执行,而不需要先创建线程,在执行。
  3. 提高线程的可管理性;线程是稀缺资源,使用线程池可以统一分配调优监控。

线程池七大参数

  1. corePoolSize :代表核心线程数,也就是正常情况下创建的工作线程数,这些线程创建后并不会销毁,而是会常驻。当核心线程的数量等于线程池允许的核心线程最大数量的时候,如果有新任务来,不会再创建新的核心线程;
  2. maxinumPoolSize:代表的是最大线程数,它与核心线程数相对应,表示最大允许创建的线程数,比如任务较多,将核心线程数都用完了,仍然无法满足需求时,此时就会继续创建新的线程,但是线程池内线程总数不会超过最大线程数;
  3. keepAliveTime:表示超出核心线程数之外的线程的空闲时间,也就是说核心线程不会消除,但是超出核心线程数的部分线程如果空闲一定时间则会销毁,我们可以通过设置keepAliveTime来控制空闲时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个任务,还没来得及处理下一个任务,线程就被终止,而需要线程的时候又再次创建,刚创建完不久执行任务后,没多少时间又终止,线程反复切换会导致资源浪费(还可以设置allowCoreThreadTimeout = true这样就会让核心线程池中的线程有了存活的时间);
  4. unit:表示线程活动保持时间的单位:可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
  5. workQueue:用来保存等待执行任务的阻塞队列,假设我们现自核心线程都被使用了,还有任务进来则全部放入阻塞队列中,直到阻塞队列放满任务还再继续加入新任务则会创建新的新的线程;
  6. ThreadFactory:创建线程的工厂,用来生产线程去执行任务。我们可以选择使用默认的创建工厂,生产的线程在同一个组内,拥有相同的优先级,且都不是守护线程。当然我们也可以根据业务需要选择自定义线程工厂;
  7. Handler:任务拒绝策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了4种策略,下面会详细讲解下。

线程池处理流程

线程池的参数设定,为什么这样设定?有什么好处?

线程池的四大拒绝策略

测试代码:

		// 创建200个任务,每个任务执行时间1秒钟
        List<MyTask> tasks = IntStream.range(0, 200)
                .mapToObj(i -> new MyTask(1))
                .collect(toList());
        // 创建自定义线程池
        ThreadPoolUtil.ThreadPollProxy threadPollProxy = ThreadPoolUtil.getThreadPollProxy();
  1. AbortPolicy(默认策略)
public static ThreadPollProxy getThreadPollProxy() {
        if (mThreadPollProxy == null) {
            synchronized (ThreadPollProxy.class) {
                if (mThreadPollProxy == null) {
                    mThreadPollProxy = new ThreadPollProxy(
                    		3, 5, 3);
                }
            }
        }
        return mThreadPollProxy;
    }

    // 通过ThreadPoolExecutor的代理类来对线程池的管理
    public static class ThreadPollProxy {
        public ThreadPoolExecutor poolExecutor;// 线程池执行者 ,java内部通过该api实现对线程池管理
        private int corePoolSize;
        private int maximumPoolSize;
        private long keepAliveTime;

        public ThreadPollProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.keepAliveTime = keepAliveTime;
            poolExecutor = new ThreadPoolExecutor(
                    // 核心线程数量
                    corePoolSize,
                    // 最大线程数量
                    maximumPoolSize,
                    // 当线程空闲时,保持活跃的时间
                    keepAliveTime,
                    // 时间单元 ,毫秒级
                    TimeUnit.SECONDS,
                    // 线程任务队列
                    new LinkedBlockingQueue<>(20),
                    // 创建线程的工厂
                    Executors.defaultThreadFactory(),
                    // 拒绝策略,直接丢弃
                    new ThreadPoolExecutor.AbortPolicy());
        }

使用默认的 AbortPolicy策略 运行结果如下所示,直接抛出异常信息

pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@3b0143d3 rejected from java.util.concurrent.ThreadPoolExecutor@731a74c[Running, pool size = 5, active threads = 5, queued tasks = 14, completed tasks = 13]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
	at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
	at com.sunhui.thread.CompletableFuture.CompletableFutureTest1.lambda$main$1(CompletableFutureTest1.java:50)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at com.sunhui.thread.CompletableFuture.CompletableFutureTest1.main(CompletableFutureTest1.java:51)
  1. CallerRunsPolicy 策略

我们发现最后一个任务竟然还是main线程帮我们执行了,因为CallerRunsPolicy策略就是,等线程池处理不了的任务,谁提交的任务,就给谁拿回去,让他自己执行,这里发现是main提交的任务,所以最终还是还给了main线程自己执行

pool-1-thread-5
pool-1-thread-4
main
pool-1-thread-3
pool-1-thread-4
pool-1-thread-4
  1. DiscardPolicy 策略

直接抛弃策略,异常也不会抛,什么都不做

pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
  1. DiscardOldestPolicy 策略
    尝试找第一个线程帮忙执行,但是得要等线程1给执行完才可以执行,不过这个得在线程配置大一点情况下才可以看出,这里我也是试了很久感觉没啥效果
pool-1-thread-4
pool-1-thread-5
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1

小结:当配置的抛弃策略什么都不做或者直接抛弃时,会很难发现问题,所以最好不要使用这种策略,使用默认的策略 AbortPolicy 即可。

线程池中阻塞队列的作用

  • 一般的队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前任务了,阻塞队列通过阻塞可以保留住当前想要继续入队的任务。
    阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
  • 阻塞队列自带的阻塞和唤醒功能,不需要额外处理,无任务时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用cpu资源。

为什么当核心线程数已满时,是先添加队列,而不是先创建临时线程

  • 在创建新线程的时候,需要获取全局锁,这个时候其他线程就得被阻塞,影响了线程的整体执行效率。而将任务添加到队列缓冲,很好的避免了临时线程的创建销毁开销。
  • 举例:一个企业里面有10个(core)正式员工的名额,最多招收10个正式工,要是任务超过了正式工的数量(task > core)的情况下,工厂领导(线程池)不是首先扩招工人,还是这10个正式工,允许任务稍微积压一下(放到队列中)。让10个正式工慢慢干,迟早会干完的,要是任务还在继续增加,超过了正式工的加班忍耐极限(队列已满),这时就不得不招收临时工了,要是加上临时工后,还是不能及时完成的任务,这时就会被领导拒绝(执行拒绝策略)

小结

  1. 当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。
  2. 当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。
上一篇:Map集合的使用:


下一篇:算法基础三:分治算法---快速排序算法