想象一下,我正在构建一个将接收整数流的库,并且所有库代码需要做的是返回一个字符串流,其中包含数字的字符串表示形式.
public Stream<String> convertToString(Stream<Integer> input) {
return input.map(n -> String.valueOf(n));
}
但是,想象某人决定使用以下代码调用我的函数:
Stream<Integer> input = Stream.of(1,2,3)
.map(n -> {
if (n %3 ) { throw new RuntimeException("3s are not welcome here!"); }
else { return n; }
}
Stream<String> output = convertToString(input);
由于我希望我的API合约能够处理这种情况并且仍然返回一个字符串流,我想知道如何更改我的代码,以便它检测到流中存在异常并从异常中“恢复”通过将其映射到特殊的“NaN”值.
最后,我希望我的输出Stream为“1”,“2”,“NaN”
笔记:
>我不希望我的API必须为输入定义一个特殊的包装器.
>由于依赖性的影响,我不想依赖第三方库.
>我不知道流的大小或生成的速度有多大,因此我不能使用任何类型的缓存/预加载所有值.
这可能与Java 8 Streams一起使用吗?
有了Iterator,我想我可以做到:
public class SafeIntegerToStringIterator implements Iterator<String> {
Iterator<Integer> innerIterator;
...
public String next() throws NoSuchElementException {
try { return String.valueOf(this.innerIterator.next()); }
catch (RuntimeException e) { return "NaN"; }
}
}
...
public Iterator<String> convertToString(Iterator<Integer> input) {
return new SafeIntegerToStringIterator(input);
}
谢谢
解决方法:
注意:请参阅本文末尾的编辑,修复了原始答案中的错误.无论如何,我正在离开我的原始答案,因为它在许多情况下仍然有用,我认为它有助于解决OP的问题,至少有一些限制.
您使用迭代器的方法是正确的方向.可以按如下方式起草解决方案:将流转换为迭代器,按照您已经完成的方式包装迭代器,然后从包装器迭代器创建流,除了您应该使用Spliterator
.这是代码:
private static <T> Stream<T> asNonThrowingStream(
Stream<T> stream,
Supplier<? extends T> valueOnException) {
// Get spliterator from original stream
Spliterator<T> spliterator = stream.spliterator();
// Return new stream from wrapper spliterator
return StreamSupport.stream(
// Extending AbstractSpliterator is enough for our purpose
new Spliterators.AbstractSpliterator<T>(
spliterator.estimateSize(),
spliterator.characteristics()) {
// We only need to implement tryAdvance
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
return spliterator.tryAdvance(action);
} catch (RuntimeException e) {
action.accept(valueOnException.get());
return true;
}
}
}, stream.isParallel());
}
我们正在扩展AbstractSpliterator
以包装原始流返回的spliterator.我们只需要实现tryAdvance
方法,该方法可以委托给原始的spliterator的tryAdvance方法,也可以捕获RuntimeException并使用提供的valueOnException值调用该操作.
Spliterator的契约指定如果操作被占用,则tryAdvance的返回值必须为true,因此如果捕获了RuntimeException,则意味着原始的spliterator已从其自己的tryAdvance方法中抛出它.因此,在这种情况下我们返回true,这意味着该元素无论如何都被消耗了.
通过将这些值作为参数传递给AbstractSpliterator的构造函数来保留原始spliterator的估计大小和特征.
最后,我们通过StreamSupport.stream方法从新的spliterator创建一个新流.如果原始流也是并行的,则新流是并行的.
以下是如何使用上述方法:
public Stream<String> convertToString(Stream<Integer> input) {
return asNonThrowingStream(input.map(String::valueOf), () -> "NaN");
}
编辑
根据下面的Holger’s comment,7004提供了一个解决方案,避免了Holger指出的陷阱.
这是代码:
<T> Stream<T> exceptionally(Stream<T> source, BiConsumer<Exception, Consumer<? super T>> handler) {
class ExceptionallySpliterator extends AbstractSpliterator<T>
implements Consumer<T> {
private Spliterator<T> source;
private T value;
private long fence;
ExceptionallySpliterator(Spliterator<T> source) {
super(source.estimateSize(), source.characteristics());
this.fence = source.getExactSizeIfKnown();
this.source = source;
}
@Override
public Spliterator<T> trySplit() {
Spliterator<T> it = source.trySplit();
return it == null ? null : new ExceptionallySpliterator(it);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
return fence != 0 && consuming(action);
}
private boolean consuming(Consumer<? super T> action) {
Boolean state = tryConsuming(action);
if (state == null) {
return true;
}
if (state) {
action.accept(value);
value = null;
return true;
}
return false;
}
private Boolean tryConsuming(Consumer<? super T> action) {
fence--;
try {
return source.tryAdvance(this);
} catch (Exception ex) {
handler.accept(ex, action);
return null;
}
}
@Override
public void accept(T value) {
this.value = value;
}
}
return stream(new ExceptionallySpliterator(source.spliterator()), source.isParallel()).onClose(source::close);
}
如果您想进一步了解此解决方案,请参阅to the tests.