在流处理期间遇到RuntimeException时,流处理是否应该中止?应该先完成吗?是否应该在Stream.close()上重新抛出异常?异常是否被重新抛出或被包裹? Stream
的JavaDoc和包java.util.stream没有什么可说的.
我发现的所有关于*的问题似乎都集中在如何从功能界面中包装已检查的异常以便编译代码.事实上,互联网上的博客文章和类似文章都集中在同一个警告上.这不是我关心的问题.
我根据自己的经验知道,一旦抛出RuntimeException,顺序流的处理就会中止,并且这个异常会按原样重新抛出.只有在客户端线程抛出异常时,并行流才会出现相同的情况.
但是,放置here的示例代码演示了如果在并行流处理期间由“工作线程”(=与调用终端操作的线程不同的线程)抛出异常,则此异常将永远丢失并且流处理完成.
示例代码将首先并行运行IntStream
.然后一个“正常”Stream
并行.
这个例子将表明,
1)如果遇到RuntimeException,IntStream在中止并行处理方面没有问题.异常被重新抛出,包装在另一个RuntimeException中.
2)流不好玩.实际上,客户端线程永远不会看到抛出的RuntimeException的痕迹.流不仅完成处理;将处理更多元素而不是指定的limit()!
在示例代码中,使用IntStream.range()生成IntStream.“普通”流没有“范围”的概念,而是由1:s组成,但调用Stream.limit()将流限制为10亿个元素.
这是另一个转折点.生成IntStream的示例代码执行如下操作:
IntStream.range(0, 1_000_000_000).parallel().forEach(..)
将其更改为生成的流,就像代码中的第二个示例一样:
IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)
此IntStream的结果是相同的:异常被包装并重新抛出,处理中止.但是,第二个流现在也将包装并重新抛出异常,而不是处理超出限制的元素!因此:更改第一个流的生成方式会对第二个流的行为产生副作用.对我来说,这很奇怪.
ForkJoinPool.invoke()和ForkJoinTask
的JavaDoc说重新抛出异常,这是我对并行流的期望.
背景
当处理从Collection.stream().parallel()获取的并行流中的元素时,我遇到了这个“问题”(我没有验证Collection.parallelStream()的行为,但它应该是相同的).发生的事情是“工作线程”崩溃然后静静地离开,而所有其他线程成功完成了流.我的应用程序使用default exception handler将异常写入日志文件.但是甚至没有创建这个日志文件.线程和他的例外根本就消失了.由于我需要在捕获运行时异常时立即中止,因此一种替代方法是编写将此异常泄漏给其他工作程序的代码,使其在任何其他线程抛出异常时不愿意继续.当然,这并不能保证流实现只是继续生成尝试完成流的新线程.所以我可能最终不会使用并行流,而是使用线程池/执行器进行“正常”并发编程.
这表明丢失的运行时异常问题不会与Stream.generate()或使用Stream.limit()生成的流隔离.最重要的是,我很想知道…是预期的行为?
解决方法:
关于异常报告,这两个流的行为没有区别,问题是您将两个测试一个接一个地放入一个方法中,并让它们访问共享数据结构.
有一个微妙的,可能没有充分记录(如果有意)的行为:当流操作异常完成时,它不会等待所有并发操作的完成.
因此,当您捕获第一个流操作的异常时,仍然有一些线程正在运行并访问您的共享数据.因此,当您重置AtomicBoolean时,属于第一个作业的其中一个线程将读取false值,将其变为true,打印消息并抛出一个丢失的异常,因为流操作已经异常完成.此外,这些线程中的一些将在您重置后提升您的计数器,这就是为什么它的数量高于第二个作业允许的数量.您的第二个作业没有异常完成,因为属于第二个作业的所有线程都将从AtomicBoolean读取一个真值.
有一些方法可以发现这一点.
当您删除第一个流操作时,第二个将按预期异常完成.另外,插入语句
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
两个流操作之间将解决问题,因为它等待所有线程的完成.
但是,更干净的解决方案是让两个流操作都使用自己的计数器和标志.
也就是说,如果你只是交换这两个操作,那么存在一个微妙的,依赖于实现的差异会导致问题消失. IntStream.range操作生成具有已知大小的流,这允许将其拆分为并发任务,这些任务本质上知道要处理多少元素.这允许在如上所述的特殊情况下放弃这些任务.另一方面,组合由generate和limit返回的无限流不会产生大小的流(尽管这是可能的).由于这种流被视为具有未知大小,因此子任务必须在计数器上同步以确保遵守该限制.这导致子任务(有时)完成,即使在特殊情况下也是如此.但正如所说,这是实施细节的副作用,而非故意等待完成.因为它是关于并发性的,所以如果你多次运行它,结果可能会有所不同.