package com.wly.javathread.chap7; import java.util.LinkedList; /** * 线程池的一种任务队列式的实现,实现的关键在于对"任务队列"进行锁定 * @author wly * */ public class WorkQueue { private final int nThreads; private final PoolWorker[] threads; private final LinkedList queue; public static void main(String[] args) { WorkQueue wq = new WorkQueue(3); Runnable r1 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r1:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r2 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r2:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r3 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r3:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r4 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r4:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable r5 = new Runnable() { @Override public void run() { int i = 0; while(i < 5) { System.out.println("r5:" + i + "|" + "Tid:" + Thread.currentThread().getId()); i ++; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }; wq.execute(r1); wq.execute(r2); wq.execute(r3); wq.execute(r4); wq.execute(r5); } public WorkQueue(int nThreads) { this.nThreads = nThreads; queue = new LinkedList(); threads = new PoolWorker[nThreads]; for (int i = 0; i < nThreads; i++) { threads[i] = new PoolWorker(); threads[i].start(); } } public void execute(Runnable r) { synchronized (queue) { //对任务队列进行锁定 
queue.addLast(r); queue.notify(); //唤醒锁对象 } } private class PoolWorker extends Thread { public void run() { Runnable r; while (true) { synchronized (queue) { //对任务队列进行锁定 while (queue.isEmpty()) { //当任务队列为空时,使其进入"等待"状态 try { queue.wait(); } catch (InterruptedException ignored) { } } r = (Runnable)queue.removeFirst(); //开始队列首位的Runnable,并将其从任务队列中移除 } // If we don‘t catch RuntimeException, // the pool could leak threads try { r.run(); } catch (RuntimeException e) { // You might want to log something here } } } } }
r1:0|Tid:8 r3:0|Tid:9 r2:0|Tid:10 r3:1|Tid:9 r1:1|Tid:8 r2:1|Tid:10 r1:2|Tid:8 r3:2|Tid:9 r2:2|Tid:10 r1:3|Tid:8 r3:3|Tid:9 r2:3|Tid:10 r2:4|Tid:10 r1:4|Tid:8 r3:4|Tid:9 r4:0|Tid:9 r5:0|Tid:8 r5:1|Tid:8 r4:1|Tid:9 r5:2|Tid:8 r4:2|Tid:9 r5:3|Tid:8 r4:3|Tid:9 r4:4|Tid:9 r5:4|Tid:8
STEP 1.新建RejectedExecutionHandler的实现类RejectedExecutionHandlerImpl:
package com.wly.javathread.threadpool; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { //Method that may be invoked by a ThreadPoolExecutor when execute cannot accept a task. //This may occur when no more threads or queue slots are available because their bounds would be exceeded, //or upon shutdown of the Executor. In the absence of other alternatives, the method may throw an unchecked //RejectedExecutionException, which will be propagated to the caller of execute. //当一个ThreadPoolExecutor拒绝接受一个任务(task)时可能会调用该方法。当可执行线程数或者任务队列将要超出边界时会触发本方法或者在关闭 //Executor时可能会触发本方法。在没有意外的情况下,这个方法会抛出一个RejectedExecutionException给这个任务的调用者。 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
STEP 2.新建任务类WorkerThread
package com.wly.javathread.threadpool; public class WorkerThread implements Runnable { private String command; public WorkerThread(String s){ this.command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Start. Command = " + command); processCommand(); System.out.println(Thread.currentThread().getName() + " End."); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this.command; } }
STEP 3.新建监视器线程类,用于定时打印当前线程池的状态
package com.wly.javathread.threadpool; import java.util.concurrent.ThreadPoolExecutor; /** * 一个用于监视ThreadPoolExecutor当前状态的线程 * @author wly * */ public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown() { this.run=false; } @Override public void run() { while(run) { System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
STEP 4.测试线程池类WorkerPool
package com.wly.javathread.threadpool; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class WorkerPool { public static void main(String args[]) throws InterruptedException{ RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); ThreadFactory threadFactory = Executors.defaultThreadFactory(); ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS , new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); MyMonitorThread monitor = new MyMonitorThread(executorPool,3); Thread monitorThread = new Thread(monitor); monitorThread.start(); for(int i=0; i<10; i++){ System.out.println("--add task.cmd" + i + "--"); executorPool.execute(new WorkerThread("cmd"+i)); Thread.sleep(100); } Thread.sleep(30000); //关闭线程池 executorPool.shutdown(); Thread.sleep(5000); //关闭监视线程 monitor.shutdown(); } }
STEP 5. 运行结果:
--add task.cmd0-- pool-1-thread-1 Start. Command = cmd0 [monitor] [1/2] Active: 1, Completed: 0, Task: 1, isShutdown: false, isTerminated: false --add task.cmd1-- pool-1-thread-2 Start. Command = cmd1 --add task.cmd2-- --add task.cmd3-- --add task.cmd4-- pool-1-thread-3 Start. Command = cmd4 --add task.cmd5-- pool-1-thread-4 Start. Command = cmd5 --add task.cmd6-- cmd6 is rejected --add task.cmd7-- cmd7 is rejected --add task.cmd8-- cmd8 is rejected --add task.cmd9-- cmd9 is rejected [monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-1 End. pool-1-thread-1 Start. Command = cmd2 pool-1-thread-2 End. pool-1-thread-2 Start. Command = cmd3 pool-1-thread-3 End. pool-1-thread-4 End. [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-1 End. pool-1-thread-2 End. [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
注意这里ThreadPoolExecutor构造函数的第五个参数,也就是workQueue参数,这里设定了一个容量为2的任务队列。再结合Java API中关于ThreadPoolExecutor的Rejected tasks段落的论述可知:当Executor已经关闭,或者线程数已达到最大线程数(maximumPoolSize)且任务队列也已饱和时,再添加进来的任务会被拒绝。本人更进一步的推论是:任务队列中存放的是等待启动的任务,当一个任务启动后,它就会从任务队列中移到具体的线程中执行;因此当线程数和任务队列都已饱和时,线程池自然不能再接收更多的任务。以本例来说,线程池最多同时容纳 4(maximumPoolSize)+ 2(队列容量)= 6 个任务,所以cmd0~cmd5被接受,而cmd6~cmd9被拒绝,与上面的运行结果一致。
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按FIFO(先进先出)原则对元素进行排序。队列的头部是在队列中存在时间最长的元素,队列的尾部是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
参考文章: http://my.oschina.net/zhdkn/blog/115048