java – 当ThreadPoolExecutor的所有任务完成执行作业时插入数据库

问题定义: –

只要ExecutorService的所有任务都完成了对作业的执行,我就需要在数据库中插入一些值.换句话说,只有当所有任务都完成执行时,我才可以插入到数据库中,因为我需要插入到db中的东西,取决于完成任务的所有线程.

那么我如何检查ExecutorService的所有任务是否已经完成执行,然后开始插入数据库.

下面是我使用ThreadPoolExecutor创建任务的代码.

executorService = new ThreadPoolExecutor(
    noOfThreads, 
    noOfThreads, 
    500L, 
    TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(noOfThreads), 
    new ThreadPoolExecutor.CallerRunsPolicy()
);


// Running for particular duration of time
while(System.currentTimeMillis() <= endTime) {
    Command newCommand = getNextCommand();
    Task nextRunnable = new Task(newCommand, existId, newId);
    executorService.submit(nextRunnable); // Submit it for execution
}

/* 
    Previously I was inserting into database here, but it was wrong as it might be 
    possible that some threads are still in progress. And If I am inserting right here 
    into database, then some information I will be loosing for sure. So how can I check 
    whether all the tasks have finished executing and then I can insert into database.
*/      
executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
}

我将用于插入的代码如下 –

// Inserting into Database when all the task have finished executing, currently I
for (Entry<Integer, LinkedHashMap<Integer, String>> entry : GUID_ID_MAPPING.entrySet()) {
    pstatement = db_connection.prepareStatement(LnPConstants.UPSERT_SQL);
    pstatement.setInt(1, entry.getKey());
    pstatement.setString(2, entry.getValue().get(LnPConstants.CGUID_ID));
    pstatement.setString(3, entry.getValue().get(LnPConstants.PGUID_ID));
    pstatement.executeUpdate();
}

因此,我需要在所有任务完成执行后将其置于某个地方.

更新: –

所以这样的东西 –

executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
}

// Now Insert into Database when all the task have finished executing
for (Entry<Integer, LinkedHashMap<Integer, String>> entry : GUID_ID_MAPPING.entrySet()) {
    pstatement = db_connection.prepareStatement(PDSLnPConstants.UPSERT_SQL);
    pstatement.setInt(1, entry.getKey());
    pstatement.setString(2, entry.getValue().get(PDSLnPConstants.CGUID_ID));
    pstatement.setString(3, entry.getValue().get(PDSLnPConstants.PGUID_ID));
    pstatement.executeUpdate();
}

解决方法:

你基本上已经有了答案.等待executorService.awaitTermination返回,执行程序将完成它的所有任务.

这会忽略由于错误而可能失败的任务.另一种方法,并检查错误是做类似的事情,

 List<Future> futures = ...
 for(...) {
      futures.add(executor.submit(...));
 }

 for(Future f : futures) {
       //this will throw an exception if an exception
       //occurred executing the task, insert error handling as 
       //appropriate, perhaps calling cancel on tasks that
       //have not yet completed
       f.get();
 }
 //at this point all tasks have completed

要查看的另一个类是ExecutorCompletionService. ExecutorCompletionService允许您按照实际完成的顺序获取任务.

上一篇:Java执行程序无法对任务进行排队


下一篇:java – 来自ExecutorService的CompletableFuture