Java线程池

引入线程池原因

我们在工作中一般不会直接创建线程,原因:虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。

于是就有了线程池,利用线程池把资源池化,使得线程资源能服用,可以避免频繁地创建和销毁。

线程池的设计采用了生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者,阻塞队列来存储要处理的任务。简单画了个图:
Java线程池

Java中的线程池

从jdk1.5版本开始,在java.uitl.concurrent包下面定义定义了一些列与并发相关的类,其中线程池最核心的一个类是ThreadPoolExecutor。

查看ThreadPoolExecutor的源码,看下基本的继承关系:

public class ThreadPoolExecutor extends AbstractExecutorService {
	…
}

public abstract class AbstractExecutorService implements ExecutorService {
	…
}
public interface ExecutorService extends Executor {
	…
}
public interface Executor {
    void execute(Runnable command);
}

我们可以看出,Executor接口中定义了execute方法,execute是用来执行我们提交的任务的。

但是类ThreadPoolExecutor源码注释中,是推荐我们使用类Executors的工程方法来创建线程池的:

* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios.

看下源码,Executors提供四种线程池,分别为:

newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newScheduledThreadPool :创建一个定长线程池,支持定时及周期性任务执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

既然JDK提供了这么好的工具类,我们是不是就肯定选择它呢?并不是,在阿里开发手册中有这样一条:
Java线程池

看来,最终都有可能导致OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列,并设置最大线程数。

自定义线程池ThreadPoolExecutor

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, 
BlockingQueue workQueue, 
RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy() 抛弃当前的任务

ThreadPoolExecutor 使用例子


public class TestThreadPoolExecutor {

    public static int corePoolSize = 2;
    public static int maxMumPoolSize = 4;
    public static int keepAliveTime = 60;
    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxMumPoolSize,keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10;i++ ){
            threadPoolExecutor.execute(new ThreadpoolPojo(""+ i));

        }

    }
    
}

class ThreadpoolPojo implements Runnable{


    public String threadName;

    public ThreadpoolPojo(String threadName) {
        this.threadName = threadName;
    }
    
    public void run() {
        System.out.println(threadName +"开始于      :" +new Date() + Thread.currentThread().getName());
        try {
            Thread.sleep(1000 * 5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(threadName +"结束于      :" +new Date());
    }
}
上一篇:ThreadPoolExecutor源码阅读


下一篇:java基础|自定义java线程池