我需要运行N个慢速计算(其中N是一个相当大的数字),并希望在M个线程上运行,因为慢速计算有很多IO等待时间.我整理了一个小示例,该示例非常适合所有计算成功的情况.但是,如果计算失败,则期望的行为是停止处理进一步的计算.每次成功的计算都已经将结果写入数据库,因此我只需要确定哪个计算失败并停止尚未开始的计算即可.
我的方法是将ExecutorService接口用于Executors.newFixedThreadPool.但是,我看不到一种干净的方法来识别其中一个计算失败(在我的示例中返回false),并停止已提交给ExecutorService但尚未从池中分配线程的计算.
有没有一种干净的方法可以做到这一点?有没有更好的方法可供我考虑?
import java.util.*;
import java.util.concurrent.*;
class Future
{
static private class MyWorker implements Callable
{
private Integer item;
public MyWorker(Integer item)
{
this.item = item;
}
public Boolean call() throws InterruptedException
{
if (item == 42)
{
return false;
}
else
{
System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
Thread.sleep(1000);
return true;
}
}
}
static int NTHREADS = 2;
public static void main(String args[])
{
Queue<Integer> numbers = new LinkedList<Integer>();
for (int i=1; i<10000; i++)
{
numbers.add(i);
}
System.out.println("Starting thread test.");
ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);
for (Integer i : numbers)
{
MyWorker my = new MyWorker(i);
System.out.println("Submit..." + i.toString());
exec.submit(my);
System.out.println("... Done Submit");
}
exec.shutdown();
System.out.println("Exiting thread test.");
}
}
编辑:这是afk建议的有效实施.仍计划查看回调解决方案,并希望有其他建议.
import java.util.*;
import java.util.concurrent.*;
class MyFuture
{
static private class MyWorker implements Callable
{
private Integer item;
public MyWorker(Integer item)
{
this.item = item;
}
public Boolean call()
{
if (item == 42)
{
return false;
}
else
{
System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
try
{
Thread.sleep(1000);
}
catch (InterruptedException ie)
{
// Not much to do here except be grumpy they woke us up...
}
return true;
}
}
}
static int NTHREADS = 4;
public static void main(String args[]) throws InterruptedException
{
Queue<Integer> numbers = new LinkedList<Integer>();
for (int i=1; i<100; i++)
{
numbers.add(i);
}
System.out.println("Starting thread test.");
ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);
List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
for (Integer i : numbers)
{
MyWorker my = new MyWorker(i);
System.out.println("Submit..." + i.toString());
Future<Boolean> f = exec.submit(my);
futures.add(f);
System.out.println("... Done Submit");
}
boolean done = false;
while (!done)
{
Iterator<Future<Boolean>> it = futures.iterator();
while (it.hasNext())
{
Future<Boolean> f = it.next();
if (f.isDone())
{
try
{
System.out.println("CHECK RETURN VALUE");
if (f.get())
{
it.remove();
}
else
{
System.out.println("IMMEDIATE SHUTDOWN");
exec.shutdownNow();
done = true;
break;
}
}
catch (InterruptedException ie)
{
}
catch (ExecutionException ee)
{
}
}
}
Thread.sleep(1000);
if (futures.size() == 0)
{
done = true;
}
}
exec.shutdown();
System.out.println("Exiting thread test.");
}
}
解决方法:
使用回调,可以将失败通知为I outline in another answer,,并将所有已提交的作业通知cancel
. (在我的示例中,Callback实现类可以引用添加了每个Future的Collection.)对于那些已完成(或启动,取决于参数的值)的任务,cancel无效.其余的将永远不会开始.