我想实现能够获得工作者(callables)集合的东西,在线程池上并行运行它,并且当最快的worker返回结果时,优雅地关闭(ExecutorService.shutdownNow)其他worker,以免浪费更多资源.如果所有工人都以异常结束,我需要重新抛出最重要的工人(工人抛出的所有异常都与重要性值相关联).此外,我需要在整个执行程序上有一个超时,如果它们运行的时间太长,它将终止所有工作程序.
我已经考虑过使用RxJava,因为它可以在这里实现简洁而美观的解决方案.但也许你可以为它提出一些更好的工具(CompletableFutures,ForkJoinTasks?).这是我已经编写过的代码,但它远不是一个有效的解决方案(我对反应式编程并不是很有经验,因此真的很难解决这个问题):
public T run(Collection<? extends Worker<T>> workers, long timeout) {
ExecutorService executorService = Executors.newFixedThreadPool(workers.size());
return Observable.from(workers)
.timeout(timeout, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.from(executorService))
.map(worker -> {
try {
T res = worker.call();
executorService.shutdownNow();
return res;
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}).doOnError(Exceptions::propagate).toBlocking().first();
如果有任何帮助,我将不胜感激.
解决方法:
相当有趣的技术挑战,谢谢你的提问.这是使用CompletableFuture for Java8的解决方案.在Java7中,您可以以完全相同的方式使用io.netty.util.concurrent.Promise.
最简单的部分是处理正常情况:
>创造一个可以完善的未来
>安排任务
>回归未来
>第一个完成未来,其他人被忽略(如果没有被杀死,那么原子布尔控制他们不会覆盖值)
>在未来的下一阶段关闭执行器服务
更棘手的部分是在每个单独抛出保持相同的逻辑流时异常完成.这可以通过累积所有异常并在计数达到最后一个失败作业中的作业计数时异常完成未来来解决.传递的异常是按排名排序的列表中的第一个(这里它将是最小排名,相应地更改).调用future.get()并将其包装到ExecutionException中时,该异常将可用.
最后,因为您将来会回来,所以可以将超时值传递给get方法.
所以这是实际的工作解决方案,异常类和测试如下:
public <R> CompletableFuture<R> execute(Collection<? extends Callable<R>> jobs) {
final CompletableFuture<R> result = new CompletableFuture<>();
if (jobs == null || jobs.isEmpty()) {
result.completeExceptionally(new IllegalArgumentException("there must be at least one job"));
return result;
}
final ExecutorService service = Executors.newFixedThreadPool(jobs.size());
// accumulate all exceptions to rank later (only if all throw)
final List<RankedException> exceptions = Collections.synchronizedList(Lists.newArrayList());
final AtomicBoolean done = new AtomicBoolean(false);
for (Callable<R> job: jobs) {
service.execute(() -> {
try {
// this is where the actual work is done
R res = job.call();
// set result if still unset
if (done.compareAndSet(false, true)) {
// complete the future, move to service shutdown
result.complete(res);
}
// beware of catching Exception, change to your own checked type
} catch (Exception ex) {
if (ex instanceof RankedException) {
exceptions.add((RankedException) ex);
} else {
exceptions.add(new RankedException(ex));
}
if (exceptions.size() >= jobs.size()) {
// the last to throw and only if all have thrown will run:
Collections.sort(exceptions, (left, right) -> Integer.compare(left.rank, right.rank));
// complete the future, move to service shutdown
result.completeExceptionally(exceptions.get(0));
}
}
});
}
// shutdown also on error, do not wait for this stage
result.whenCompleteAsync((action, t) -> service.shutdownNow());
return result;
}
RankedExeption:
public static class RankedException extends Exception {
private final int rank;
public RankedException(Throwable t) {
this(0, t);
}
public RankedException(int rank, Throwable t) {
super(t);
this.rank = rank;
}
}
现在有两个测试,成功和失败案例(有点简化,但仍然):
@Rule
public ExpectedException exception = ExpectedException.none();
private static class TestJob implements Callable<Double> {
private final int index;
private final int failOnCount;
TestJob(int index, int failOnCount) {
this.index = index;
this.failOnCount = failOnCount;
}
@Override
public Double call() throws RankedException {
double res = 0;
int count = (int) (Math.random() * 1e6) + 1;
if (count > failOnCount) {
throw new RankedException(count, new RuntimeException("job " + index + " failed"));
}
for (int i = 0; i < count; i++) {
res += Math.random();
}
return res;
}
}
@Test
public void test_success() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, (int)(5*1e5))); // 50% should be alright
}
CompletableFuture<Double> res = execute(jobs);
logger.info("SUCCESS-TEST completed with " + res.get(30, TimeUnit.SECONDS));
}
@Test
public void test_failure() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, 0)); // all should fail
}
CompletableFuture<Double> res = execute(jobs);
exception.expect(ExecutionException.class);
try {
res.get(30, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
logger.severe(String.format("FAIL-TEST rank: %s", ((RankedException) ex.getCause()).rank));
throw ex;
}
}
最后,测试的截断输出运行:
INFO: SUCCESS-TEST completed with 115863.20802680103
SEVERE: FAIL-TEST rank: 388150
Process finished with exit code 0
注意:您可能希望通过AtomicBoolean进一步发送信号,以便在第一个就绪时实际发出所有线程的信号
我不保证上面的代码没有错误,因为它是在匆忙中完成的,测试是初步的.它旨在指出进一步挖掘的方向.