为什么需要异步?
随着应用越来越复杂,用户基数越来越大,后端服务将面临着极大的压力,这个压力主要体现在两个方面:
- 是否能为用户提供持续高效稳定的服务,提高可用性;
- 是否能充分利用服务器资源做更多的事情,降低成本;
我们先看第1点,大家作为资深互联网用户,其实都是知道自己不好伺候的,比如说你现在去某个电商平台购买物品,你几乎无法忍受任何延迟,哪怕对方客服告诉你:我们在你购买时,会同时帮你计算积分,所以会有所延迟...但真正“抢”购时,韭菜都更在意速度而不是积分的,对吧?
再看看第2点,后端服务的压力越来越大时,工程师们会非常上火,为了周末能顺利陪女朋友玩耍(假如有的话),打算来个暴力解决方案:升配!这个时候,运维站出来,给你贴了一张CPU和内存利用率的图,说道:代码写的烂,升配解决不了的,资源利用率也太低了。架构师老脸一红,站出来了:大家周末加加班,一起优化下代码吧~
好了,我们怎么通过优化代码,来解决这两个问题呢?答案是:异步。
我们大部分代码,都是同步调用的,所谓同步,是指调用方必须等到方法返回后才会进行下一步,而异步,是指调用方不必等到方法返回,只需要方法在完成任务后通知调用方即可。
以上面的电商为例,当用户购买商品时,系统会计算用户积分,而积分的计算,涉及到很多规则,也需要操作数据库或其他数据源,往往会比较耗时,最重要的,它并不属于购物主流程,假如做成同步顺序执行,用户会等待很长时间。对于这种业务,我们可以考虑将其异步化,让其不耽误主流程的继续进行。
那么,异步是怎么解决资源利用的问题的呢?当你的业务逻辑都是同步执行的,也就意味着一旦被耗时的操作阻塞,是没有执行其他业务逻辑的机会的,此时CPU几乎被闲置,利用率低,假如此时是通过多线程异步执行的,那么其他线程仍然能同时处理其他逻辑,CPU会一直跑。这里顺便提一下:异步和多线程并不是一个等价的概念,Java中的异步是依靠多线程实现的。
Java对异步编程的支持
使用Future编写异步代码
Java对异步API的直接支持是从JDK1.5开始的,在该版本的并发包下(JUG),提供了Future,它表示一个异步计算的结果,该结果只能在执行完成后,由其get方法获取到,在这之前,get方法会一直阻塞,除非设置了超时时间,下面先看个例子:
ExecutorService executorService=Executors.newFixedThreadPool(5);
Future<Integer> future=executorService.submit(()->{
System.out.println("hello future");
Thread.sleep(3000);
//模拟计算错误
//int i=10/0;
return 1;
});
System.out.println("...其他操作...");
try {
Integer count = future.get(5,TimeUnit.SECONDS);
System.out.println("count: "+count);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
这段代码并不难,但是很多工程师用法不对,会导致很多不该出的问题。Future是靠线程池驱动的,以前线程池的使用经验仍然是基础,以前线程池出什么问题,现在依然会出什么问题。Future适用于把计算剥离在一个单独的线程环境中处理,然后期待它“未来”返回一个结果给你。【在任何时候】,这个“未来”都是要做超时控制的,否则会一直阻塞下去,你无法处理一个看不见的未来...
在上面的代码中,我们调用了Future的get方法来获取返回值,假如计算还未结束(模拟了3秒),那么此时会阻塞当前线程。在这个过程中,可能会遇到三种异常,InterruptedException表示线程被中断,ExecutionException表示计算异常,TimeoutException表示超时异常。很多工程师习惯用一个Exception来catch所有异常,这是会出现很多问题的。比如说ExecutionException,是计算体内抛出的异常包装的(可以解开模拟异常代码的注释进行测试),在发生异常时,一般情况下内部计算是失败的,而TimeoutException是计算体执行太长,外面等不了导致的,并不一定代表内部的计算是失败的(会继续执行下去),假如说把这两种异常放一起处理,是有可能出现问题的,除非能完全保证计算的幂等性,然后统一进行重试。
Future的增强版:CompletableFuture
Future使用起来比较简单,但是也能看出它的一些局限性。比如说,我们要获取计算结果,仍然只能通过阻塞调用(get),并且,它不太便于我们做异步计算的编排组合。而JDK8出现的CompletableFuture弥补了它的这些短板。
CompletableFuture实现了Future接口,既可以完成Future所有能做的事。最关键是,它还实现了CompletionStage接口,该接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。
CompletableFuture基本用法
CompletableFuture主要提供两套工具函数来创建异步处理对象:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
其中supplyAsync主要用于需要返回计算结果的异步处理;runAsync主要用于不需要返回计算结果的异步处理,下面我们看看supplyAsync的用法:
CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
//模拟耗时操作,sleep 3秒
timeConsuming(3000);
System.out.println(Thread.currentThread().getName());
return "Hello CompletableFuture";
});
/**
* 模拟耗时处理
* @param millis
*/
public static void timeConsuming(long millis){
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
这段代码中,我们在模拟耗时3秒后,返回了计算结果,那么怎么获取该结果呢?前面我们提到,CompletableFuture实现了Future接口,所以通过get方法肯定是能获取到的,但是这样做并不好,更好的做法是发挥其另外一个实现接口CompletionStage的作用,让结果处理也异步化,CompletionStage提供很多链式函数来做各种结果处理:
completableFuture.thenApply(word->{
System.out.println(Thread.currentThread().getName());
//模拟异常
//int i=10/0;
return word+" stage";
}).exceptionally(ex->{
System.out.println("stage1: "+ex.getMessage());
return "hello";
}).thenApply(word->{
System.out.println(Thread.currentThread().getName());
return word.length();
}).thenAccept(System.out::println);
thenApply方法会沿用之前的线程上下文来执行计算,这段代码中,我们连续调用两次thenApplay,每次都是将上次转换的结果作为本次计算的入参,最后,调用thenAccept完成最终处理(消费)。thenApply方法还有个对应的thenApplyAsync方法,它与前者的区别在于:会将任务重新提交到线程池异步执行,而不是沿用之前的线程上下文,说白了就是另外一个异步执行,thenAccept同理。exceptionally用于异常处理,当发生异常时,返回一个默认或者修正后的结果,后面的thenApply会继续将此结果作为入参进行计算。
线程池解惑
不知道大家是否会有疑问,之前我们讲过,异步是依赖于线程池实现的,但是我们在使用CompletableFuture的过程中,并没有看到任何线程池的设置,这是什么原因呢?实际上,JDK8中内置了一个公共线程池ForkJoinPool.commonPool(),在没有显示设置线程池的时候,就使用的该公共线程池。大家可以运行上述代码,看下当前线程名称的打印,会出现“ForkJoinPool.commonPool-worker”的字样。在实际上项目中,笔者还是建议大家手动设置一下线程池,线程隔离会让计算更加高效、安全(比如不会因为线程池的不可用,导致所有依赖它的任务都得不到执行),我们可以调用supplyAsync或runAsync带有Executor的重载方法进行设置。这里顺便提一下,Java8中的并行流parallelStream,内部也是通过ForkJoinPool.commonPool()执行的,我们是无法为它指定线程池的。
组合CompletableFuture
CompletableFuture提供的另外一个超强功能就是可以让多个异步处理组合成新的CompletableFuture,被合并的异步处理,可以有依赖关系,也可以是两个完全独立的个体,工程师完全可以把精力放在业务开发上,而无需像以前一样,自己管理异步流水线。下面我们分别以thenCompose和thenCombine为例进行说明。
CompletableFuture可以通过调用thenCompose,将处理结果传递给下一个CompletableFuture去处理,然后返回一个新的CompletableFuture。设想一个虚拟场景:在购物应用里,我们要在处理订单之后,通过订单金额来给用户算积分,下面看看示例代码:
CompletableFuture<Integer> resultFuture=CompletableFuture.supplyAsync(()->{
//模拟耗时2秒处理订单
timeConsuming(2000);
//返回订单金额
return 30;
}).thenCompose(value->
CompletableFuture.supplyAsync(()->{
//根据订单金额计算积分
if(value<50){
return 1;
}else{
return 2;
}
})
);
try {
Integer result=resultFuture.get(5,TimeUnit.SECONDS);
System.out.println("result= "+result);//result=1
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
在这段代码中,我们仍然首先通过supplyAsync构建了订单异步处理的CompletableFuture对象,并将计算好的结果通过thenCompose传给了积分异步处理的CompletableFuture对象,最终通过get方法获取到积分结果。
CompletableFuture的thenCombine方法更有趣,它不需要关心多个异步之间是否有数据依赖关系,只要我们需要,就可以并行的将各个异步处理的结果合并起来。比如说,我们想获取某本书在A、B两家书店的价格并计算均值,那么可以这么做:
CompletableFuture<Integer> resultFuture=CompletableFuture.supplyAsync(()->{
//模拟耗时1秒获取A书店价格
timeConsuming(1000);
//返回价格
return 30;
}).thenCombine(
CompletableFuture.supplyAsync(()->{
//模拟耗时1秒获取B书店价格
timeConsuming(1000);
//返回价格
return 50;
}),(a,b)->{
System.out.println("a="+a);//30
System.out.println("b="+b);//50
return (a+b)/2;
}
);
这两个操作A、B是完全异步执行的,理论上,获取两个价格的最大时间=max(A、B),而不是A、B两个时间叠加,在实际应用中,性能提升会比较明显。
结合Stream
接着上面那个场景来说,假如现在有多本书,都需要知道在书店A、B中的价格和均价,并且以列表形式存储起来,这个如何做呢?这种场景下,结合Stream来做是非常爽的。
/*********准备【书籍_书店_价格】数据 begin***********/
Map<String,Integer> priceMap=new HashMap<>();
priceMap.put("001_A",30);
priceMap.put("001_B",50);
priceMap.put("002_A",40);
priceMap.put("002_B",60);
priceMap.put("003_A",100);
priceMap.put("003_B",120);
/*********准备【书籍_书店_价格】数据 end***********/
//待查书籍ID列表
Stream<String> bookStreams=Arrays.stream(new String[]{"001","002","003"});
List<CompletableFuture<Integer>> futureList=bookStreams.map(book->CompletableFuture.supplyAsync(()->{
//模拟耗时1秒获取A书店价格
timeConsuming(2000);
//返回价格
return priceMap.get(book+"_A");
}).thenCombine(
CompletableFuture.supplyAsync(()->{
//模拟耗时1秒获取A书店价格
timeConsuming(2000);
//返回价格
return priceMap.get(book+"_B");
}),(a,b)->{
//计算均值
return (a+b)/2;
}
)).collect(Collectors.toList());
List<Integer> list=futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
这段代码中,我们首先准备了一些基础数据,为了演示方便,我这里直接用Map预存储了【书籍_书店_价格】的关系,其中key=书籍ID_书店标识,value=价格。最核心的代码就是Stream.map方法,这个方法在之前文章里面讲过,它主要用于将Stream中的每个元素迭代出来,然后转换成新的元素。在这里,我们是把之前的异步处理逻辑直接放在map里,即:让每个元素都去异步计算两个书店的价格并取均值。注意:map里面得到的元素并不是最终的均值,而是包含均值的CompletableFuture对象。我们在获取到CompletableFuture列表后,最终通过join方法,将每个对象中的值提取出来,形成最终的价格列表。
总结
本文从后端面临的几个压力开始讲起,分析了异步为什么可以提高性能、提高资源利用率,然后通过代码示例,介绍了Java对异步的API级别的支持,如Java5就出现的Future,以及Java8增强的CompletableFuture,最后,我们结合了Java Stream,完成了一个综合的异步应用,大家不妨自行尝试一下。同时也提醒大家:在使用这些API的过程中,有一些非常重要的细节要处理好,比如异常处理、线程池定义(隔离)等,生产级代码,全靠细节的把控!