ExecutorService 的Future类

1.概述
在本文中,我们将了解Future。自Java 1.5以来一直存在的接口,在处理异步调用和并发处理时非常有用。

2.创建Future
简单地说,Future类表示异步计算的未来结果 - 这个结果最终将在处理完成后出现在Future中。

让我们看看如何编写创建和返回Future实例的方法。

Future接口是长时间运行方法异步处理的理想选择。这使我们能够在等待Future封装的任务完成时执行一些其他事情。

利用Future的异步性质的操作示例如下:

计算密集型过程(数学和科学计算)
操纵大数据结构(大数据)
远程方法调用(下载文件,抓取HTML,Web服务)。
2.1 用FutureTask实现Future
对于我们的示例,我们将创建一个非常简单的类来计算Integer的平方。这绝对不属于“长期运行”方法类别,但是我们将对它进行一次Thread.sleep()调用以使其持续1秒钟完成:

public class SquareCalculator {    
     
    private ExecutorService executor = Executors.newSingleThreadExecutor();
     
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}
实际执行计算的代码位包含在call()方法中,作为lambda表达式提供。正如你所看到的,除了之前提到的sleep()调用之外没有什么特别之处。

当我们将注意力转向Callable 和ExecutorService的使用时,它会变得更有趣。

Callable是一个接口,表示返回结果并具有单个call()方法的任务。在这里,我们使用lambda表达式创建了它的实例。

创建一个Callable实例并没有把我们带到任何地方,我们仍然必须将这个实例传递给一个执行器,该执行器将负责在一个新线程中启动该任务并返回有价值的Future对象。这就是ExecutorService的用武之地。

我们可以通过几种方式获得ExecutorService实例,其中大部分都是由实用程序类Executors的静态工厂方法提供的。在这个例子中,我们使用了基本的newSingleThreadExecutor(),它为我们提供了一次能够处理单个线程的ExecutorService。

一旦我们有了一个ExecutorService对象,我们只需要调用submit()传递我们的Callable作为参数。submit()将负责启动任务并返回FutureTask 对象,该对象是Future接口的实现。

3.使用Future
到目前为止,我们已经学会了如何创建Future的实例。

在本节中,我们将通过探索Future的API中的所有方法来学习如何使用此实例。

3.1 使用isDone()和get()来获取结果
现在我们需要调用calculate()并使用返回的Future来获得生成的Integer。Future API中的两种方法将帮助我们完成这项任务。

Future.isDone()告诉我们执行程序是否已完成任务处理。如果任务完成,则返回 true,否则返回 false。

从计算中返回实际结果的方法是Future.get()。请注意,此方法会阻止执行,直到任务完成,但在我们的示例中,这不会成为问题,因为我们首先通过调用isDone()来检查任务是否已完成。

通过使用这两种方法,我们可以在等待主任务完成时运行其他一些代码:

Future<Integer> future = new SquareCalculator().calculate(10);
 
while(!future.isDone()) {
    System.out.println("Calculating...");
    Thread.sleep(300);
}

Integer result = future.get();
在这个例子中,我们在输出上写一条简单的消息,让用户知道程序正在执行计算。

方法get()将阻止执行,直到任务完成。但是我们不必担心,因为我们的示例只是在确保任务完成后才调用get()。因此,在这种情况下,future.get()将始终立即返回。

值得一提的是,get()有一个重载版本,它接受超时和TimeUnit作为参数:

Integer result = future.get(500, TimeUnit.MILLISECONDS);
get(long,TimeUnit)和get()之间的区别在于,如果任务在指定的超时时间之前没有返回,前者将抛出TimeoutException。

3.2 使用cancel()取消Future
假设我们已经触发了一项任务,但由于某种原因,我们不再关心结果了。我们可以使用Future.cancel(boolean)告诉执行程序停止操作并中断其底层线程:

Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);
从上面的代码我们的Future实例永远不会完成它的操作。实际上,如果我们尝试从该实例调用get(),在调用cancel()之后,结果将是CancellationException。Future.isCancelled()将告诉我们Future是否已被取消。这对于避免获取CancellationException非常有用。

对cancel()的调用可能会失败。在这种情况下,其返回值将为false。请注意,cancel()接受一个布尔值作为参数 - 这将控制执行此任务的线程是否应该被中断。

4.使用线程池进行更多多线程处理
我们当前的ExecutorService是单线程的,因为它是使用Executors.newSingleThreadExecutor获得的。要突出显示“单线程”,让我们同时触发两个计算:

SquareCalculator squareCalculator = new SquareCalculator();

Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);

while (!(future1.isDone() && future2.isDone())) {
   System.out.println(
     String.format(
       "future1 is %s and future2 is %s", 
       future1.isDone() ? "done" : "not done", 
       future2.isDone() ? "done" : "not done"
     )
   );
   Thread.sleep(300);
}

Integer result1 = future1.get();
Integer result2 = future2.get();

System.out.println(result1 + " and " + result2);

squareCalculator.shutdown();
现在让我们分析一下这段代码的输出:

calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000
很明显,这个过程并不平行。注意第二个任务仅在第一个任务完成后才开始,使整个过程大约需要2秒钟才能完成。

为了使我们的程序真正具有多线程,我们应该使用不同风格的ExecutorService。让我们看一下如果我们使用工厂方法Executors.newFixedThreadPool()提供的线程池,我们的示例的行为会如何变化:

public class SquareCalculator {
 
   private ExecutorService executor = Executors.newFixedThreadPool(2);
    
   //...
}
通过对SquareCalculator类的简单更改,我们现在有一个执行器,它可以使用2个同步线程。

如果我们再次运行完全相同的客户端代码,我们将获得以下输出:

calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000
现在看起来好多了。注意2个任务如何同时开始和结束运行,整个过程大约需要1秒钟才能完成。

还有其他工厂方法可用于创建线程池,例如Executors.newCachedThreadPool(),它们在可用时重用以前使用过的Thread,而Executors.newScheduledThreadPool() 则调度命令在给定的延迟后运行。

5. ForkJoinTask概述
ForkJoinTask是一个实现 Future的抽象类,能够运行由 ForkJoinPool中的少量实际线程托管的大量任务。

在本节中,我们将快速介绍ForkJoinPool的主要特性。

ForkJoinTask的主要特征是它通常会产生新的子任务,作为完成其主要任务所需的工作的一部分。它通过调用fork()生成新任务,并使用join()收集所有结果,从而得到类的名称。

有两个实现ForkJoinTask的抽象类:RecursiveTask,它在完成时返回一个值,而RecursiveAction则不返回任何内容。顾名思义,这些类将用于递归任务,例如文件系统导航或复杂的数学计算。

让我们扩展前面的例子来创建一个类,给定一个Integer,它将计算所有因子元素的和平方。因此,例如,如果我们将数字4传递给我们的计算器,我们应该得到4 + 3 + 2 + 1的总和为30的结果。

首先,我们需要创建RecursiveTask的具体实现并实现其compute()方法。这是我们编写业务逻辑的地方:

public class FactorialSquareCalculator extends RecursiveTask<Integer> {
 
   private Integer n;

   public FactorialSquareCalculator(Integer n) {
       this.n = n;
   }

   @Override
   protected Integer compute() {
       if (n <= 1) {
           return n;
       }

       FactorialSquareCalculator calculator 
         = new FactorialSquareCalculator(n - 1);

       calculator.fork();

       return n * n + calculator.join();
   }
}
注意我们如何通过在compute()中创建FactorialSquareCalculator的新实例来实现递归。通过调用fork(),一个非阻塞方法,我们要求ForkJoinPool启动这个子任务的执行。

在join()方法从计算返回的结果,这是我们增加我们目前正在访问数的平方。

现在我们只需要创建一个ForkJoinPool来处理执行和线程管理:

ForkJoinPool forkJoinPool = new ForkJoinPool();

FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);

forkJoinPool.execute(calculator);
6.结论
在本文中,我们对Future接口进行了全面的了解,访问了它的所有方法。我们还学习了如何利用线程池的强大功能来触发多个并行操作。还简要介绍了ForkJoinTask类,fork()和join()的主要方法。

 

上一篇:GUC-11 线程池


下一篇:线程池 掌握治理线程的法宝