java – 并发:如何使用传入和传出队列实现执行程序?

众所周知,ThreadPoolExecutor使用一些BlockingQueue作为传入任务的队列.我想要的是让ThreadPoolExecutor为任务结果准备好第二个队列.我想将此队列用作发送或存储这些结果的输入/输出服务的源.

为什么我要创建一个单独的队列?因为我想要将结果的发送与获取结果的动作分开.另外,我认为伴随输入/输出操作的任何异常和延迟都不应该影响我正在计算结果的ThreadPoolExecutor.

我已经创建了一些天真的实现.我想对此提出一些批评.可能是,它可以用更开箱即用的Java类实现吗?我使用Java 7.

public class ThreadPoolWithResultQueue {
    interface Callback<T> {
        void complete(T t);
    }
    public abstract static class CallbackTask<T> implements Runnable {
        private final Callback callback;   
        CallbackTask(Callback callback) {
            this.callback = callback;
        }    
        public abstract T execute();   
        final public void run() {
            T t = execute();
            callback.complete(t);
        }
    }   
    public static class CallBackTaskString extends CallbackTask<String> {
        public CallBackTaskString(Callback callback) {
            super(callback);
        }
        @Override
        public String execute() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
            }
            return hashCode() + "-" + System.currentTimeMillis();
        }
    }    
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>();
        Callback<String> addToQueueCallback = new Callback<String>() {
            @Override
            public void complete(String s) {
                System.out.println("Adding Result To Queue " + s);
                resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue);
        for (int i = 0; i <= 5; i++) {
            executor.submit(new CallBackTaskString(addToQueueCallback));
        };
        System.out.println("All submitted.");
        executor.shutdown();
        executor.awaitTermination(10l, TimeUnit.SECONDS);
        System.out.println("Result queue size " + resultQueue.size());
    }
}

解决方法:

为了makinf一个库组件,你必须把事情包装起来……

您可以扩展线程池执行程序,它具有许多方法来拦截提交的任务,因此您可以将事物排队到构造函数中传递的队列.

这基本上是ExecutorCompletionService,但您可以允许用户插入队列而不是显示为队列.

否则,这是任务的典型代理.公平的工作.

上一篇:java-FutureTask保持活动状态


下一篇:关机还是不关机?在ExecutorService(Java8)中