java线程池总结

Java中的线程池

一、线程池的底层原理
1.1 先来了解一下原理

先放一张图片上来,看看线程池工作的核心步骤

java线程池总结

1:)当任务提交的时候,先判断核心线程池是否已经满了,如果没有满,则直接创建线程执行任务

2:)如果核心线程池满了,则会把多余的任务放在我们定义的阻塞队列中,判断阻塞队列是否已经满了

3:)如果阻塞队列也已经满了,此时会启用线程池中所有的线程(即最大线程都会被用完),判断是否都满了

4:)如果最大线程池也已经满了,那么下面多余的任务就会启动我们定义的四大拒绝策略中的一个。

1.2 底层的源码分析

Java底层线程池的源码如下:

public ThreadPoolExecutor(int corePoolSize, // 核心池线程数大小 (常用)
                              int maximumPoolSize,  // 最大的线程数大小 (常用)
                              long keepAliveTime, // 超时等待时间 (常用)
                              TimeUnit unit, // 时间单位 (常用)
                              BlockingQueue<Runnable> workQueue, // 阻塞队列(常用)
                              ThreadFactory threadFactory, // 线程工厂
                              RejectedExecutionHandler handler // 拒绝策略(常用)) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
1.3 七大参数的分析

其实在线程池中主要的就是,三大方法,七大核心参数,四大拒绝策略,以及最大线程池的设置方法。**有一个ThreadPoolExecutor的构造函数,主要有七个核心的参数:

corePoolSize ------- 核心池线程数大小 (常用)
maximumPoolSize ---- 最大的线程数大小 (常用)
keepAliveTime ------ 超时等待时间 (常用)
unit --------------- 时间单位 (常用)
workQueue  --------- 阻塞队列(常用)
threadFactory ------ 线程工厂
handler ------------ 拒绝策略(常用))

七大核心参数中,首先来看看最大线程池的问题。我们怎么设置最大线程池的数量呢?

答:这个就要根据我们的实际业务来了,主要有如下三种情况吧

第一种:如果是CPU密集型的话,我们可以把最大线程池大小设置为N+1(N为处理器的数量),

在java中可以通过这么一行代码来获取

System.out.println(Runtime.getRuntime().availableProcessors());

第二种:如果是I/O密集型的话,可能占用CPU的时间不是很多,所以我们可以多设置一下线程,让CPU忙起来,充分提高效率。可以设置为2N+1

第三种:;两种混合在一起,即是CPU密集型,也是I/O密集型。这样的话,可以分别处理,将任务分为CPU

密集型和I/O密集型,然后用不同的线程池去处理。

1.4 阻塞队列的分析

至于这个阻塞队列(会把他单独作为一个模块来说后面....)推荐看一下

1.5 内置的四大拒绝策略

java线程池总结

1)AbortPolicy: 这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。

2)DisCardPolicy: 丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。例如,本人的博客网站统计阅读量就是采用的这种拒绝策略。

3)DisCardOldestPolicy: 丢弃队列最前面的任务,然后重新提交被拒绝的任务。 此拒绝策略,是一种喜新厌旧的拒绝策略。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。

4)CallerRunPolicy: 由调用线程处理该任务 。 如果任务被拒绝了,则由调用线程(提交任务的线程)直接执行此任务 。

二:创建线程池的方法

Java中创建线程池的方式一般有两种:

1:通过Executors工厂创建
2:通过new ThreadPoolExecutor(.......)自定义创建

讲解之前,我们应该先来看看 ExecutorService 这个接口。那就先来说说我们自定义创建线程池方法 ExecutorService是Java中对线程池定义的一个接口,它java.util.concurrent包中,在这个接口中定义了和后台任务执行相关的方法:

java线程池总结

2.1 自定义创建线程池方法(重点)
package com.test;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 自定义线程池
* @author huxd
* @createTime 2020-04-04 21:22:10
**/
public class ThreadPoolDemo {
   public static void main(String[] args) {
       /** 阻塞队列,一般指定默认的值,如果不指定的话,默认会被看成是一个*队列,这个可能会造成OOM */
      LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(10);
       System.out.println(Runtime.getRuntime().availableProcessors());

      ThreadFactory threadFactory = new ThreadFactory() {
          /** int i = 1,用于并发安全的包装类 */
          AtomicInteger atomicInteger = new AtomicInteger(1);
          @Override
          public Thread newThread(Runnable r) {
              /** 创建线程,把任务传进来 */
              Thread thread = new Thread(r);
              /** 给线程起个名字 */
              thread.setName("MyThread"+atomicInteger.getAndIncrement());
              return thread;
          }
      };

      ExecutorService threadPool = new ThreadPoolExecutor(
              5,
              9,
              3L,
              TimeUnit.SECONDS,
              blockingQueue,
              threadFactory,
              new ThreadPoolExecutor.AbortPolicy());
      try {
          for (int i = 0; i < 10; i++) {
              threadPool.execute(() -> {
                  try {
                      method();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              });
          }
      } catch (Exception e) {
          e.printStackTrace();
      } finally {
          threadPool.shutdown();
      }

   }

   private static void method() throws InterruptedException {
       System.out.println("ThreadName:"+Thread.currentThread().getName()+"进来了");
       TimeUnit.SECONDS.sleep(5);
       System.out.println("ThreadName:"+Thread.currentThread().getName()+"出去了");
   }
}

通过new ThreadPoolExecutor并设置好七大参数,并提交任务,最后用完一定要关闭线程池。

execute(Runnable)

这个方法接收一个Runnable实例,并且异步的执行,请看下面的实例:

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

executorService.shutdown();

这个方法有个缺陷,就是没有返回值的结果,不知道任务是不是真的已经完成了。

submit(Runnable)

submit(Runnable)execute(Runnable)区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕,请看下面执行的例子:

Future future = executorService.submit(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

future.get();  //returns null if the task has finished correctly.

如果任务执行完成,future.get()方法会返回一个null。注意,future.get()方法会产生阻塞。

submit(Callable)

submit(Callable)和submit(Runnable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。请看下面实例:

Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
    System.out.println("Asynchronous Callable");
    return "Callable Result";
}
});

System.out.println("future.get() = " + future.get());

如果任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。

在深挖一下execute和submit源码发现其中的区别。

先来说说execute:(这个有点负责,先按下不表)

其次来聊聊submit,看看源码底层

java线程池总结

源码底层提供了submit三种重载方式,他们的返回值都是Future

<T> Future<T> submit(Callable<T> task); // 其底层实现如下

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }


java线程池总结

我们可以看到穿入了一个Callable的对象,我们都知道,Callable是创建线程的三大方式之一,而且它的一个call方法可以有返回值,这是其余两种创建线程的方式锁不具备的。

具体的判断流程是:第一步是,如果闯入的线程对象为空,直接抛出空异常;其次会调用newTaskFor方法,返回一个RunnableFuture对象,至于这个RunnableFuture,在我的另一篇博客,Future详解中有介绍,这里先按下不表,不过我们要到newTaskFor里面看看,他到底做了什么操作。

java线程池总结

原来它是返回了一个FutureTask的构造函数,有些人可能就会很迷惑,为什么返回FutureTask,它与RunnableFuture有什么关系吗?有关系,而且有很紧密的关系,当然这个在我的关系在我的另一篇博客,Future中有详细的解释,可以移步去看看。

java线程池总结

到这里就是FutureTask的一个构造函数了,返回的是RunnableFuture的子类FutureTask,并且他们的最终父类是Future。他们三者关系时:Future<------(extends)-----RunnableFuture<-------(implements)--------FutureTask。

那么这样的话,我们就知道调用submit,最终返回的是Future的实现类FutureTask的构造函数,并且在FutureTask中传入了Callable作为参数,那么我们就可以拿得到其call方法返回值的内容了,通过Future类中的get()方法可以得到线程的计算的值了。这个才是我们最终需要的。

示例:

package com.future;

import java.util.concurrent.*;

/**
 * future的一个例子
 * @author Huxudong
 * @createTime 2020-04-06 17:15:41
 **/
public class FutureDemo {
    private static ExecutorService pool = new ThreadPoolExecutor(
            5,
            9,
            3L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /** 使用Future来实现 */
        FutureTest();

    }

    private static void FutureTest() {
        try{
            Future<String> future = getFutureKid();
            String s = future.get();
            System.out.println("拿到的值是-----"+s+"-----哈哈哈");
            System.out.println("你去B系统取ID,我先去做自己的其他的事情了");
            System.out.println("我的其他的事情都已经做好了呀,就差你了,快点啊。拿到了吗?"+future.isDone());
            long counter = 0;
            String temp = "";
            /** 判断是否已经完成 */
            while(!future.isDone()) {
                counter++;
            }

            temp = future.get();
            System.out.println("我的天,你终于拿到了呀,ID="+temp);
            System.out.println("你知道我等了多久吗?"+counter);
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }

    public static Future<String> getFutureKid() {
            return pool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    /** 用睡眠等待模拟具体实际的业务,获取一个什么东西 */
                    TimeUnit.SECONDS.sleep(5);
                    return "String";
                }
            });


    }

}

java线程池总结

我们再来看看传入的是Runnable的对象,底层源码发生了什么?

java线程池总结

好像没啥太大区别,同样是需要返回一个RunnableFuture的对象,只不过newTaskFor传入了两个参数,调用了FutureTask的另外一个构造函数,看看里面做了什么东西

java线程池总结

会发现返回的callbble对象时调用Eexcutors里面的方法,继续往里面看

java线程池总结

发现在这里new RunnableAdapter,这个就是设计模式中的是适配器模式,在这里的作用就是将我们传入的Runnble对象转换为Callable对象

java线程池总结

到此结束,原来根本就是,把我们传入的Runnable对象转换成了Callable对象。

java线程池总结

同理:如果此时使用这个构造函数传入两个参数的话,会返回第二个参数的值。

示例:

package com.future;

import java.util.concurrent.*;

/**
 * future的一个例子
 * @author Huxudong
 * @createTime 2020-04-06 17:15:41
 **/
public class FutureDemo {
   private static ExecutorService pool = new ThreadPoolExecutor(
            5,
            9,
            3L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /** 使用Future来实现 */
         FutureTest();

    }


    private static void FutureTest() {
        try{
            Future<String> future = getFutureKid();
            String s = future.get();
            System.out.println("拿到的返回值是----"+s+"-----哈哈哈");
            System.out.println("你去B系统取ID,我先去做自己的其他的事情了");
            System.out.println("我的其他的事情都已经做好了呀,就差你了,快点啊。拿到了吗?"+future.isDone());
            long counter = 0;
            String temp = "";
            /** 判断是否已经完成 */
            while(!future.isDone()) {
                counter++;
            }

            temp = future.get();
            System.out.println("我的天,你终于拿到了呀,ID="+temp);
            System.out.println("你知道我等了多久吗?"+counter);
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }

    public static Future<String> getFutureKid() {

        return pool.submit(()->{
            System.out.println("这里就是好啊");
        },"hello");

    }
    
}

java线程池总结

2.2 Executors创建线程池(未完待续......)
参考文章:
https://blog.csdn.net/suifeng3051/article/details/49443835
https://blog.csdn.net/wei_lei/article/details/74262818
https://www.cnblogs.com/shamo89/p/8847028.html
上一篇:【渝粤题库】陕西师范大学292021 初级宏观经济学 作业(高起专)


下一篇:Poor Economics(贫穷的本质) - social experiments to fight poverty