Java 8 给引入了 `CompletableFuture` 和 Stream API 这样的工具。让我们尝试把它们结合起来,创建一个 Stream 在 future 完成时返回一组 `CompletableFutures` 集合。
在 [parallel-collectors][1] V1.0.0 开发中也使用了这种方法。
[1]:https://github.com/pivovarit/parallel-collectors
把 CompletableFuture 转成 Steam
基本上,我们要做的就是设计一种方案,把一组 future 集合转换成由任务返回值组成的 Steam:
```java
Collection<CompletableFuture<T>> -> Stream<T>
```
在 Java 的世界里,这可以通过使用 `static` 方法实现:
```java
public static <T> Stream<T> inCompletionOrder(Collection<CompletableFuture<T>> futures) {
// ...
}
```
要创建自定义 `Stream`,需要自己实现一个 `java.util.Spliterator`:
```java
final class CompletionOrderSpliterator<T>
implements Spliterator<T> { ... }
```
下面是 `static` 方法的具体实现:
```java
public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
return StreamSupport.stream(
new CompletionOrderSpliterator<>(futures), false);
}
```
这部分相对简单,现在让我们实现 `CompletionOrderSpliterator`。
实现 CompletionOrderSpliterator
要实现自定义 `Spliterator`,需要完成下列方法:
```java
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
// TODO
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
// TODO
}
@Override
public Spliterator<T> trySplit() {
// TODO
}
@Override
public long estimateSize() {
// TODO
}
@Override
public int characteristics() {
// TODO
}
}
```
当然,构造函数也要实现。
"最直接的解决方法:拷贝传入的集合,等待 future 完成,把完成的 future 从集合里移除,把结果传给 `Spliterator`。"
使用 `CompletableFuture#anyOf` 可等待 future 完成,并且默认实现了正确的异常处理。
然而,还有一个问题略显复杂。
如果仔细查看 `CompletableFuture#anyOf` 方法,会发现它不是很实用,因为要求传入多个 `CompletableFutures<?>` 然后返回一个 `CompletableFuture< Object>` 对象,但这不是主要问题,只是稍有不便。
真正的问题在于,方法返回的 `CompletableFuture<Object>` 对象并不是第一个完成的 future,而是当有任何一个 future 完成时新建的 `CompletableFuture` 实例。
这种方案把"等待 future 完成,然后从列表移除"变复杂了。"我们不能依赖引用想等性,所以要么在 `CompletableFuture#anyof` 触发后执行线性扫描,要么试着想出更好的办法。"
> 译注:"Reference Equality 引用相等性"是对象相等性的一部分,在两个被比较的引用都指向同一个对象的情况下,通过使用 `==` 而不是进一步进行对象比较。
一种简单的解决方案:
```java
private T takeNextCompleted() {
anyOf(futureQueue.toArray(new CompletableFuture[0])).join();
CompletableFuture<T> next = null;
for (CompletableFuture<T> future : futureQueue) {
if (future.isDone()) {
next = future;
break;
}
}
futureQueue.remove(next);
return next.join();
}
```
上面的代码中,执行线性扫描并记录了 `index`,确保移出操作时间复杂度为常量。尽管已经知道数组大小,为什么还要向 `CompletableFuture[]` 传 0?
[2]:https://shipilev.net/blog/2016/arrays-wisdom-ancients/
从实用角度看,这个方案应该是足够好了,"通常没有人会处理1万~2万大小的 future 集合",而且硬件支持的线程数量有上限。受堆栈大小等多种因素影响,实际支持的线程数量会有所差别。不过,一旦“[Loom 项目][3]”投入使用,这种情况可能会有改善。
> 译注:Loom 项目提供一个轻量级用户态的纤程,简化并发编程并且更为高效。
[3]:https://openjdk.java.net/projects/loom/
尽管如此,2万次迭代最乐观的情况下会访问2万个节点(即总是第一个完成的 future),至多访问[2亿个节点][4]节点。
[4]:https://en.wikipedia.org/wiki/Arithmetic_progression#Sum
如果无法依赖 `CompletableFuture` 引用相等性或者 hashcode 还可以做怎样的改进?
可以为 future 分配 id,将它们与对象 future 一起存储到 map 中,这样 future 可以通过关联的 index 标记自己。
所以,让我们把 future 存到 map 中:
```java
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
```
现在,可以从一个单调递增序列中手动指定 id,并让 future 返回时带上 id:
```java
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(List<CompletableFuture<T>> futures) {
Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map
= new HashMap<>(futures.size(), 1); // 因为知道集合大小和预期的冲突计数 (0), 可以提前指定 HashMap 大小
int seq = 0;
for (CompletableFuture<T> future : futures) {
int index = seq++;
map.put(
index,
future.thenApply(
value -> new AbstractMap.SimpleEntry<>(index, value)));
}
return map;
}
```
现在,可以高效地找到并处理下一个完成的 future:等待 future,读取序列号,根据序列号从剩余序列中移除:
```java
private T nextCompleted() {
return anyOf(indexedFutures.values()
.toArray(new CompletableFuture[0]))
.thenApply(result -> ((Map.Entry<Integer, T>) result))
.thenApply(result -> {
indexedFutures.remove(result.getKey());
return result.getValue();
}).join();
}
```
`tryAdvance()` 的实现很简单:
```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (!indexedFutures.isEmpty()) {
action.accept(nextCompleted());
return true;
} else {
return false;
}
}
```
最困难的部分已经解决,现在需要实现剩下的三个方法:
```java
@Override
public Spliterator<T> trySplit() {
return null; // 不支持 split
}
@Override
public long estimateSize() {
return indexedFutures.size(); // 提前知道集合的大小
}
@Override
public int characteristics() {
return
SIZED // 知道前面的大小
& IMMUTABLE // 输入的集合可安全地修改
& NONNULL; // 输入的集合不支持 null
}
```
到这里代码已经完成。
示例演示
可以加入随机处理延迟快速验证代码是否正确:
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<CompletableFuture<Integer>> futures = Stream
.iterate(0, i -> i + 1)
.limit(100)
.map(i -> CompletableFuture.supplyAsync(
withRandomDelay(i), executorService))
.collect(Collectors.toList());
completionOrder(futures)
.forEach(System.out::println);
}
private static Supplier<Integer> withRandomDelay(Integer i) {
return () -> {
try {
Thread.sleep(ThreadLocalRandom.current()
.nextInt(10000));
} catch (InterruptedException e) {
// 无耻地留白了, 请不要在生产环境中这么做
}
return i;
};
}
```
可以看到,结果没有按照原来的顺序返回:
Streaming Future 的原始顺序
```shell
6
5
2
4
1
11
8
12
3
```
按原始顺序 Streaming Future
假如要求只保持原来的顺序该怎么处理?
幸运的是,可以像下面这样实现,无需添加特别的实现:
```java
public static <T> Stream<T> originalOrder(
Collection<CompletableFuture<T>> futures) {
return futures.stream().map(CompletableFuture::join);
}
```
完整示例
```java
package com.pivovarit.collectors;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static java.util.concurrent.CompletableFuture.anyOf;
/**
* @author Grzegorz Piwowarek
*/
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
indexedFutures = toIndexedFutures(futures);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (!indexedFutures.isEmpty()) {
action.accept(nextCompleted());
return true;
} else {
return false;
}
}
private T nextCompleted() {
return anyOf(indexedFutures.values().toArray(new CompletableFuture[0]))
.thenApply(result -> ((Map.Entry<Integer, T>) result))
.thenApply(result -> {
indexedFutures.remove(result.getKey());
return result.getValue();
}).join();
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return indexedFutures.size();
}
@Override
public int characteristics() {
return SIZED & IMMUTABLE & NONNULL;
}
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map = new HashMap<>(futures.size(), 1);
int counter = 0;
for (CompletableFuture<T> f : futures) {
int index = counter++;
map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value)));
}
return map;
}
}
```
本文完整的源代码也可以[在 GitHub 上找到][5]。
[5]:https://github.com/pivovarit/articles/blob/master/java-completion-order-spliterator/src/main/java/com/pivovarit/stream/CompletionOrderSpliterator.java