Java多线程Future与CompletableFuture-异步获取接口返回结果

背景:

当调用一些耗时接口时,如果我们一直在原地等待方法返回,整体程序的运行效率会大大降低。可以把调用的过程放到子线程去执行,再通过 Future 去控制子线程的调用过程,最后获取到计算结果。提高整个程序的运行效率。

创建线程池:

@Configuration
public class ExecutorConfig {

    private final static int THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;

    @Bean(name = "batchCallThreadPool")
    public ThreadPoolExecutor batchPredictThreadPool() {
        return new ThreadPoolExecutor(THREAD_COUNT, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2048),
            new ThreadFactoryBuilder().setNameFormat("batch-call-pool-%d").build());
    }

}

利用Future获取线程执行结果

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class FutureTestService {

    @Autowired
    private CallService callService;

    @Autowired
    @Qualifier("batchPredictThreadPool")
    private ThreadPoolExecutor batchPredictThreadPool;

    public void fun(List<Long> idList) {
        Map<Long, Future<Integer>> futureMap = Maps.newHashMapWithExpectedSize(idList.size());
        // 让所有调用的子线程启动,参与竞争
        for (Long id : idList) {
            Future<Integer> future = batchPredictThreadPool.submit(() ->
                callService.call(id));
            futureMap.put(id, future);
        }
        
        for (Long id : futureMap.keySet()) {
            try {
                // 阻塞获取执行结果,如果 3s 未获取到会抛出超时异常
                Integer result = futureMap.get(id).get(3000, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // 处理超时
            } catch (ExecutionException e) {
                // 处理执行时异常
            } catch (InterruptedException e) {
                // 处理 中断
            }
        }
    }
}

利用CompletableFuture获取线程执行结果

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class FutureTestService {

    @Autowired
    private CallService callService;

    @Autowired
    @Qualifier("batchPredictThreadPool")
    private ThreadPoolExecutor batchPredictThreadPool;

    public void fun(List<Long> idList) throws ExecutionException, InterruptedException, TimeoutException {
        List<CompletableFuture<Integer>> completableFutureList = new LinkedList<>();
        Map<Long, Integer> resultMap = new HashMap<>(idList.size());
        // 让所有调用的子线程启动,参与竞争
        for (Long id : idList) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                Integer result = callService.call(id);
                resultMap.put(id, result);
                return null;
            }, batchPredictThreadPool);
            completableFutureList.add(future);
        }

        // 在此处聚合
        CompletableFuture<Void> allCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(
            new CompletableFuture[completableFutureList.size()]));

        /**
         * 如果在 3 秒钟之内这些任务都可以顺利返回,则这个 get 方法就可以及时正常返回,并且往下执行。
         * 如果有某一个任务没能来得及在 3 秒钟之内返回,那么这个带超时参数的 get 方法便会抛出 TimeoutException 异常
         * 会尝试等待所有的任务完成,但是最多只会等 3 秒钟,在此之间,如及时完成则及时返回。
         */
        allCompletableFuture.get(3, TimeUnit.SECONDS);
    }
}


上一篇:YourSQLDba设置共享路径备份


下一篇:[Unity3d]Player Settings导出设置