多线程的实现、CompletableFuture异步任务、@Async注解异步调用

多线程的实现、CompletableFuture异步任务、@Async注解异步调用

多线程的实现、CompletableFuture异步任务、@Async注解异步调用

一、异步和多线程有什么区别?

其实,异步和多线程并不时一个同等关系,异步是目的,多线程只是我们实现异步的一个手段.

什么是异步?

异步是当一个调用请求发送给被调用者,而调用者不用等待其结果的返回.实现异步可以采用多线程技术或则交给另外的进程来处理

多线程和异步操作两者都可以达到避免调用线程阻塞的目的,从而提高软件的可响应性。甚至有些时候我们就认为多线程和异步操作是等同的概念。但是,多线程和异步操作还是有一些区别的。而这些区别造成了使用多线程和异步操作的时机的区别。

异步和多线程是两个不同的概念,不能这样比较.异步请求一般用在IO等耗时操作上,他的好处是函数调用立即返回,相应的工作线程立即返还给系统以供重用。由于系统的线程资源是非常宝贵的,通常有一定的数目限制,如.net默认是25。若使用异步方式,用这些固定数目的线程在固定的时间内就可以服务更多的请求,而如果用同步方式,那么每个请求都自始至终占用这一个线程,服务器可以同时服务的请求数就少了。当异步操作执行完成后,系统会从可用线程中选取一个执行回调程序,这时的这个线程可能是刚开始发出请求的那个线程,也可能是其他的线程,因为系统选取线程是随机的事情,所以不能说绝对不是刚开始的那个线程。多线程是用来并发的执行多个任务。

不过有个问题,异步有时优先级比主线程还高。这个特点和多线程不同。

详见:https://blog.csdn.net/qq_36936155/article/details/78991050

二、多线程方式

java多线程实现方式主要有三种:继承Thread类、实现Runnable接口、使用ExecutorServiceCallableFuture实现有返回结果的多线程。

其中前两种方式线程执行完后都没有返回值,只有最后一种是带返回值的。

1、继承Thread类实现多线程

继承Thread类的方法尽管被我列为一种多线程实现方式,但Thread本质上也是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法,它将启动一个新线程,并执行run()方法。这种方式实现多线程很简单,通过自己的类直接extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:

 public class MyThread extends Thread {
     @Override
     public void run() {
         System.out.println(Thread.currentThread().getName() + ": 使用thread初始化了一个线程");
     }
 }

在合适的地方启动线程:

// 启动MyThread线程
for (int i = 0; i < 10; i++) {
    new MyThread().start();
}

2、实现Runnable接口方式实现多线程

如果自己的类已经extends另一个类,就无法直接extends Thread,此时,必须实现一个Runnable接口,如下:

public class MyRunnable extends OtherClass implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": 使用runnable初始化一个线程");
    }
}

为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyRunnable实例

 for (int i = 0; i < 10; i++) {
     new Thread(new MyRunnable()).start();
 }

匿名内部类的方式

 for (int i = 0; i < 10; i++) {
     new Thread(new Runnable() {
         @Override
         public void run() {
             System.out.println(Thread.currentThread().getName() + ": 使用runnable初始化一个线程");
         }
     }).start();
 }
 // Thread本质上也是实现了Runnable接口的一个实例(匿名内部类简化)
 for (int i = 0; i < 10; i++) {
     new Thread(() -> {
         System.out.println(Thread.currentThread().getName() + ": 使用runnable匿名内部类初始化一个线程");
     }).start();
 }

3、实现Callable接口通过FutureTask包装器来创建Thread线程

public class MyCallable implements Callable<String> {
    @Override
    public String call() {
        System.out.println(Thread.currentThread().getName() + ": 使用Callable初始化一个线程");
        return "zhangsan";
    }
}

调用返回Future对象的get()方法,从Future对象上获取任务的返回值,会阻塞直到计算完成。

不管是异常还是正常,只要运行完毕了,isDone()方法结果一样是true

 for (int i = 0; i < 10; i++) {
     FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
     new Thread(futureTask).start();
     //System.out.println(futureTask.get());  // 阻塞
     while (!futureTask.isDone()) { // 轮询
         System.out.println("有结果了吗?");
     }
     System.out.println("对方同意了!");
     System.in.read();
 }

4、使用ExecutorService、Callable、Future实现有返回结果的多线程

ExecutorService、Callable、Future这个对象实际上都是属于Executor框架中的功能类。

**ExecutorService提供了submit()方法,传递一个Callable,或Runnable,返回Future。**如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

 // 创建固定数目线程的线程池
 ExecutorService executorService = Executors.newFixedThreadPool(3);
 for (int i = 0; i < 10; i++) {
     executorService.submit(() -> {
         System.out.println(Thread.currentThread().getName() + ": 线程池执行任务!");
     });
 }
 // 如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
 for (int i = 0; i < 10; i++) {
     Future<String> submit = executorService.submit(new MyCallable());
     System.out.println(submit.get().toString());
 }
 // 关闭线程池
 executorService.shutdown();

5、通过线程池创建线程

避免使用Executors创建线程池主要是为了避免其中的默认实现,可以改用ThreadPoolExecutor构造方法指定参数即可。

需要指定核心线程池的大小、最大线程池的数量、保持存活的时间、等待队列容量的大小。在这种情况下一旦提交的线程数超过当前可用的线程数时就会抛出拒绝执行的异常java.util.concurrent.RejectedExecutionException 有界队列已经满了便无法处理新的任务。

上述代码中Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

创建固定数目线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads)
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newCachedThreadPool()
创建一个单线程化的Executor。
public static ExecutorService newSingleThreadExecutor()
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

 long start = System.currentTimeMillis();
 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(500));
 for (int i = 0; i < 100; i++) {
     threadPoolExecutor.execute(() -> {
         try {
             Thread.sleep(500);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println(Thread.currentThread().getName() + ": 自定义线程池执行任务");
     });
 }
 // 关闭线程池 - 执行后停止接受新任务,会把队列的任务执行完毕。
 threadPoolExecutor.shutdown();
 // 关闭线程池 - 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。
 //threadPoolExecutor.shutdownNow();
 // 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就表明线程池已经完全终止了。
 while (!threadPoolExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
     LOGGER.info("线程还在执行。。。");
 }
 long end = System.currentTimeMillis();
 LOGGER.info("一共处理了【{}】", (end - start));
使用工具类来创建线程池:

除了自己定义的ThreadPool之外,还可以使用开源库apache guava等。

个人推荐使用guavaThreadFactoryBuilder() 来创建线程池:

 /**
  * ThreadFactory 为线程池创建的线程命名
  */
 private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

 public static void main(String[] args) {
     // 线程池创建 指定属性
     ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS,
             new ArrayBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());
     for (int i = 0; i < 10; i++) {
         pool.execute(() -> System.out.println("测试一下guava命名的线程:" + Thread.currentThread().getName()));
     }
 }

使用上面的方法创建线程池不仅可以避免OOM的问题,还可以自定义线程名称,更加方便出错时溯源。

6、定时任务

开发中,往往遇到另起线程执行其他代码的情况,用java定时任务接口ScheduledExecutorService来实现。

ScheduledExecutorService是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。

注意,只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是处于轮询任务的状态。

// 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

// scheduleAtFixedRate() 每次执行时间为上一次任务开始起向后推一个时间间隔,是基于固定时间间隔进行任务调度
scheduledExecutorService.scheduleAtFixedRate(() -> {
    System.out.println(Thread.currentThread().getName() + ": 定时执行任务!" + new Date());
}, 5, 10, TimeUnit.SECONDS);

// scheduleWithFixedDelay() 每次执行时间为上一次任务结束起向后推一个时间间隔,取决于每次任务执行的时间长短
scheduledExecutorService.scheduleWithFixedDelay(() -> {
    System.out.println(Thread.currentThread().getName() + ": 定时执行任务!" + new Date());
}, 5, 10, TimeUnit.SECONDS);

 // 只执行一次延时任务
 ScheduledExecutorService scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
         new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
 scheduledThreadPoolExecutor.schedule(() -> {
     System.out.println(Thread.currentThread().getName() + ": 定时执行任务!");
 }, 20, TimeUnit.SECONDS);

ScheduleAtFixedRate每次执行时间为上一次任务开始起向后推一个时间间隔,即每次执行时间为initialDelay,initialDelay+period,initialDelay+2*period。。。。。

ScheduleWithFixedDelay每次执行时间为上一次任务结束起向后推一个时间间隔,即每次执行时间为:initialDelay,initialDelay+executeTime+delay,initialDelay+2*executeTime+2*delay。。。。。

由此可见,ScheduleAtFixedRate是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度

三、用CompletableFuture实现异步任务

CompletableFutureFuture API的扩展。

Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。

项目需求:

项目中需要优化一个接口,这个接口需要拉取2,3个第三方接口,需求延迟时间小于200ms;

技术选型:

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。使用这种并行方式,可以极大的提高程序的性能。

CompletableFutureJDK8提出的一个支持非阻塞的多功能的Future,同样也是实现了Future接口,FutureJava 5添加的类,用来描述一个异步计算的结果。java8对future进一步完善,扩展了诸多功能形成了CompletableFuture,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。

1、CompletableFuture功能介绍

CompletableFuture还是一个CompletionStage

我们看下CompletableFuture的定义:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

什么是CompletionStage呢?

在异步程序中,如果将每次的异步执行都看成是一个stage的话,我们通常很难控制异步程序的执行顺序,在javascript中,我们需要在回调中执行回调。这就会形成传说中的回调地狱。

好在在ES6中引入了promise的概念,可以将回调中的回调转写为链式调用,从而大大的提升了程序的可读性和可写性。

同样的在java中,我们使用CompletionStage来实现异步调用的链式操作。CompletionStage定义了一系列的then*** 操作来实现这一功能。

详见:https://segmentfault.com/a/1190000022197398

2、使用CompletableFuture作为Future实现

使用无构参构造函数创建此类的实例

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

这是一个最简单的 CompletableFuture,想获取CompletableFuture 的结果可以使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法会一直阻塞直到 Future 完成。因此,以上的调用将被永远阻塞,因为该Future一直不会完成。

另请注意,get方法抛出一些已检查的异常,即ExecutionException(封装计算期间发生的异常)和InterruptedException(表示执行方法的线程被中断的异常)

你可以使用 CompletableFuture.complete() 手工的完成一个 Future:

completableFuture.complete("Future's Result")

所有等待这个 Future 的客户端都将得到一个指定的结果,并且 completableFuture.complete() 之后的调用将被忽略。

public Future<String> calculateAsync() {

    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

这种创建和完成CompletableFuture的方法可以与任何并发包(包括原始线程)一起使用。

如果你知道执行的结果,那么可以使用CompletableFuturecompletedFuture方法来直接返回一个Future。

public Future<String> useCompletableFuture(){
    Future<String> completableFuture =
            CompletableFuture.completedFuture("Hello");
    return completableFuture;
}

假设我们没有找到结果并决定完全取消异步执行任务。CompletableFuture还提供了一个cancel方法来立马取消任务的执行。

public Future<String> calculateAsyncWithCancellation() {
    
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
    
    return completableFuture;
}

当我们使用Future.get()方法阻塞结果时,cancel()表示取消执行,它将抛出CancellationException异常。java.util.concurrent.CancellationException

3、异步执行

上面的代码很简单,下面介绍几个 static 方法,它们使用任务来实例化一个 CompletableFuture 实例。

CompletableFuture提供了runAsyncsupplyAsync的方法,可以以异步的方式执行代码。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

这两个方法是executor 的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的

3.1、runAsync()

方法接收的是 Runnable 的实例,但是它没有返回值

public void runAsync() {
    CompletableFuture.runAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任务! runAsync");
    }, Executors.newFixedThreadPool(3));
}

3.2、supplyAsync()

方法是JDK8函数式接口,无参数,会返回一个结果

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            LOGGER.info("初始化CompletableFuture子任务! supplyAsync");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of the asynchronous computation";
        }
    });

    String result = future.get();
    LOGGER.info(result);
}

使用lambda表达式使得上面的示例更加简明:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    LOGGER.info("初始化CompletableFuture子任务! supplyAsync");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

4、转换和运行

对于构建异步系统,我们应该附上一个回调给CompletableFuture,当Future完成的时候,自动的获取结果。如果我们不想等待结果返回,我们可以把需要等待Future完成执行的逻辑写入到回调函数中。可以使用thenApplyAsync(), thenAccept()thenRun()方法附上一个回调给CompletableFuture

为了控制执行回调任务的线程,你可以使用异步回调。将从ForkJoinPool.commonPool()获取不同的线程执行。此外,如果你传入一个ExecutorthenApplyAsync()回调中,,任务将从Executor线程池获取一个线程执行。

4.1、 thenApplyAsync()

在两个任务任务A,任务B中,任务B想要任务A计算的结果,可以用thenApplyAsync方法来接受一个函数实例,用它来处理结果,并返回一个Future函数的返回值:

模板

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");

任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。

多个任务的情况下,如果任务 B 后面还有任务 C,往下继续调用.thenApplyAsync()即可。

实战

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任务! supplyAsync");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    CompletableFuture<String> completableFuture = future.thenApplyAsync(resultA -> {
        LOGGER.info(resultA);
        return "Hello " + resultA;
    }).thenApplyAsync(resultB -> {
        LOGGER.info(resultB);
        return resultB + ", Welcome to the arjun Blog";
    });

    System.out.println(completableFuture.get());
    // Prints - Hello Result of the asynchronous computation, Welcome to the arjun Blog
}

4.2、thenAcceptAsync()

在两个任务任务A,任务B中,如果你不需要在Future中有返回值,则可以用 thenAcceptAsync方法接收将计算结果传递给它。最后的future.get()调用返回Void类型的实例。CompletableFuture.thenAcceptAsync()持有一个Consumer<T>,返回一个CompletableFuture<Void>。它可以访问CompletableFuture的结果:

模板

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
  • runAsync不会有返回值,方法thenAcceptAsync,接收到的resultA值为null,同时任务B也不会有返回结果

  • supplyAsync有返回值,同时任务B不会有返回结果。

实战

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任务! supplyAsync");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    CompletableFuture<Void> completableFuture = future.thenAcceptAsync(resultA -> {
        LOGGER.info("Computation returned: {}", resultA);
    });

    System.out.println(completableFuture.get());
    // Prints - null
}

4.3、thenRunAsync()

如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAcceptAsync()thenRunAsync()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>

模板

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
  • runAsync不会有返回值,方法thenRunAsync(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。

  • supplyAsync有返回值,方法thenRunAsync(Runnable runnable),任务 A 执行完执行 B,会返回resultA,但是 B 不需要 A 的结果。

实战

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任务! supplyAsync");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    CompletableFuture<Void> completableFuture = future.thenRunAsync(() -> System.out.println("Computation finished."));
    // Prints - Computation finished.
    System.out.println(completableFuture.get());
    // Prints - null
}

5、组合Futures

上面讲到CompletableFuture的一个重大作用就是将回调改为链式调用,从而将Futures组合起来。

5.1. 使用 thenCompose()组合两个独立的future

thenCompose将前一个Future的返回结果作为后一个操作的输入。

模板

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

实战

假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另外一个服务中获取他的贷方。

考虑下以下两个方法getUserDetail()getCreditRating()的实现:

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

现在让我们弄明白当使用了thenApply()后是否会达到我们期望的结果?

 CompletableFuture<CompletableFuture<Double>> result = getUsersDetail(userId)
         .thenApply(user -> getCreditRating(user));

在以上thenApply的示例中,Supplier函数传入thenApply将返回一个简单的值,但是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture
如果你想获取最终的结果给最顶层future,使用 thenCompose()方法代替

CompletableFuture<Double> result = getUsersDetail(userId)
        .thenCompose(user -> getCreditRating(user));

因此,规则就是-如果你的回调函数返回一个CompletableFuture,但是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可以使用thenCompose()。因此,如果想要继续嵌套链接CompletableFuture 方法,那么最好使用thenCompose()

thenApply()thenCompose()之间的区别

thenCompose()方法类似于thenApply()在都返回一个新的计算结果。但是,thenCompose()使用前一个Future作为参数。它会直接使结果变新的Future,而不是我们在thenApply()中到的嵌套Future,而是用来连接两个CompletableFuture,是生成一个新的CompletableFuture

5.2、使用thenCombine()组合两个独立的 future

如果要执行两个独立的任务,并对其结果执行某些操作,可以用Future的thenCombine方法。被用来当两个独立的Future都完成的时候,用来做一些事情。

模板

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

实战

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

当两个Future都完成的时候,传给thenCombine()的回调函数将被调用。

5.3、使用thenAcceptBoth()组合两个独立的 future

更简单的情况是,当你想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值:

如果你不想返回结果,则可以使用thenAcceptBoth

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
        .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));

6、并行运行多个任务

我们使用thenCompose()thenCombine()把两个CompletableFuture组合在一起。当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。现在如果你想组合任意数量的CompletableFuture,应该怎么做?

我们可以使用以下两个方法组合任意数量的CompletableFuture

API

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs){...}

6.1、CompletableFuture.allOf()

CompletableFuture.allOf静态方法允许等待所有的完成任务:

CompletableFuture.allOf的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。

实战场景

假设你想下载一个网站的100个不同的页面。你可以串行的做这个操作,但是这非常消耗时间。因此你想写一个函数,传入一个页面链接,返回一个CompletableFuture,异步的下载页面内容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // TODO Code to download and return the web page's content
    });
} 

现在,当所有的页面已经下载完毕,你想计算包含关键字CompletableFuture页面的数量。可以使用CompletableFuture.allOf()达成目的。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。这种方法的局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           //.map(pageContentFuture -> pageContentFuture.join())
       	   .map(CompletableFuture::join)
           .collect(Collectors.toList());
});

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。这使得它可以在Stream.map()方法中用作方法引用。

现在让我们计算包含关键字页面的数量。

 // Count the number of web pages having the "CompletableFuture" keyword.
 CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
     return pageContents.stream()
             .filter(pageContent -> pageContent.contains("CompletableFuture"))
             .count();
 });

 System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

6.2、CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); 
// Prints - Result of Future 2

在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture就会完成。因为future2的休眠时间最少,因此它最先完成,最终的结果将是future2的结果。

CompletableFuture.anyOf()传入一个Future可变参数,返回CompletableFuture<Object>CompletableFuture.anyOf()的问题是如果你的CompletableFuture返回的结果是不同类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。

7、异常处理

我们探寻了怎样创建CompletableFuture,转换它们,并组合多个CompletableFuture。现在让我们弄明白当发生错误的时候我们应该怎么做。

首先让我们明白在一个回调链中错误是怎么传递的。思考下以下回调链:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

7.1、使用 exceptionally() 回调处理异常

exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

API

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

看下代码

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

上面的代码中,任务 A、B、C、D 依次执行,如果任务 A 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 C 抛出异常,那么任务 D 得不到执行。

那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "ERROR - ResultA")
        .thenApply(resultA -> resultA + ", resultB")
        .thenApply(resultB -> resultB + ", resultC")
        .thenApply(resultC -> resultC + ", resultD");
System.out.println(future.join());
// Prints - ERROR - ResultA, resultB, resultC, resultD

上面的代码中,任务 A 抛出异常,然后通过.exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。

实战情景

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    LOGGER.error("Oops! We have an exception - {}", ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 
// Prints - Maturity : Unknown!

7.2、使用 handle() 方法处理异常

API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

API

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

如果在链式调用的时候抛出异常,则可以在最后使用handle来接收。

public void handleError() throws ExecutionException, InterruptedException {
    String name = null;
    
    CompletableFuture<String> completableFuture
            =  CompletableFuture.supplyAsync(() -> {
        if (name == null) {
            throw new RuntimeException("Computation error!");
        }
        return "Hello, " + name;
    }).handle((res, ex) -> res != null ? res : "Hello, Stranger!");

    System.out.println(completableFuture.get());
    // Prints - Hello, Stranger!
}

实战情景

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        LOGGER.error("Oops! We have an exception - {}", ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());
// Prints - Maturity : Unknown!

如果异常发生,res参数将是 null,否则,ex将是 null。

7.3、使用whenComplete()方法处理异常

API

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

返回传进去的值

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    throw new RuntimeException("error!");
});

future.whenComplete((result, exception) -> {
    if (null == exception) {
        System.out.println("result from previous task: " + result);
    } else {
        System.err.println("Exception thrown from previous task: " + exception.getMessage());
    }
});

exceptionally()的使用非常类似于try{}catch{}中的catch{},但是由于支持链式编程方式,所以相对更简单。

whenComplete()handle()系列方法就类似于 try{}finally{}中的finally{},无论是否发生异常都会执行 whenComplete()中的回调函数 BiConsumerhandle()中的回调函数 BiFunction。顾名思义,BiConsumer是直接消费的,而BiFunction是有返回值的,

whenComplete()handle() 的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。

8、Async后缀方法

CompletableFuture类中的API的大多数方法都有两个带有Async后缀的附加修饰。这些方法表示用于异步线程。

没有Async后缀的方法使用调用线程运行下一个执行线程阶段。不带Async方法使用ForkJoinPool.commonPool()线程池的fork / join实现运算任务。带有Async方法使用传递式的Executor任务去运行。

9、JDK 9 CompletableFuture API

Java 9中, CompletableFuture API通过以下更改得到了进一步增强:

  • 新工厂方法增加了
  • 支持延迟和超时
  • 改进了对子类化的支持。

引入了新的实例API

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

还有一些静态实用方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

最后,为了解决超时问题,Java 9又引入了两个新功能:

  • orTimeout()
  • completeOnTimeout()

四、Springboot 的异步调用 @Async注解

@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,在大量的请求的时候,这时就会不断创建大量线程,极有可能压爆服务器内存
@Async的时候一定要设置线程数,以防万一OOM

​ 异步调用,类似我们多年前的ajax调用,局部刷新,整体不变,当然,在java的后台的异步调用,类似于自己实现一个多线程的程序,任务开启一个线程后由它最去执行,我们其实是不能干预太多的。。

​ 在实际的开发中,如果某一个方法需要异步去执行,那么我们可以在它前面加上注解。@Async

1、 @Async介绍

​ 在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。分为不带参数的异步调用;带参数的异步调用;调用返回Future的异步线程

2、@Async调用中的事务处理机制

@Async标注的方法,同时也适用了@Transactional进行了标注;

​ 在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。 **那该如何给这些操作添加事务管理呢?**可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.

例如:

方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。

方法B,使用了@Async来标注, B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的。

3、配合使用@EnableAsync

启动类或者Controller类加上@EnableAsync注解

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
@EnableAsync
@RestController
public class HelloController {
    
    @Autowired
    TestAsyncService testAsyncService;

}

@EnableAsync注解的意思是可以异步执行,就是开启多线程的意思。可以标注在方法、类上。@Async所修饰的函数不要定义为static类型,这样异步调用不会生效

4、举例:

比如需要调用一个发送短信的任务,实际短信是渠道方去发的,那么我们在把请求提交过去基本就结束了,这个时候就可以做一个异步的调用来实现。

先使用@EnableAsync来开启异步的支持,配置一个线程池:

Spring 4 中,对异步方法可以做一些配置,将配置类实现AsyncConfigurer 接口后,可以实现自定义线程池的功能,和统一处理异步方法的异常。

如果不限制并发数,可能会造成系统压力。

AsyncConfigurer 接口中的方法 Executor getAsyncExecutor() 实现自定义线程池。控制并发数。

AsyncConfigurer 接口中的方法 public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()用于处理异步方法的异常。

AsyncUncaughtExceptionHandler 接口,只有一个方法:void handleUncaughtException(Throwable ex, Method method, Object… params);

因此,AsyncUncaughtExceptionHandler 接口可以认为是一个函数式接口,可以用拉姆达表达式实现该接口。当然不处理也是可以的,没用直接返回null

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
     
    /**
     * ThreadFactory 为线程池创建的线程命名
     */
    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("example-pool-%d").build();
    
    /**
     * 获取异步线程池执行对象
     */
    @Override
    public Executor getAsyncExecutor() {
        // 使用Spring内置线程池任务对象
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置线程池参数
        executor.initialize();
        // 获取到服务器的cpu内核
        int i = Runtime.getRuntime().availableProcessors();
        // 核心池大小
        executor.setCorePoolSize(5);
        // 最大线程数
        executor.setMaxPoolSize(100);
        // 队列容量
        executor.setQueueCapacity(1000);
        // 线程空闲时间(秒)
        executor.setKeepAliveSeconds(1000);
        // 线程前缀名称
        executor.setThreadNamePrefix("task-async-");
        // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		// 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
//        return (throwable, method, objects) -> System.out.println(
//                "-- exception handler -- " + throwable + "-- method -- " + method + "-- objects -- " + objects);
    }
    
    
    /**
     * 自定义线程池
     */
    @Bean(name = "asyncTaskExecutor")
    public Executor asyncTaskExecutor() {
        //获取CPU 核心数
        int nThreads = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                nThreads,
                2 * nThreads + 5,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy()); 
        // 先行创建符合corePoolSize参数值的线程数
        threadPoolExecutor.prestartAllCoreThreads();
        return threadPoolExecutor;
    }
}

如果我们项目中需要多个线程池,则可以参考asyncTaskExecutor()方法实例出自定义的线程池asyncTaskExecutor,在使用的时候使用@Async("asyncTaskExecutor")来实现使用asyncTaskExecutor的线程池实现异步操作。

@RequestMapping("")
public String doTask() throws InterruptedException {
    long currentTimeMillis = System.currentTimeMillis();
    System.out.println(Thread.currentThread().getName()+"主线程请求异步执行task1()");
    this.task1();
    System.out.println(Thread.currentThread().getName()+"主线程请求异步执行task1()结束");
    System.out.println(Thread.currentThread().getName()+"主线程请求异步执行task2()");
    this.task2();
    System.out.println(Thread.currentThread().getName()+"主线程请求异步执行task2()结束");
    this.task3();
    long currentTimeMillis1 = System.currentTimeMillis();
    return "task任务总耗时:" + (currentTimeMillis1 - currentTimeMillis) + "ms";
}

然后在指定需要异步执行方法上加入@Async注解,并自定线程池(当然可以不指定,直接写@Async

@Async("asyncTaskExecutor")
public void task1() throws InterruptedException{  
    long currentTimeMillis = System.currentTimeMillis();  
    System.out.println("task1," + Thread.currentThread().getName() + "," + new Date());
    Thread.sleep(1000);  
    long currentTimeMillis1 = System.currentTimeMillis();  
    System.out.println("task1任务异步执行耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");  
}  

@Async
public void task2() throws InterruptedException{  
    long currentTimeMillis = System.currentTimeMillis();  
    System.out.println("task2," + Thread.currentThread().getName() + "," + new Date());
    Thread.sleep(2000);  
    long currentTimeMillis1 = System.currentTimeMillis();  
    System.out.println("task2任务异步执行耗时:"+(currentTimeMillis1-currentTimeMillis)+"ms");  
}  

直接写@Async不指定线程池时,如果线程池配置只配了上面asyncTaskExecutor一种,则会默认使用该线程池执行,结果和上面一样,如果线程池配置配置了多个线程池,则此时不指定线程池时则会使用系统默认的SimpleAsyncTaskExecutor线程执行

将其中一个异步方法,写一行会产生异常的代码:

@Async
public void task3() {
    System.out.println("task3," + Thread.currentThread().getName() + "," + new Date());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    int i=100/0;//抛出异常

    System.out.println("task3," + Thread.currentThread().getName() + "," + new Date());
}

运行后的结果如下:

task3,example-pool-2,Sat Sep 15 21:52:48 CST 2018
-- exception handler -- java.lang.ArithmeticException: / by zero-- method -- public void com.learn.AsyncDemo.task3()-- objects -- [Ljava.lang.Object;@1626ab0b
task3,example-pool-2,Sat Sep 15 21:52:50 CST 2018

两个异步方法,使用的是同一个线程运行的。异常的处理也由AsyncUncaughtExceptionHandler接口处理掉了。

5、@Async失效

在同一个类中,一个方法调用另外一个有注解(比如@Async@Transational)的方法,注解是不会生效的。

比如,下面代码例子中,有两方法,一个有@Async注解,一个没有。第一次如果调用了有注解的test()方法,会启动@Async注解作用;第一次如果调用testAsync(),因为它内部调用了有注解的test(),如果你以为系统也会为它启动Async作用,那就错了,实际上是没有的。

@Async 基于AOP的,调用同类中的方法不走代理。

@Service
public class TestAsyncService {

    public void testAsync() throws Exception {
        //这里的调用相当于:this.test();   这个this是service本身 而不是spring的service代理对象.所以aop不生效.
        test();
    }

    @Async
    public void test() throws InterruptedException{
        Thread.sleep(10000);//让线程休眠,根据输出结果判断主线程和从线程是同步还是异步
        System.out.println("异步threadId:"+Thread.currentThread().getId());
    }
}

运行结果:testAsync()主线程 和 从线程 test() 同步执行。

原因:spring在扫描bean的时候会扫描方法上是否包含@Async注解,如果包含,spring会为这个bean动态地生成一个子类(即代理类,proxy),代理类是继承原来那个bean的。此时,当这个有注解的方法被调用的时候,实际上是由代理类来调用的,代理类在调用时增加异步作用。然而,如果这个有注解的方法是被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来的那个bean,所以就没有增加异步作用,我们看到的现象就是@Async注解无效。

上一篇:C++11笔记-多线程-细说Future


下一篇:7-并发编程-Future&ForkJoin框架原理分析