JAVA多线程线程池,还蒙圈的过来瞅瞅吧!!

关于高并发与多线程中的线程池

定义

线程是稀缺资源,它的创建与销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态切换,为避免资源过度消耗需要设法重用线程执行多个任务。线程池就是一个线程缓存,负责对线程进行统一分配、调优与监控。

什么时候使用线程池:

  • 单个任务处理时间比较短
  • 需要处理的任务数比较大

线程池的优势:

  • 重用存在的线程,减少线程创建,消亡的开销,提高性能
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性,可统一分配,调优和监控。

线程池的创建方式

JavaJUC包下的Executors 提供了创建线程池的四种方式及底层参数(四种线程的创建方式其底层都是使用ThreadPoolExecutor这个类,根据参数的不同来定义的,下面会给出相关参数的说明)

newCachedThreadPool

定义及作用

创建一个可根据需要创建新线程的线程池,如果线程池中有可用线程(可用指线程存在且空闲),如果没有则创建一个新的线程去执行,通常用于执行时间比较短的异步任务

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//此核心线程数corePoolSize为0,maximumPoolSize接近无限大
                                      60L, TimeUnit.SECONDS,//keepAliveTime存活时间为 60s,时间到自动回收
                                      new SynchronousQueue<Runnable>());//使用的是同步阻塞队列
    }

用法实例

//定义一个线程类
class RunnableThread implements Runnable{
    private int i=0;
    public RunnableThread(int i) {
        this.i = i;
    }
     public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"执行了第"+i+"个任务!");
        }
    }
//创建10个任务
   ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executorService.execute(new RunnableThread(i));

        }

可以看到一秒后一下执行了10个任务

newScheduledThreadPool

定义及作用

创建一个支持周期执行,可做定时任务的线程池,主要用于周期性的执行任务的场景

  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,//maximumPoolSize接近无限大,keepAliveTime存活时间为 0纳秒,线程不会被回收
              new DelayedWorkQueue());//采用延迟队列,提交的任务按照执行时间排序到队列中
    }

用法实例

  //schedule 是ScheduledExecutorService特有的方法
ScheduledExecutorService  executorService = Executors.newScheduledThreadPool(6);
            executorService.schedule(new RunnableThread(1),6L,TimeUnit.SECONDS);
            executorService.schedule(new RunnableThread(2),5L,TimeUnit.SECONDS);
            executorService.schedule(new RunnableThread(3),4L,TimeUnit.SECONDS);
            executorService.schedule(new RunnableThread(4),3L,TimeUnit.SECONDS);
            executorService.schedule(new RunnableThread(5),2L,TimeUnit.SECONDS);

JAVA多线程线程池,还蒙圈的过来瞅瞅吧!!
可以看到按预先设置的延时去执行任务,需要做定时任务的可以使用此线程去实现

newFixedThreadPool

定义及作用

根据需要创建一个可固定线程数的线程池,任务较多时入队等待,适用于任务数固定,平稳的场景下,就是确定并发压力的情况下,去创建指定的线程数

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,//核心线程数等于最大线程数
                                      0L, TimeUnit.MILLISECONDS,//纳秒
                                      new LinkedBlockingQueue<Runnable>()); //阻塞队列,任务量来的时候无可用线程时,入队等待
    }

用法实例

   ExecutorService  executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 6; i++) {
            executorService.execute(new RunnableThread(i));
        }

可以看到第一秒三个线程执行一次,第2秒这三个线程又重复执行了,没有创建新的线程

newSingleThreadExector

定义及作用

创建一个只有单个线程的线程池,使用唯一工作线程去执行任务,保证任务的有序执行,适用于要求任务有序进行的情况下,和newFixedThreadPool定义一样,只是线程数只有一个

 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

用法实例


   ExecutorService  executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executorService.execute(new RunnableThread(i));
        }

可以看到任务是一秒钟执行一个,且是有序进行

以上是Executors所提供的的四种线程池的创建,从上清楚的看到其底层是使用ThreadPoolExecutor根据不同的参数来确定的。下面主要是以这个类来对线程池底层原理做分析。

ThreadPoolExecutor

//构造方法
public ThreadPoolExecutor(int corePoolSize,  //核心线程数
                              int maximumPoolSize,  //最大线程数
                              long keepAliveTime,  //存活时间
                              TimeUnit unit,  //时间单位
                              BlockingQueue<Runnable> workQueue,  //使用队列类型,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出
                              ThreadFactory threadFactory,  //就是创建线程的线程工,可以通过自己自定义的方式去创建一个线程
                              RejectedExecutionHandler handler)//是一种拒绝策略,我们可以在任务满了后,根据采用的策略拒绝执行某些任务,java提供了四种执行策略:
   			    // AbortPolicy:中断抛出异常
    			//DiscardPolicy:默默丢弃任务,不进行任何通知
    			//DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务
    			//CallerRunsPolicy:让提交任务的线程去执行任务(对比前三种比较友好一丢丢)

线程的执行原理(结合源码来看的话会更明白)

JAVA多线程线程池,还蒙圈的过来瞅瞅吧!!
在实际项目开发中也是推荐使用手动创建线程池的方式,而不用默认方式,关于这点在《阿里巴巴开发规范》中是这样描述的:

JAVA多线程线程池,还蒙圈的过来瞅瞅吧!!
创建一个自定义线程池,根据并发量和具体的任务需求去创建线程池

 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.execute(new RunnableThread(i)
            );
        }

execute源码:关于源码这部分说明,后面会做补充

   int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //如果小于核心线程数的数往下执行
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command); //采用拒绝策略

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

关于线程池的基本使用和原理就先到这了,欢迎各位指出不足,共同学习!!

上一篇:C#变量与常量


下一篇:Java 异步任务执行服务 基本概念和原理