异步_CompletableFuture

在JDK1.5已经提供了Future和Callable的实现,可以用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果,如下:

//定义一个异步任务
Future<String> future = executor.submit(()->{
       Thread.sleep(2000);
       return "hello world";
});
//轮询获取结果
while (true){
    if(future.isDone()) {
         System.out.println(future.get());
         break;
     }
}

从上面的形式看来轮询的方式会耗费无谓的CPU资源,而且也不能及时地得到计算结果.所以要实现真正的异步,上述这样是完全不够的,在Netty中,我们随处可见异步编程。

ChannelFuture f = serverBootstrap.bind(port).sync();
f.addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
        System.out.println("complete");
    }
});
而JDK1.8中的CompletableFuture就为我们提供了异步函数式编程,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture常用方法

CompletableFuture是JDK8提出的一个支持非阻塞的多功能的Future,同样也是实现了Future接口,Future是Java 5添加的类,用来描述一个异步计算的结果。java8对future进一步完善,扩展了诸多功能形成了CompletableFuture,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。

runAsync 和 supplyAsync方法

CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

示例:

//无返回值
public static void runAsync() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        System.out.println("run end ...");
    });
    
    future.get();
}

//有返回值
public static void supplyAsync() throws Exception {         
    CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        System.out.println("run end ...");
        return System.currentTimeMillis();
    });

    long time = future.get();
    System.out.println("time = "+time);
}

计算结果完成时的回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

whenComplete 和 whenCompleteAsync 的区别?
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

示例:

public static void whenComplete() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        if(new Random().nextInt()%2>=0) {
            int i = 12/0;
        }
        System.out.println("run end ...");
    });
    
    future.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void t, Throwable action) {
            System.out.println("执行完成!");
        }
        
    });
    future.exceptionally(new Function<Throwable, Void>() {
        @Override
        public Void apply(Throwable t) {
            System.out.println("执行失败!"+t.getMessage());
            return null;
        }
    });
    
    TimeUnit.SECONDS.sleep(2);
}

thenApply 方法

                                                 
上一篇:Netty核心组件Future与ChannelFuture


下一篇:python leetcode 每日打卡之1052