本文介绍了一下Java中关于线程调度的线程池的相关内容。本来以<Java线程>这本书为依据的,但是里面的BusyFlag笔者觉得很麻烦。于是就从网上搜索了一些和线程池相关的内容来说明,本文分别包含了线程池的简单模拟实现和Java自带线程池的使用两部分。
线程池中拥有有限数目的线程,但是其中每一个线程都可以依次运行多个对象。为什么要有线程池?当要处理的单个任务处理的时间很短而请求的数目却是巨大的时。为每个请求创建一个新线程的开销很大,为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。
一、线程池的简单模拟实现
实现代码:
package com.wly.javathread.chap7; import java.util.LinkedList; /** * 线程池的一种任务队列式的实现,实现的关键在于对"任务队列"进行锁定 * @author wly * */ public class WorkQueue { private final int nThreads; private final PoolWorker[] threads; private final LinkedList queue; public static void main(String[] args) { WorkQueue wq = new WorkQueue(3); Runnable r1 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r1:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r2 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r2:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r3 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r3:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r4 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r4:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r5 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r5:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; wq.execute(r1); wq.execute(r2); wq.execute(r3); wq.execute(r4); wq.execute(r5); } public WorkQueue(int nThreads) { this.nThreads = nThreads; queue = new LinkedList(); threads = new PoolWorker[nThreads]; for (int i = 0; i < nThreads; i++) { threads[i] = new PoolWorker(); threads[i].start(); } } public void execute(Runnable r) { synchronized (queue) { //对任务队列进行锁定 queue.addLast(r); queue.notify(); //唤醒锁对象 } } private class PoolWorker extends Thread { public void run() { Runnable r; while (true) { synchronized (queue) { //对任务队列进行锁定 while (queue.isEmpty()) { //当任务队列为空时,使其进入"等待"状态 try { queue.wait(); } catch (InterruptedException ignored) { } } r = (Runnable)queue.removeFirst(); //开始队列首位的Runnable,并将其从任务队列中移除 } // If we don‘t catch RuntimeException, // the pool could leak threads try { r.run(); } catch (RuntimeException e) { // You might want to log something here } } } } }
运行结果:
r1:0|Tid:8 r3:0|Tid:9 r2:0|Tid:10 r3:1|Tid:9 r1:1|Tid:8 r2:1|Tid:10 r1:2|Tid:8 r3:2|Tid:9 r2:2|Tid:10 r1:3|Tid:8 r3:3|Tid:9 r2:3|Tid:10 r2:4|Tid:10 r1:4|Tid:8 r3:4|Tid:9 r4:0|Tid:9 r5:0|Tid:8 r5:1|Tid:8 r4:1|Tid:9 r5:2|Tid:8 r4:2|Tid:9 r5:3|Tid:8 r4:3|Tid:9 r4:4|Tid:9 r5:4|Tid:8
从运行结果可以看,基本实现了线程池的任务调度的功能。不过这里线程对象没有实现自动销毁功能,是常驻内存的。大致流程:有几个固定的线程对象从任务(Runnable)队列中拿到Runnable对象,然后运行,运行完成后,会再尝试去拿新的任务,如此循环直到没有任务可做为止。
代码来源:http://blog.csdn.net/preterhuman_peak/article/details/7561635
二、Java自带的线程池的使用
稍微介绍一些Java自带的线程池中常用的一些知识,当然笔者也是一边学习一边写博客的,所以也没有很深入的研究。就权当学习笔记了。
Java中线程池相关的类都在并发包concurrent中,本文讨论的线程池主要涉及的类有:RejectedExecutionHandler、ThreadFactory、ThreadPoolExecutor以及ArrayBlockingQueue。其中的ThreadPoolExecutor就是线程池的主角,负责处理任务以及任务的调度;RejectedExecutionHandler是一个负责处理被线程池拒绝的任务对象(Runnable);ThreadFactory是一个线程工厂,可以使用Executors.defaultThreadFactory()来便捷的得到默认线程工厂;ArrayBlockingQueue是用来存放任务对象的数据容器,与其相似的还有LinkedBlockingQueue、SynchronousQueue、PriorityBlockQueue,本文使用的ArrayBlockingQueue,从名字中的Array就可以看出其是"有界"的。
先上代码,看运行结果,再来具体的讨论一下吧!
STEP 1.新建RejectedExecutionHandler的实现类RejectedExecutionHandlerImpl:
package com.wly.javathread.threadpool; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { //Method that may be invoked by a ThreadPoolExecutor when execute cannot accept a task. //This may occur when no more threads or queue slots are available because their bounds would be exceeded, //or upon shutdown of the Executor. In the absence of other alternatives, the method may throw an unchecked //RejectedExecutionException, which will be propagated to the caller of execute. //当一个ThreadPoolExecutor拒绝接受一个任务(task)时可能会调用该方法。当可执行线程数或者任务队列将要超出边界时会触发本方法或者在关闭 //Executor时可能会触发本方法。在没有意外的情况下,这个方法会抛出一个RejectedExecutionException给这个任务的调用者。 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
STEP 2.新建任务类WorkerThread
package com.wly.javathread.threadpool; public class WorkerThread implements Runnable { private String command; public WorkerThread(String s){ this.command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Start. Command = " + command); processCommand(); System.out.println(Thread.currentThread().getName() + " End."); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this.command; } }
STEP 3.新建监视器线程类,用于定时打印当前线程池的状态
package com.wly.javathread.threadpool; import java.util.concurrent.ThreadPoolExecutor; /** * 一个用于监视ThreadPoolExecutor当前状态的线程 * @author wly * */ public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown() { this.run=false; } @Override public void run() { while(run) { System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
STEP 4.测试线程池类WorkerPool
package com.wly.javathread.threadpool; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class WorkerPool { public static void main(String args[]) throws InterruptedException{ RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); ThreadFactory threadFactory = Executors.defaultThreadFactory(); ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS , new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); MyMonitorThread monitor = new MyMonitorThread(executorPool,3); Thread monitorThread = new Thread(monitor); monitorThread.start(); for(int i=0; i<10; i++){ System.out.println("--add task.cmd" + i + "--"); executorPool.execute(new WorkerThread("cmd"+i)); Thread.sleep(100); } Thread.sleep(30000); //关闭线程池 executorPool.shutdown(); Thread.sleep(5000); //关闭监视线程 monitor.shutdown(); } }
STEP 5. 运行结果:
--add task.cmd0-- pool-1-thread-1 Start. Command = cmd0 [monitor] [1/2] Active: 1, Completed: 0, Task: 1, isShutdown: false, isTerminated: false --add task.cmd1-- pool-1-thread-2 Start. Command = cmd1 --add task.cmd2-- --add task.cmd3-- --add task.cmd4-- pool-1-thread-3 Start. Command = cmd4 --add task.cmd5-- pool-1-thread-4 Start. Command = cmd5 --add task.cmd6-- cmd6 is rejected --add task.cmd7-- cmd7 is rejected --add task.cmd8-- cmd8 is rejected --add task.cmd9-- cmd9 is rejected [monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-1 End. pool-1-thread-1 Start. Command = cmd2 pool-1-thread-2 End. pool-1-thread-2 Start. Command = cmd3 pool-1-thread-3 End. pool-1-thread-4 End. [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-1 End. pool-1-thread-2 End. [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
代码已经被笔者稍微改动了一下,因为原来的代码的打印信息不够,使得笔者在corePoolSize和maximumPoolSize时,看了老半天还是不懂为什么maximumPoolSize明明是4,最后却有6个任务被执行,4个任务被拒绝。后来笔者查找了很多文章,又添加了打印信息总算搞清楚了。
注意这里ThreadPoolExecutor的构造函数的第五个参数,就是那个workQueue参数,这里设定了一个长度为2的任务队列。再结合Java API中关于ThreadPoolExecutor的Rejected tasks段落的论述可知。当Executor已经关闭,或者最大线程(maximumPoolSize)和任务队列都饱和时,再添加进来的任务是会被拒绝的。本人更进一步的推论就是任务队列中存放的是等待启动的任务,当一个任务启动后他就会从任务队列中移动到具体的线程中,从而当支持的线程数和任务队列都已饱和时,线程池当然不能再接收更多的任务啦。
关于本文中用到的有界任务队列(ArrayBlockingQueue)以及corePoolSize和maximumPoolSize的数量关系可以稍加展开。
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
corePoolSize和maximumPoolSize:1)如果当前运行的线程数小于corePoolSize,则创建新线程来处理新的任务对象,即使有其他空闲线程的存在。2)如果当前运行的线程数大于corePoolSize,小于maximumPoolSize时,优先向任务队列中添加任务,只有当任务队列满时,才创建新线程。3)如果corePoolSize等于maximumPoolSize,则意味着创建了一个固定大小的线程池。
参考文章: http://my.oschina.net/zhdkn/blog/115048
O啦~~~
装载请保留出处:http://blog.csdn.net/u011638883/article/details/18547573
谢谢!!