在Java中的N个线程上处理M个缓慢的计算

我需要运行N个慢速计算(其中N是一个相当大的数字),并希望在M个线程上运行,因为慢速计算有很多IO等待时间.我整理了一个小示例,该示例非常适合所有计算成功的情况.但是,如果计算失败,则期望的行为是停止处理进一步的计算.每次成功的计算都已经将结果写入数据库,因此我只需要确定哪个计算失败并停止尚未开始的计算即可.

我的方法是将ExecutorService接口用于Executors.newFixedThreadPool.但是,我看不到一种干净的方法来识别其中一个计算失败(在我的示例中返回false),并停止已提交给ExecutorService但尚未从池中分配线程的计算.

有没有一种干净的方法可以做到这一点?有没有更好的方法可供我考虑?

import java.util.*;
import java.util.concurrent.*;

class Future
{
    static private class MyWorker implements Callable
    {   
        private Integer item;
        public MyWorker(Integer item)
        {
            this.item = item;
        }

        public Boolean call() throws InterruptedException
        {
            if (item == 42) 
            {
                return false;
            }
            else
            {
                System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
                Thread.sleep(1000);
                return true;
            }
        }   
    }

    static int NTHREADS = 2;

    public static void main(String args[]) 
    {
        Queue<Integer> numbers = new LinkedList<Integer>();     
        for (int i=1; i<10000; i++)
        {
            numbers.add(i);
        }

        System.out.println("Starting thread test.");

        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);

        for (Integer i : numbers)
        {
            MyWorker my = new MyWorker(i);
            System.out.println("Submit..." + i.toString());
            exec.submit(my);
            System.out.println("... Done Submit");
        }

        exec.shutdown();

        System.out.println("Exiting thread test.");

    }
}

编辑:这是afk建议的有效实施.仍计划查看回调解决方案,并希望有其他建议.

import java.util.*;
import java.util.concurrent.*;

class MyFuture
{
    static private class MyWorker implements Callable
    {   
        private Integer item;
        public MyWorker(Integer item)
        {
            this.item = item;
        }

        public Boolean call() 
        {
            if (item == 42) 
            {
                return false;
            }
            else
            {
                System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException ie) 
                { 
                // Not much to do here except be grumpy they woke us up...
                } 
                return true;
            }
        }   
    }

    static int NTHREADS = 4;

    public static void main(String args[]) throws InterruptedException
    {
        Queue<Integer> numbers = new LinkedList<Integer>();     
        for (int i=1; i<100; i++)
        {
            numbers.add(i);
        }

        System.out.println("Starting thread test.");

        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);

        List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();

        for (Integer i : numbers)
        {
            MyWorker my = new MyWorker(i);
            System.out.println("Submit..." + i.toString());
            Future<Boolean> f = exec.submit(my);
            futures.add(f);
            System.out.println("... Done Submit");
        }

        boolean done = false;

        while (!done)
        {
            Iterator<Future<Boolean>> it = futures.iterator();

            while (it.hasNext()) 
            {
                Future<Boolean> f = it.next();
                if (f.isDone())
                {
                    try
                    {
                        System.out.println("CHECK RETURN VALUE");
                        if (f.get()) 
                        {
                            it.remove();
                        }
                        else
                        {                   
                            System.out.println("IMMEDIATE SHUTDOWN");
                            exec.shutdownNow();
                            done = true;
                            break;
                        }
                    }
                    catch (InterruptedException ie)
                    {
                    }
                    catch (ExecutionException ee)
                    {
                    }
                }
            }
            Thread.sleep(1000);
            if (futures.size() == 0)
            {
                done = true;
            }
        }

        exec.shutdown();

        System.out.println("Exiting thread test.");

    }
}

解决方法:

使用回调,可以将失败通知为I outline in another answer,,并将所有已提交的作业通知cancel. (在我的示例中,Callback实现类可以引用添加了每个Future的Collection.)对于那些已完成(或启动,取决于参数的值)的任务,cancel无效.其余的将永远不会开始.

上一篇:跨线程UI组件调用


下一篇:c#-2有关命名管道的问题