如何映射Java流中的RuntimeExceptions以从无效流元素中“恢复”

想象一下,我正在构建一个将接收整数流的库,并且所有库代码需要做的是返回一个字符串流,其中包含数字的字符串表示形式.

public Stream<String> convertToString(Stream<Integer> input) {
  return input.map(n -> String.valueOf(n));
}

但是,想象某人决定使用以下代码调用我的函数:

Stream<Integer> input = Stream.of(1,2,3)
  .map(n -> {
    if (n %3 ) { throw new RuntimeException("3s are not welcome here!"); }
    else { return n; }
  }

Stream<String> output = convertToString(input);

由于我希望我的API合约能够处理这种情况并且仍然返回一个字符串流,我想知道如何更改我的代码,以便它检测到流中存在异常并从异常中“恢复”通过将其映射到特殊的“NaN”值.

最后,我希望我的输出Stream为“1”,“2”,“NaN”

笔记:

>我不希望我的API必须为输入定义一个特殊的包装器.
>由于依赖性的影响,我不想依赖第三方库.
>我不知道流的大小或生成的速度有多大,因此我不能使用任何类型的缓存/预加载所有值.

这可能与Java 8 Streams一起使用吗?

有了Iterator,我想我可以做到:

public class SafeIntegerToStringIterator implements Iterator<String> {
  Iterator<Integer> innerIterator;
  ...
  public String next() throws NoSuchElementException {
    try { return String.valueOf(this.innerIterator.next()); }
    catch (RuntimeException e) { return "NaN"; }
  }

}
...
public Iterator<String> convertToString(Iterator<Integer> input) {
  return new SafeIntegerToStringIterator(input);
}

谢谢

解决方法:

注意:请参阅本文末尾的编辑,修复了原始答案中的错误.无论如何,我正在离开我的原始答案,因为它在许多情况下仍然有用,我认为它有助于解决OP的问题,至少有一些限制.

您使用迭代器的方法是正确的方向.可以按如下方式起草解决方案:将流转换为迭代器,按照您已经完成的方式包装迭代器,然后从包装器迭代器创建流,除了您应该使用Spliterator.这是代码:

private static <T> Stream<T> asNonThrowingStream(
        Stream<T> stream,
        Supplier<? extends T> valueOnException) {

    // Get spliterator from original stream
    Spliterator<T> spliterator = stream.spliterator();

    // Return new stream from wrapper spliterator
    return StreamSupport.stream(

        // Extending AbstractSpliterator is enough for our purpose
        new Spliterators.AbstractSpliterator<T>(
                spliterator.estimateSize(),
                spliterator.characteristics()) {

            // We only need to implement tryAdvance
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                try {
                    return spliterator.tryAdvance(action);
                } catch (RuntimeException e) {
                    action.accept(valueOnException.get());
                    return true;
                }
            }
        }, stream.isParallel());
}

我们正在扩展AbstractSpliterator以包装原始流返回的spliterator.我们只需要实现tryAdvance方法,该方法可以委托给原始的spliterator的tryAdvance方法,也可以捕获RuntimeException并使用提供的valueOnException值调用该操作.

Spliterator的契约指定如果操作被占用,则tryAdvance的返回值必须为true,因此如果捕获了RuntimeException,则意味着原始的spliterator已从其自己的tryAdvance方法中抛出它.因此,在这种情况下我们返回true,这意味着该元素无论如何都被消耗了.

通过将这些值作为参数传递给AbstractSpliterator的构造函数来保留原始spliterator的估计大小和特征.

最后,我们通过StreamSupport.stream方法从新的spliterator创建一个新流.如果原始流也是并行的,则新流是并行的.

以下是如何使用上述方法:

public Stream<String> convertToString(Stream<Integer> input) {
    return asNonThrowingStream(input.map(String::valueOf), () -> "NaN");
}

编辑

根据下面的Holger’s comment,7004提供了一个解决方案,避免了Holger指出的陷阱.

这是代码:

<T> Stream<T> exceptionally(Stream<T> source, BiConsumer<Exception, Consumer<? super T>> handler) {
    class ExceptionallySpliterator extends AbstractSpliterator<T>
            implements Consumer<T> {

        private Spliterator<T> source;
        private T value;
        private long fence;

        ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.fence = source.getExactSizeIfKnown();
            this.source = source;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> it = source.trySplit();
            return it == null ? null : new ExceptionallySpliterator(it);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return fence != 0 && consuming(action);
        }

        private boolean consuming(Consumer<? super T> action) {
            Boolean state = tryConsuming(action);
            if (state == null) {
                return true;
            }
            if (state) {
                action.accept(value);
                value = null;
                return true;
            }
            return false;
        }


        private Boolean tryConsuming(Consumer<? super T> action) {
            fence--;
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                handler.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    return stream(new ExceptionallySpliterator(source.spliterator()), source.isParallel()).onClose(source::close);
}

如果您想进一步了解此解决方案,请参阅to the tests.

上一篇:java – 为什么在扩展RuntimeException时需要serialVersionUID?


下一篇:安全模式下卸载windows installer打包的软件(转)