线程池

Executors工厂方法创建

工厂方法有四种方式:newSingleThreadExecutor(单线程池)、newFixedThreadPool(固定线程池)、newCachedThreadPool(根据需要自动创建线程池)、newSingleThreadScheduledExecutor(单任务线程池)、newScheduledThreadPool(多任务线程池)。
前三种底层使用ThreadPoolExecutor,后两种底层使用ScheduledThreadPoolExecutor。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Executors工厂方法创建
 **/
public class Executor {

    /**
     * 计数器
     */
    static int count = 0;

    public static void main(String[] args) {

        /**
         * 创建使用单个线程的线程池
         * 只有一个线程执行任务
         **/
        ExecutorService es1 = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            es1.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
                }
            });
        }

        /**
         * 创建使用固定线程数的线程池
         * 线程数固定
         **/
        ExecutorService es2 = Executors.newFixedThreadPool(3);  //这里指定固定线程大小
        for (int i = 0; i < 10; i++) {
            es2.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
                }
            });
        }

        /**
         * 创建一个会根据需要创建新线程的线程池
         * 如果没有空闲线程就新建,有空闲则用空闲线程
         * 适合任务多而短的,新建的线程空闲60s后回收
         * */
        ExecutorService es3 = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            es3.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
                }
            });
        }

        /**
         * 创建只有一个定时线程任务的线程池
         **/
        ScheduledExecutorService es5 = Executors.newSingleThreadScheduledExecutor();
        System.out.println("时间:" + System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            /**
             * command - the task to execute
             * delay - the time from now to delay execution
             * unit - the time unit of the delay parameter
             **/
            es5.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("时间:"+System.currentTimeMillis()+"--"+Thread.currentThread().getName() + "正在执行任务"+(count++));
                }
            },3, TimeUnit.SECONDS);
        }

        /**
         * 创建拥有固定线程数量的定时线程任务的线程池
         **/
        ScheduledExecutorService es4 = Executors.newScheduledThreadPool(4);    //固定线程数
        System.out.println("时间:" + System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            /**
             * command - the task to execute
             * delay - the time from now to delay execution
             * unit - the time unit of the delay parameter
             **/
            es4.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("时间:"+System.currentTimeMillis()+"--"+Thread.currentThread().getName() + "正在执行任务"+(count++));
                }
            },3, TimeUnit.SECONDS);
        }
        //关掉线程池,线程执行完毕后会关掉线程池
        es4.shutdown();
    }
}

ThreadPoolExecutor(推荐)

自定义创建线程池,即Executors的底层使用的线程池,推荐使用这种。

在开始之前先了解两样东西分别是阻塞队列和拒绝策略。

阻塞队列

BlockingQueue阻塞队列,阻塞的情况主要有如下2种:

  1. 当队列满了,进行入队操作阻塞
  2. 当队列空了,进行出队操作阻塞

阻塞队列主要用在生产者/消费者模式中,如图:

线程池

阻塞队列常用方法

阻塞线程有如下方法:

线程池

线程池

抛出异常:如果操作不能马上进行,则抛出异常。

特殊值:如果操作不能马上进行,将会返回一个特殊的值,一般是true/false。

阻塞:如果操作不能马上进行,操作会阻塞。

超时:如果操作不能马上进行,操作会阻塞指定时间,如果指定时间没执行,则返回一个特殊值,一般为true/false。

不能向BlockingQueue中插入null.否则会报NullPointerException异常。

常见阻塞队列

ArrayBlockingQueue  //一个由数组结构组成的有界阻塞队列,初始化时给出队列大小构造参数,以先进先出的方式存储数据,最新插入的对象是尾部,最新移除的对象是头部.
LinkedBlockingQueue  //一个由链表结构组成的阻塞队列,初始化时可以指定队列大小构造参数,不给大小时采用默认值Integer.MAX_VALUE的容量
PriorityBlockingQueue  //一个支持优先级排序的*阻塞队列,允许插入null对象,插入PriorityBlockingQueue队列的对象必须实现Comparable接口
DelayQueue  //一个使用优先级队列实现的*阻塞队列
SynchronousQueue  //一个不存储元素的阻塞队列,内部仅仅容纳一个元素,当一个线程插入一个元素之后就被阻塞(放入元素的线程立刻被阻塞),除非这个元素被另一个线程消费。
LinkedTransferQueue  //一个由链表结构组成的*阻塞队列(实现了继承于 BlockingQueue的 TransferQueue)
LinkedBlockingDeque  //一个由链表结构组成的双向阻塞队列

拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。

ThreadPoolExecutor.AbortPolicy  //丢弃任务,并抛出RejectedExecutionException异常。

ThreadPoolExecutor.CallerRunsPolicy  //该任务被线程池拒绝,由调用execute方法的线程执行该任务。

ThreadPoolExecutor.DiscardOldestPolicy  //抛弃队列最前面的任务,然后重新尝试执行任务。

ThreadPoolExecutor.DiscardPolicy  //丢弃任务,不过也不抛出异常。

线程池执行任务流程,提交 -》核心线程池 -》 等待队列 -》 最大线程池 -》 拒绝策略。

线程池

ThreadPoolExecutor

   /** @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     */
new ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

线程池

参数说明

corePoolSize: 核心池的大小;预创建线程,在没有任务到来之前就创建corePoolSize个线程,默认为0。

maximumPoolSize: 线程池最大线程数;程池中最多能创建多少个线程。

keepAliveTime:空闲线程最长保持时间;只有线程池中线程大于corePoolSize时有效,

                       如果设置了allowCoreThreadTimeOut(true)方法,则keepAliveTime对核心线程也起作用。

unit: 参数keepAliveTime的时间单位;

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

workQueue:工作队列,用来存储等待执行的任务。生产者消费者模式用前两种。

ArrayBlockingQueue    //基于数组的有界阻塞队列,按FIFO排序,新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
LinkedBlockingQueue    //基于链表的阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似*性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
SynchronousQueue    //一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
PriorityBlockingQueue    //具有优先级的*阻塞队列,优先级通过参数Comparator实现。

threadFactory:用于设置创建线程的工厂;通过线程工厂给创建出来的线程设置线程名、daemon、优先级等等。

handler:线程池拒绝策略;当线程池线程全部在运行,且缓存也存满时后来的任务。

AbortPolicy    //丢弃任务,直接抛出RejectedExecutionException异常;默认为这种.
CallerRunsPolicy    //用调用者所在线程来运行任务;如果线程池已经shutdown,则直接抛弃任务.
DiscardOldestPolicy    //丢弃队列里最早的一个任务,然后把这次拒绝的任务放入队列.
DiscardPolicy    //不处理,丢弃任务.

创建

第一种方式创建,使用第一个构造函数。

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ThreadPoolExecutor线程池
 **/
public class ThreadPoolDemo {
    //计数
    static int count = 0;
    public static void main(String[] args) {
        
        //创建ThreadPoolExecutor,线程池核心线程大小,线程池最大线程数量,空闲线程存活时间,空闲线程存活时间单位,工作队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,5,550, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

        for (int i=0;i<10;i++){
            /*线程池执行Runnable实现类*/
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
                }
            });
        }
        //关掉线程池
        threadPoolExecutor.shutdown();
    }
}

参考:

https://www.cnblogs.com/lujiango/p/7581031.html

https://www.cnblogs.com/jxxblogs/p/11655670.html

https://blog.csdn.net/ye17186/article/details/89467919

上一篇:多线程


下一篇:Java线程池实现原理及其在美团业务中的实践