我对如何以及何时完成可完成的期货感到苦苦挣扎.我创建了这个测试用例:
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamOfCompletableFuturesTest {
@Test
public void testList() {
completeFirstTwoElements(
Stream.of("list one", "list two", "list three", "list four", "list five")
);
}
@Test
public void testIterator() {
Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
completeFirstTwoElements(
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
);
}
private void completeFirstTwoElements(Stream<String> stream) {
stream
.map(this::cf)
.limit(2)
.parallel()
.forEach(cf -> {
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
private CompletableFuture<String> cf(String result) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Running " + result);
return result;
});
}
}
输出是:
Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one
testList方法按预期工作. CompletableFuture仅在最后评估,因此在限制方法仅保留前两项之后.
但是,testIterator方法是意外的.所有CompletableFuture都已完成,限制仅在之后完成.
如果我从流中删除parallel()方法,它按预期工作.但是,处理(forEach())应该并行完成,因为在我的完整程序中,它是一个长期运行的方法.
任何人都可以解释为什么会这样吗?
看起来这取决于Java版本,所以我在1.8:
$java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
解决方法:
并行性适用于整个管道,因此在并行Stream中应用limit()之前,您无法真正控制将执行的内容.唯一的保证是limit()之后的内容只会在保留的元素上执行.
两者之间的差异可能是由于某些实现细节或其他Stream特性.实际上,您可以通过播放SIZED特征轻松地反转行为.当Stream具有已知大小时,似乎只处理了2个元素.
因此,例如,应用简单的filter()将失去列表版本的大小:
completeFirstTwoElements(
Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);
输出例如:
Running list one
Running list five
Running list two
Running list three
list one
list two
并且不使用未知大小版本的Spliterator.spliterator()“修复”该行为:
Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
completeFirstTwoElements(
StreamSupport.stream(Spliterators.spliterator(iterator, Spliterator.ORDERED, 5), false)
);
输出:
Running iterator two
Running iterator one
iterator one
iterator two