引入线程池原因
我们在工作中一般不会直接创建线程,原因:虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
于是就有了线程池,利用线程池把资源池化,使得线程资源能服用,可以避免频繁地创建和销毁。
线程池的设计采用了生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者,阻塞队列来存储要处理的任务。简单画了个图:
Java中的线程池
从jdk1.5版本开始,在java.uitl.concurrent包下面定义定义了一些列与并发相关的类,其中线程池最核心的一个类是ThreadPoolExecutor。
查看ThreadPoolExecutor的源码,看下基本的继承关系:
public class ThreadPoolExecutor extends AbstractExecutorService {
…
}
public abstract class AbstractExecutorService implements ExecutorService {
…
}
public interface ExecutorService extends Executor {
…
}
public interface Executor {
void execute(Runnable command);
}
我们可以看出,Executor接口中定义了execute方法,execute是用来执行我们提交的任务的。
但是类ThreadPoolExecutor源码注释中,是推荐我们使用类Executors的工程方法来创建线程池的:
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios.
看下源码,Executors提供四种线程池,分别为:
newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool :创建一个定长线程池,支持定时及周期性任务执行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
既然JDK提供了这么好的工具类,我们是不是就肯定选择它呢?并不是,在阿里开发手册中有这样一条:
看来,最终都有可能导致OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列,并设置最大线程数。
自定义线程池ThreadPoolExecutor
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy() 抛弃当前的任务
ThreadPoolExecutor 使用例子
public class TestThreadPoolExecutor {
public static int corePoolSize = 2;
public static int maxMumPoolSize = 4;
public static int keepAliveTime = 60;
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxMumPoolSize,keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10;i++ ){
threadPoolExecutor.execute(new ThreadpoolPojo(""+ i));
}
}
}
class ThreadpoolPojo implements Runnable{
public String threadName;
public ThreadpoolPojo(String threadName) {
this.threadName = threadName;
}
public void run() {
System.out.println(threadName +"开始于 :" +new Date() + Thread.currentThread().getName());
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName +"结束于 :" +new Date());
}
}