一、前言:
从自身的感受聊一下异步编程的思想,异步可以理解成是一种生产者消费者、服务调用方与被调用方之间的解耦,服务发起方不用等待某个环节的结果(不会因为网上订个快餐,快餐送达之前啥事都不了):
场景一:数据的生产者的生产能力不应该严格依赖于消费者的消费速度,不会被消费者的响应异常而拖慢整体服务能力,常规的异步思路:
1)单体架构中,可以采用池化技术,线程池是最好的选择,生产者可以将task推入线程池,当线程池处理速度跟不上,可以在硬件资源足够满足的情况,调整最大线程的数量;
2)单体架构中,当硬件资源不足,CPU资源占用很高,可以采用水平扩展(演进为集群架构),采用消息队列中间件(异步、解耦、削峰)生产者将消息推入消息队列,消费者从消息队列中拉取消息到本地消费;
场景二:socket通信,古老的通信方式,采用bio,连接阻塞、消息读取阻塞,常规的异步思路:
1)采用线程池技术,处理已经建立连接的socket请求,由对应的线程处理后续的读写逻辑,但是依旧需要有一个守护线程等待接收连接请求;
2)采用NIO,多路复用技术,根据连接、读写事件来处理对应的通信逻辑;
再举个实际开发的栗子:
从某个电商网站下单,下单成功需要发放优惠券,以及短信提示客人下单成功,这两个操作都是下单成功之后的处理逻辑,但是没有依赖关系。
优惠券发放失败,只要由后台的定时任务重试即可,实在不行由运营人工介入。
当需要依赖被调用方的返回结果时,咱们的处理方式又有啥不一样呢?咱们知道线程的执行run方法返回值是void,在jdk1.5版本引入Future/FutureTask/Callable。
通过Future.get() 或者Future.isDone()来获取线程返回值,或者判断是否处理结束。当调用get方式,类似主线程调用子线程的join(),会导致主线程阻塞。
以前我在其他的博文中聊过,每项新的技术组件诞生都有其诞生的背景,sun公司在没有解决get()阻塞的情况下,社区有哪些可以异步获取线程返回值得框架呢??
二、谷歌异步线程框架:
1、谷歌GUAVA:
在Jdk迟迟没有解决异步阻塞的问题时,谷歌的大神们已经悄悄有了解决方案,demo如下:
maven依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
测试代码:
import com.google.common.util.concurrent.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
public class ListenerFutureDemo {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(1);
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
ListenableFuture < String > listenableFuture = pool.submit(()->{
return "Guava Listening future test...";
});
//回调方法:
Futures.addCallback(listenableFuture, new FutureCallback < String > () {
public void onSuccess(String s) {
latch.countDown();
System.out.println(s);
}
public void onFailure(Throwable throwable) {
System.out.println(throwable.getMessage());
}
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
}
三、jdk1.8的异步线程框架:
@Test
public void test1() {
//提供函数式的链式编程:
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(s -> s + "\nThis is CompletableFuture test")
.thenApply(String::toLowerCase)
.thenAccept(System.out::print);
}
创建任务并执行任务
无参创建
CompletableFuture<String> noArgsFuture = new CompletableFuture<>();
传入相应任务,无返回值
runAsync
方法可以在后台执行异步计算,但是此时并没有返回值。持有一个Runnable
对象。
CompletableFuture noReturn = CompletableFuture.runAsync(()->{
//执行逻辑,无返回值
});
传入相应任务,有返回值
此时我们看到返回的是CompletableFuture<T>
此处的T
就是你想要的返回值的类型。其中的Supplier<T>
是一个简单的函数式接口。
//有返回值
CompletableFuture<String> returns = CompletableFuture.supplyAsync(()->{
return "hello world supplyAsync";
});
获取返回值
异步任务也是有返回值的,当我们想要用到异步任务的返回值时,我们可以调用CompletableFuture
的get()
阻塞,直到有异步任务执行完有返回值才往下执行。
可以看到输出如下,只有调用get()
方法的时候才会阻塞当前线程.
自定义返回值
除了等待异步任务返回值以外,我们也可以在任意时候调用complete()
方法来自定义返回值。
我们可以发现输出是新起线程的输出值,当然这是因为我们的异步方法设置了等待10秒,如果此时异步方法等待1秒,新起的线程等待10秒,那么输出的值就是异步方法中的值了。
//有返回值
CompletableFuture<String> returns = CompletableFuture.supplyAsync(()->{
return "hello world supplyAsync";
});
returns.complete("completed");//自定义返回值
try {
System.out.println(returns.get());//返回值为completed 而不是hello world supplyAsync
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
按顺序执行异步任务(链式编程)
如果有一个异步任务的完成需要依赖前一个异步任务的完成,那么该如何写呢?是调用get()
方法获得返回值以后然后再执行吗?这样写有些麻烦,CompletableFuture
为我们提供了方法来完成我们想要顺序执行一些异步任务的需求。thenApply
、thenAccept
、thenRun
这三个方法。这三个方法的区别就是。
方法名 | 是否可获得前一个任务的返回值 | 是否有返回值 |
---|---|---|
thenApply |
能获得 | 有 |
thenAccept |
能获得 | 无 |
thenRun |
不可获得 | 无 |
所以一般来说thenAccept
、thenRun
这两个方法在调用链的最末端使用。接下来我们用真实的例子感受一下。
CompletableFuture<String> chainFuture=CompletableFuture.supplyAsync(()->"chainFutre1")
.thenApply(name->name+"----chainFutre2");
try {
System.out.println(chainFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
返回的信息如下
chainFutre1----chainFutre2
thenApply和thenApplyAsync的区别
这两个方法区别就在于谁去执行这个任务,如果使用thenApplyAsync
,那么执行的线程是从ForkJoinPool.commonPool()
中获取不同的线程进行执行,如果使用thenApply
,如果supplyAsync
方法执行速度特别快,那么thenApply
任务就是主线程进行执行,如果执行特别慢的话就是和supplyAsync
执行线程一样。接下来我们通过例子来看一下,使用sleep
方法来反应supplyAsync
执行速度的快慢。
//thenApply和thenApplyAsync的区别
System.out.println("-------------");
CompletableFuture<String> supplyAsyncWithSleep = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "supplyAsyncWithSleep Thread Id : " + Thread.currentThread();
});
CompletableFuture<String> thenApply = supplyAsyncWithSleep
.thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsync = supplyAsyncWithSleep
.thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApply.get());
System.out.println(thenApplyAsync.get());
System.out.println("-------------No Sleep");
CompletableFuture<String> supplyAsyncNoSleep = CompletableFuture.supplyAsync(()->{
return "supplyAsyncNoSleep Thread Id : " + Thread.currentThread();
});
CompletableFuture<String> thenApplyNoSleep = supplyAsyncNoSleep
.thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsyncNoSleep = supplyAsyncNoSleep
.thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApplyNoSleep.get());
System.out.println(thenApplyAsyncNoSleep.get());
我们可以看到输出为
-------------
Main Thread Id: Thread[main,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApply Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
-------------No Sleep
Main Thread Id: Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApply Thread Id : Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]
可以看到supplyAsync
方法执行速度慢的话thenApply
方法执行线程和supplyAsync
执行线程相同,如果supplyAsync
方法执行速度快的话,那么thenApply
方法执行线程和Main
方法执行线程相同。
组合CompletableFuture
将两个CompletableFuture
组合到一起有两个方法
-
thenCompose()
:当第一个任务完成时才会执行第二个操作 -
thenCombine()
:两个异步任务全部完成时才会执行某些操作
thenCompose() 用法
我们定义两个异步任务,假设第二个定时任务需要用到第一个定时任务的返回值。
public static CompletableFuture<String> getTastOne(){
return CompletableFuture.supplyAsync(()-> "topOne");
}
public static CompletableFuture<String> getTastTwo(String s){
return CompletableFuture.supplyAsync(()-> s + " topTwo");
}
我们利用thenCompose()
方法进行编写
CompletableFuture<String> thenComposeComplet = getTastOne().thenCompose(s -> getTastTwo(s));
System.out.println(thenComposeComplet.get());
输出就是
topOne topTwo
如果还记得前面的thenApply()
方法的话,应该会想这个利用thenApply()
方法也是能够实现类似的功能的。
//thenApply
CompletableFuture<CompletableFuture<String>> thenApply = getTastOne()
.thenApply(s -> getTastTwo(s));
System.out.println(thenApply.get().get());
但是我们发现返回值是嵌套返回的一个类型,而想要获得最终的返回值需要调用两次get()
thenCombine() 用法
例如我们此时需要计算两个异步方法返回值的和。求和这个操作是必须是两个异步方法得出来值的情况下才能进行计算,因此我们可以用thenCombine()
方法进行计算。
CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
CompletableFuture<Integer> thenComposeCount = thenComposeOne
.thenCombine(thenComposeTwo, (s, y) -> s + y);
System.out.println(thenComposeCount.get());
此时thenComposeOne
和thenComposeTwo
都完成时才会调用传给thenCombine
方法的回调函数。
组合多个CompletableFuture
在上面我们用thenCompose()
和thenCombine()
两个方法将两个CompletableFuture
组装起来,如果我们想要将任意数量的CompletableFuture
组合起来呢?可以使用下面两个方法进行组合。
-
allOf()
:等待所有CompletableFuture
完后以后才会运行回调函数 -
anyOf()
:只要其中一个CompletableFuture
完成,那么就会执行回调函数。注意此时其他的任务也就不执行了。
接下来演示一下两个方法的用法
//allOf()
CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> four = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> six = CompletableFuture.supplyAsync(() -> 6);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(one, two, three, four, five, six);
voidCompletableFuture.thenApply(v->{
return Stream.of(one,two,three,four, five, six)
.map(CompletableFuture::join)
.collect(Collectors.toList());
}).thenAccept(System.out::println);
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
System.out.println("1");
});
我们定义了6个CompletableFuture
等待所有的CompletableFuture
等待所有任务完成以后然后将其值输出。
anyOf()
的用法
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
System.out.println("voidCompletableFuture1");
});
CompletableFuture<Void> voidCompletableFutur2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
System.out.println("voidCompletableFutur2");
});
CompletableFuture<Void> voidCompletableFuture3 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
}
System.out.println("voidCompletableFuture3");
});
CompletableFuture<Object> objectCompletableFuture = CompletableFuture
.anyOf(voidCompletableFuture1, voidCompletableFutur2, voidCompletableFuture3);
objectCompletableFuture.get();
这里我们定义了3个CompletableFuture
进行一些耗时的任务,此时第一个CompletableFuture
会率先完成。打印结果如下。
voidCompletableFuture1
异常处理
我们了解了CompletableFuture
如何异步执行,如何组合不同的CompletableFuture
,如何顺序执行CompletableFuture
。那么接下来还有一个重要的一步,就是在执行异步任务时发生异常的话该怎么办。我们先写个例子。
CompletableFuture.supplyAsync(()->{
//发生异常
int i = 10/0;
return "Success";
}).thenRun(()-> System.out.println("thenRun"))
.thenAccept(v -> System.out.println("thenAccept"));
CompletableFuture.runAsync(()-> System.out.println("CompletableFuture.runAsync"));
执行结果为,我们发现只要执行链中有一个发生了异常,那么接下来的链条也就不执行了,但是主流程下的其他CompletableFuture
还是会运行的。
CompletableFuture.runAsync
exceptionally()
我们可以使用exceptionally
进行异常的处理
//处理异常
CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> {
//发生异常
int i = 10 / 0;
return "Success";
}).exceptionally(e -> {
System.out.println(e);
return "Exception has Handl";
});
System.out.println(exceptionally.get());
打印如下,可以发现其接收值是异常信息,也能够返回自定义返回值。
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception has Handl
handle()
调用handle()
方法也能够捕捉到异常并且自定义返回值,他和exceptionally()
方法不同一点是handle()
方法无论发没发生异常都会被调用。例子如下
System.out.println("-------有异常-------");
CompletableFuture.supplyAsync(()->{
//发生异常
int i = 10/0;
return "Success";
}).handle((response,e)->{
System.out.println("Exception:" + e);
System.out.println("Response:" + response);
return response;
});
System.out.println("-------无异常-------");
CompletableFuture.supplyAsync(()->{
return "Sucess";
}).handle((response,e)->{
System.out.println("Exception:" + e);
System.out.println("Response:" + response);
return response;
});
打印如下,我们可以看到在没有发生异常的时候handle()
方法也被调用了
-------有异常-------
Exception:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Response:null
-------无异常-------
Exception:null
Response:Sucess