众所周知,ThreadPoolExecutor使用一些BlockingQueue作为传入任务的队列.我想要的是让ThreadPoolExecutor为任务结果准备好第二个队列.我想将此队列用作发送或存储这些结果的输入/输出服务的源.
为什么我要创建一个单独的队列?因为我想要将结果的发送与获取结果的动作分开.另外,我认为伴随输入/输出操作的任何异常和延迟都不应该影响我正在计算结果的ThreadPoolExecutor.
我已经创建了一些天真的实现.我想对此提出一些批评.可能是,它可以用更开箱即用的Java类实现吗?我使用Java 7.
public class ThreadPoolWithResultQueue {
interface Callback<T> {
void complete(T t);
}
public abstract static class CallbackTask<T> implements Runnable {
private final Callback callback;
CallbackTask(Callback callback) {
this.callback = callback;
}
public abstract T execute();
final public void run() {
T t = execute();
callback.complete(t);
}
}
public static class CallBackTaskString extends CallbackTask<String> {
public CallBackTaskString(Callback callback) {
super(callback);
}
@Override
public String execute() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
return hashCode() + "-" + System.currentTimeMillis();
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>();
Callback<String> addToQueueCallback = new Callback<String>() {
@Override
public void complete(String s) {
System.out.println("Adding Result To Queue " + s);
resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue);
for (int i = 0; i <= 5; i++) {
executor.submit(new CallBackTaskString(addToQueueCallback));
};
System.out.println("All submitted.");
executor.shutdown();
executor.awaitTermination(10l, TimeUnit.SECONDS);
System.out.println("Result queue size " + resultQueue.size());
}
}
解决方法:
为了makinf一个库组件,你必须把事情包装起来……
您可以扩展线程池执行程序,它具有许多方法来拦截提交的任务,因此您可以将事物排队到构造函数中传递的队列.
这基本上是ExecutorCompletionService,但您可以允许用户插入队列而不是显示为队列.
否则,这是任务的典型代理.公平的工作.