一、背景
jdk8中加入了实现类CompletableFuture,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任务来处理,最后将每个子任务的结果汇集起来。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。从api文档看,它实现了2个接口CompletionStage和Future。CompletionStage支持lambda表达式,接口的方法的功能都是在某个阶段得到结果后要做的事情。因此,CompletableFuture不仅拥有Future的所有特性,而且还内置了lambda表达式,支持异步回调,结果转换等功能,它有以下Future实现不了的功能:
-
合并两个相互独立的异步计算的结果
-
等待异步任务的所有任务都完成
-
等待异步任务的其中一个任务完成就返回结果
-
任务完成后调用回调方法
-
任务完成的结果可以用于下一个任务。
-
任务完成时发出通知提供原生的异常处理api
二、代码
package com.example.demo; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; public class CompletableFutureDemo { //CPU核数 private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(AVAILABLE_PROCESSORS, 3 * AVAILABLE_PROCESSORS, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20)); public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); System.out.println("demo start....." + startTime); demo3(); System.out.println("demo end.....costTime = " + (System.currentTimeMillis() - startTime)); } /** * 基于allOf,并行处理多个任务,等待所有任务执行完毕后返回 */ public static void demo3() throws Exception { //用户整体接收各个任务的返回值 Map<String,String> dataMap = new ConcurrentHashMap<>(); List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(doSomethingA("A", dataMap)); futureList.add(doSomethingB("B", dataMap)); futureList.add(doSomethingC("C", dataMap)); CompletableFuture<Void> result = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])); try { result.get(3, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } System.out.println("dataMap = " + dataMap); //结果为:{doSomeThingB=B, doSomeThingA=A} } /** * 基于thenCompose,第一个任务执行完后,第二个任务使用第一个任务的返回作为参数 */ public static void demo1() throws Exception { Map<String,String> dataMap = new HashMap<>(); CompletableFuture<String> completableFuture = doSomethingA("A", dataMap) .thenCompose(id -> doSomethingB(id, dataMap)); String result = completableFuture.get(3, TimeUnit.SECONDS); System.out.println("result = " + result); //结果为:A is done is done } /** * 基于thenCombine,当两个任务都完成后,使用两者的结果作为参数再执行一个异步任务 */ public static void demo2() throws Exception { Map<String,String> dataMap = new HashMap<>(); CompletableFuture<String> completableFuture = doSomethingA("A", dataMap) .thenCombine(doSomethingB("B", dataMap), (a, b) -> a + " - " + b); String result = completableFuture.get(3, TimeUnit.SECONDS); System.out.println("result = " + result); //结果为:A is done - B is done } /** * @param dataMap 用户整体接收方法的返回值 * @return */ public static CompletableFuture<String> doSomethingA(String taskId, Map<String,String> dataMap) { System.out.println("doSomethingA start....." + System.currentTimeMillis()); return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } dataMap.put("doSomeThingA", "A"); System.out.println(taskId + " is done and dataMap"+dataMap); return taskId + " is done"; }, threadPoolExecutor); } public static CompletableFuture<String> doSomethingB(String taskId, Map<String,String> dataMap) { System.out.println("doSomethingB start....." + System.currentTimeMillis()); return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } dataMap.put("doSomeThingB", "B"); System.out.println(taskId + " is done and dataMap"+dataMap); return taskId + " -> B is done"; }, threadPoolExecutor); } public static CompletableFuture<String> doSomethingC(String taskId, Map<String,String> dataMap) { System.out.println("doSomethingC start....." + System.currentTimeMillis()); return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } dataMap.put("doSomeThingC", "C"); System.out.println(taskId + " is done and dataMap"+dataMap); return taskId + " is done"; }, threadPoolExecutor); } }
三、效率比较
很明显,异步更快
package com.example.demo; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; /** * @author d00018641 * @date 2021/11/4 15:10 */ public class TestDemo2 { private static final String key = "llllllllllllllllllllllll"; public static void main(String[] args) { List<String> requestList = new ArrayList<>(); requestList.add("3"); requestList.add("4"); requestList.add("5"); requestList.add("6"); // 响应参数list String[] returnArray = new String[requestList.size()]; // 异步查询每一列,定义响应列数的futures List<CompletableFuture<String>> futures = new ArrayList<>(); long startTime = System.currentTimeMillis(); for (int i = 0; i < requestList.size(); i++) { final int a = i; CompletableFuture<String> tf = CompletableFuture.supplyAsync(() -> { return calc(requestList.get(a)); }).whenComplete((m, e) -> returnArray[a] = m); futures.add(tf); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); //CompletableFuture end.....costTime = 147 System.out.println("CompletableFuture end.....costTime = " + (System.currentTimeMillis() - startTime)); long startTime1 = System.currentTimeMillis(); for(int i = 0; i < requestList.size(); i++){ returnArray[i] = calc(requestList.get(i)); } //连续 end.....costTime = 432 System.out.println("连续 end.....costTime = " + (System.currentTimeMillis() - startTime1)); System.out.println(Arrays.asList(returnArray)); } private static String calc(String source) { int as = Integer.parseInt(source); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return String.valueOf(Math.pow(as, 3)); } }