Java并发编程实践 目录
并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier
并发编程 06—— CompletionService : Executor 和 BlockingQueue
并发编程 10—— 任务取消 之 关闭 ExecutorService
并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略
第1 部分 配置ThreadPoolExecutor
1.1 创建
ThreadPoolExecutor为一些Executor提供了基本的实现,比如,newCachedThreadPool,newFixedThreadPool等等。ThreadPoolExecutor允许各种定制
它的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
最常用的newCachedThreadPool。
1.2 管理队列任务
据线程池的大小来选择合适的队列有利于充分利用资源和防止耗尽资源。
基本的排队方法有3种:
- *队列
- 有界队列
- 同步移交
1.3 饱和策略
我们在ThreadPoolExecutor的构造函数中看到了最后一个参数。 RejectedExecutionHandler handler。这个就是饱和策略。
JDK提供了几种不同的实现:
- DiscardOldestPolicy
- AbortPolicy
- CallerRunsPolicy
- discardPolicy
AbortPolicy是默认的饱和策略,就是中止任务,该策略将抛出RejectedExecutionException。调用者可以捕获这个异常然后去编写代码处理异常。
当新提交的任务无法保存到队列中等待执行时,DiscardPolicy会稍稍的抛弃该任务,DiscardOldestPolicy则会抛弃最旧的(下一个将被执行的任务),然后尝试重新提交新的任务。如果工作队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会导致优先级最高的那个任务被抛弃,所以两者不要组合使用。
CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了Executor的线程中执行该任务。
/**
* 调用者运行的饱和策略
* @ClassName: ThreadDeadlock2
* TODO
* @author xingle
* @date 2014-11-20 下午4:18:11
*/
public class ThreadDeadlock2 {
ExecutorService exec = new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy()); private void putrunnable() {
for (int i = 0; i < 4; i++) {
exec.submit(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub
while (true) {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
}
public static void main(String[] args) {
new ThreadDeadlock2().putrunnable();
}
}
执行结果:
当工作队列被填满之后,没有预定义的饱和策略来阻塞execute。通过使用 Semaphore (信号量)来限制任务的到达率,就可以实现这个功能。在下面的BoundedExecutor 中给出了这种方法,该方法使用了一个*队列,并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和正在等待执行的任务数量。
/**
* 8.4 使用Semaphore来控制任务的提交速率
* @ClassName: BoundedExecutor
* TODO
* @author xingle
* @date 2014-11-20 下午2:46:19
*/
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
int bound; public BoundedExecutor(Executor exec,int bound){
this.exec = exec;
this.semaphore = new Semaphore(bound);
this.bound = bound;
} public void submitTask(final Runnable command) throws InterruptedException{
//通过 acquire() 获取一个许可
semaphore.acquire();
System.out.println("----------当前可用的信号量个数:"+semaphore.availablePermits());
try {
exec.execute(new Runnable() { @Override
public void run() {
try {
System.out.println("线程" + Thread.currentThread().getName() +"进入,当前已有" + (bound-semaphore.availablePermits()) + "个并发");
command.run();
} finally {
//release() 释放一个许可
semaphore.release();
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (bound-semaphore.availablePermits()) + "个并发");
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
测试程序:
public class BoundedExecutor_main {
public static void main(String[] args) throws InterruptedException{
ExecutorService exec = Executors.newCachedThreadPool();
BoundedExecutor e = new BoundedExecutor(exec, 3); for(int i = 0;i<5;i++){
final int c = i;
e.submitTask(new Runnable() { @Override
public void run() {
System.out.println("执行任务:" +c);
}
});
}
}
}
执行结果:
参考: