java并发基础(三)--- 任务执行

第6章开始是第二部分,讲解结构化并发应用程序,大多数并发应用程序都是围绕“任务执行”构造的,任务通常是一些抽象的且离散的工作单元。

一、线程池

大多数服务器应用程序都提供了一种自然的任务边界:以独立的客户请求为边界。现在我们要实现自己的web服务器,你一定见过这样的代码:

class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
//处理请求
handleRequest(connection);
}
}
}

这种串行的执行任务的方法当然不可行,它使程序失去可伸缩性。我们将它改进:

class ThreadPerTaskWebServer{
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
//处理请求
handleRequest(connection);
}
}; new Thread(task).start();
}
}
}

  我们为每个任务分配一个线程,但它仍存在很多问题,如,线程生命周期开销非常高、消耗过多的资源,尤其是内存、可创建线程的数量上有一个上限,如果超出,可能会抛出OutOfMemoryError异常。我在读这本书之前,最多也就是理解到这里。但其实java对任务执行提供了支持,也就是Executor。

public interface Executor{
void execute(Runnable command);
}

虽然Executor是一个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础。现在把上例修改为基于线程池的web服务器:

class TaskExecutionWebServer{
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
//处理请求
handleRequest(connection);
}
}; exec.execute(task);
}
}
}

每当看到下面这种形式的代码时:

new Thread(task).start();

并且你希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread。

  Executor基于生产者-消费者模式。提交任务的线程相当于生产者,执行任务的线程相当于消费者。生产者将任务提交给队列,消费者从队列中获得任务执行。这样将任务的提交过程与执行过程解耦。

  java提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法来创建:

  1newFixedThreadPool newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。(如果某个线程由于发生了未预期的Exception而结束,那么线程池将补充一个新的线程) 。

  2newCachedThreadPool newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲线程,当需求增加时,可以添加新的线程,线程池的规模不存在任何限制。

  3newSingleThreadExecutor newSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来代替。newSingleTheadExecutor能确保依照任务在队列中的顺序来串行执行。(例如:FIFO、LIFO、优先级)

  4、newScheduledThreadPool newScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或者定时的方式执行任务。

二、Executor的生命周期

  我们已经知道如何创建一个Executor,但如何关闭它呢?Executor的实现通常会创建线程来执行任务。但JVM只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确关闭Executor,那么JVM将无法结束。

  为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法,如下:

public interface ExecutorServce implements Executor{
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException;
}

  ExecutorService的生命周期有3中状态:运行、关闭、已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

  我们再将web服务器改进成支持关闭的形式:

class LifecycleWebServer{
private final ExecutorService exec = Executors.newCachedThreadPool(); public void start() throws IOException{
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown()) {
log("task submission rejected",e);
}
}
}
} public void stop(){
exec.shutdown();
} void handleRequest(Socket connection){
Request req = readRequest(connection);
if (isShutdownRequest(req)) {
stop();
}else {
dispatchRequest(req);
}
}
}

三、携带结果的任务Callable与Future

  Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它不能返回一个值或者抛出一个受检查的异常。Callable是一种更好的抽象:它认为主入口点将返回一个值,并可能抛出一个异常。Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获得任务的结果或者取消任务等。

public interface Callable<V>{
V call() throws Exception;
} public interface Future<V>{
boolean cancel(boolean mayInterruptIFRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException,ExecutionException,CancellationException;
V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,CancellationException,TimeoutException;
}

  如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CannellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。ExecutorService中的所有方法都将返回一个Future。

例子:使用Future实现页面渲染器

  为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染所有的文本,另一个是下载所有的图像。(因为其中一个任务是CPU密集型,另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能。)

  伪代码:

public class FutureRenderer{
private final ExecutorService executor = Executors.newCachedThreadPool(); void renderPage(CharSequence source){
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
@Override
public List<ImageData> call() throws Exception {
List<ImageData> result = new ArrayList<ImageData>();
for (ImageInfo imageInfo:imageInfos) {
result.add(imageInfo.downloadImage());
}
return result;
}
}; Future<List<ImageData>> future = executor.submit(task);
renderText(source); try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData) {
renderImage(data);
}
} catch (InterruptedException e) {
//重新设置线程的中断状态
Thread.currentThread().interrupt();
//由于不需要结果,因此取消任务
future.cancel(true);
}catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}

  FutureRenderer中创建了一个Callable来下载所有的图像,并将其提交到一个ExecutorService。这将返回一个描述任务执行情况的Future。当主任务需要图像时,它会等待Future.get的调用结果。如果幸运的话,当请求开始时所有的图像都已经下载完成了,即使没有,至少图像的下载任务也已经提前开始了。

  问题:如果渲染文本的速度远远高于下载图像的速度,那么程序的最终性能与串行执行时的性能差别不大,而代码却变得更复杂了。所以,只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

  解决:为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们。如果向线程池提交一组任务,并希望在计算完成后获得结果,可以保留与每个计算结果相关联的Future,然后反复的使用get方法,然后通过轮询判断任务是否已经完成,这种方法可行,但很繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。

  CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务交给它来执行,然后使用类似于队列操作的take和poll方法来获得已经完成的结果。ExecutorCompletionService实现了CompletionService。

public class Render {
private final ExecutorService executor; public Render(ExecutorService executor) {
super();
this.executor = executor;
} void renderPage(CharSequence source){
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executor);
//提交每张图片的下载任务
for (final ImageInfo imageInfo: info) {
completionService.submit(new Callable<ImageData>(){
public ImageData call(){
return imageInfo.downloadImage();
}
});
}
//加载文本
renderText(source); try {
for (int t = 0 t = info.size(); t < n ;t++) {
//获得已经完成的任务,每次获得一个
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
//线程中断会执行两个操作:1.清除线程的中断状态 2.抛出InterruptedException
//所以捕获异常后使该线程仍处于中断状态
Thread.currentThread().interrupt();
}catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
上一篇:动态线程池


下一篇:ThreadPoolTaskExecutor和ThreadPoolExecutor有何区别?