按 CompletableFuture 完成顺序实现 Streaming Future

Java 8 给引入了 `CompletableFuture` 和 Stream API 这样的工具。让我们尝试把它们结合起来,创建一个 Stream 在 future 完成时返回一组 `CompletableFutures` 集合。


在 [parallel-collectors][1] V1.0.0 开发中也使用了这种方法。


[1]:https://github.com/pivovarit/parallel-collectors


把 CompletableFuture 转成 Steam


基本上,我们要做的就是设计一种方案,把一组 future 集合转换成由任务返回值组成的 Steam:


```java
Collection<CompletableFuture<T>> -> Stream<T>
```


在 Java 的世界里,这可以通过使用 `static` 方法实现:


```java
public static <T> Stream<T> inCompletionOrder(Collection<CompletableFuture<T>> futures) {
   // ...
}
```


要创建自定义 `Stream`,需要自己实现一个 `java.util.Spliterator`:


```java
final class CompletionOrderSpliterator<T>
 implements Spliterator<T> { ... }
```


下面是 `static` 方法的具体实现:


```java
public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
   return StreamSupport.stream(
     new CompletionOrderSpliterator<>(futures), false);
}
```


这部分相对简单,现在让我们实现 `CompletionOrderSpliterator`。


实现 CompletionOrderSpliterator


要实现自定义 `Spliterator`,需要完成下列方法:


```java
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
   CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
       // TODO
   }
   @Override
   public boolean tryAdvance(Consumer<? super T> action) {
       // TODO
   }
   @Override
   public Spliterator<T> trySplit() {
       // TODO
   }
   @Override
   public long estimateSize() {
       // TODO
   }
   @Override
   public int characteristics() {
       // TODO
   }
}
```


当然,构造函数也要实现。


"最直接的解决方法:拷贝传入的集合,等待 future 完成,把完成的 future 从集合里移除,把结果传给 `Spliterator`。"


使用 `CompletableFuture#anyOf` 可等待 future 完成,并且默认实现了正确的异常处理。


然而,还有一个问题略显复杂。


如果仔细查看 `CompletableFuture#anyOf` 方法,会发现它不是很实用,因为要求传入多个 `CompletableFutures<?>` 然后返回一个 `CompletableFuture< Object>` 对象,但这不是主要问题,只是稍有不便。


真正的问题在于,方法返回的 `CompletableFuture<Object>` 对象并不是第一个完成的 future,而是当有任何一个 future 完成时新建的 `CompletableFuture` 实例。


这种方案把"等待 future 完成,然后从列表移除"变复杂了。"我们不能依赖引用想等性,所以要么在 `CompletableFuture#anyof` 触发后执行线性扫描,要么试着想出更好的办法。"


> 译注:"Reference Equality 引用相等性"是对象相等性的一部分,在两个被比较的引用都指向同一个对象的情况下,通过使用 `==` 而不是进一步进行对象比较。


一种简单的解决方案:


```java
private T takeNextCompleted() {
   anyOf(futureQueue.toArray(new CompletableFuture[0])).join();
   CompletableFuture<T> next = null;
   for (CompletableFuture<T> future : futureQueue) {
       if (future.isDone()) {
           next = future;
           break;
       }
   }
   futureQueue.remove(next);
   return next.join();
}
```


上面的代码中,执行线性扫描并记录了 `index`,确保移出操作时间复杂度为常量。尽管已经知道数组大小,为什么还要向 `CompletableFuture[]` 传 0?


[2]:https://shipilev.net/blog/2016/arrays-wisdom-ancients/


从实用角度看,这个方案应该是足够好了,"通常没有人会处理1万~2万大小的 future 集合",而且硬件支持的线程数量有上限。受堆栈大小等多种因素影响,实际支持的线程数量会有所差别。不过,一旦“[Loom 项目][3]”投入使用,这种情况可能会有改善。


> 译注:Loom 项目提供一个轻量级用户态的纤程,简化并发编程并且更为高效。


[3]:https://openjdk.java.net/projects/loom/


尽管如此,2万次迭代最乐观的情况下会访问2万个节点(即总是第一个完成的 future),至多访问[2亿个节点][4]节点。


[4]:https://en.wikipedia.org/wiki/Arithmetic_progression#Sum


如果无法依赖 `CompletableFuture` 引用相等性或者 hashcode 还可以做怎样的改进?


可以为 future 分配 id,将它们与对象 future 一起存储到 map 中,这样 future 可以通过关联的 index 标记自己。


所以,让我们把 future 存到 map 中:


```java
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
```


现在,可以从一个单调递增序列中手动指定 id,并让 future 返回时带上 id:


```java
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(List<CompletableFuture<T>> futures) {
   Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map
     = new HashMap<>(futures.size(), 1); // 因为知道集合大小和预期的冲突计数 (0), 可以提前指定 HashMap 大小
   int seq = 0;
   for (CompletableFuture<T> future : futures) {
       int index = seq++;
       map.put(
         index,
         future.thenApply(
           value -> new AbstractMap.SimpleEntry<>(index, value)));
   }
   return map;
}
```


现在,可以高效地找到并处理下一个完成的 future:等待 future,读取序列号,根据序列号从剩余序列中移除:


```java
private T nextCompleted() {
   return anyOf(indexedFutures.values()
     .toArray(new CompletableFuture[0]))
       .thenApply(result -> ((Map.Entry<Integer, T>) result))
       .thenApply(result -> {
           indexedFutures.remove(result.getKey());
           return result.getValue();
       }).join();
}
```


`tryAdvance()` 的实现很简单:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
   if (!indexedFutures.isEmpty()) {
       action.accept(nextCompleted());
       return true;
   } else {
       return false;
   }
}
```


最困难的部分已经解决,现在需要实现剩下的三个方法:


```java
@Override
public Spliterator<T> trySplit() {
   return null; // 不支持 split
}
@Override
public long estimateSize() {
   return indexedFutures.size(); // 提前知道集合的大小
}
@Override
public int characteristics() {
   return
     SIZED       // 知道前面的大小
     & IMMUTABLE // 输入的集合可安全地修改
     & NONNULL;  // 输入的集合不支持 null
}
```


到这里代码已经完成。


示例演示


可以加入随机处理延迟快速验证代码是否正确:


```java
public static void main(String[] args) {
   ExecutorService executorService = Executors.newFixedThreadPool(10);
   List<CompletableFuture<Integer>> futures = Stream
     .iterate(0, i -> i + 1)
     .limit(100)
     .map(i -> CompletableFuture.supplyAsync(
       withRandomDelay(i), executorService))
     .collect(Collectors.toList());
   completionOrder(futures)
     .forEach(System.out::println);
}

private static Supplier<Integer> withRandomDelay(Integer i) {
   return () -> {
       try {
           Thread.sleep(ThreadLocalRandom.current()
             .nextInt(10000));
       } catch (InterruptedException e) {
           // 无耻地留白了, 请不要在生产环境中这么做
       }
       return i;
   };
}
```


可以看到,结果没有按照原来的顺序返回:


Streaming Future 的原始顺序


```shell
6
5
2
4
1
11
8
12
3
```



按原始顺序 Streaming Future


假如要求只保持原来的顺序该怎么处理?


幸运的是,可以像下面这样实现,无需添加特别的实现:


```java
public static <T> Stream<T> originalOrder(
 Collection<CompletableFuture<T>> futures) {
   return futures.stream().map(CompletableFuture::join);
}
```


完整示例


```java
package com.pivovarit.collectors;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static java.util.concurrent.CompletableFuture.anyOf;
/**
* @author Grzegorz Piwowarek
*/
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
   private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
   CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
       indexedFutures = toIndexedFutures(futures);
   }
   @Override
   public boolean tryAdvance(Consumer<? super T> action) {
       if (!indexedFutures.isEmpty()) {
           action.accept(nextCompleted());
           return true;
       } else {
           return false;
       }
   }
   private T nextCompleted() {
       return anyOf(indexedFutures.values().toArray(new CompletableFuture[0]))
         .thenApply(result -> ((Map.Entry<Integer, T>) result))
         .thenApply(result -> {
             indexedFutures.remove(result.getKey());
             return result.getValue();
         }).join();
   }
   @Override
   public Spliterator<T> trySplit() {
       return null;
   }
   @Override
   public long estimateSize() {
       return indexedFutures.size();
   }
   @Override
   public int characteristics() {
       return SIZED & IMMUTABLE & NONNULL;
   }
   private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
       Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map = new HashMap<>(futures.size(), 1);
       int counter = 0;
       for (CompletableFuture<T> f : futures) {
           int index = counter++;
           map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value)));
       }
       return map;
   }
}
```


本文完整的源代码也可以[在 GitHub 上找到][5]。


[5]:https://github.com/pivovarit/articles/blob/master/java-completion-order-spliterator/src/main/java/com/pivovarit/stream/CompletionOrderSpliterator.java


上一篇:std::async的使用总结


下一篇:Python并发编程之多线程(线程池的概念以及线程池的使用,concurrent.future模块的使用,Pool)