JAVA 多线程、CompleableFuture异步

初始化线程的四种办法

  • 继承Thread类
  • 实现Rannable 接口
  • 实现callable 接口 + FutureTask (可以获取处理结果和处理异常)
  • 线程池

继承Thread实现

public class ThreadTest {
    public static void main(String[] args) {
        System.out.println("主线程启动");
        Thread1 thread1 = new Thread1();
        thread1.start();
        System.out.println("主线程结束");
    }

    public static class Thread1 extends Thread{
        @Override
        public void run() {
            System.out.println("===========================");
            System.out.println("当前线程---->"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("当前线程处理结果---->"+i);
            System.out.println("===========================");
        }
    }
}

执行结果:
JAVA 多线程、CompleableFuture异步

实现Runnable接口实现

public class ThreadTest {
    public static void main(String[] args) {
        System.out.println("主线程启动");
        new Thread(new Thread1()).start();
        System.out.println("主线程结束");
    }

    public static class Thread1 implements Runnable{
        @Override
        public void run() {
            System.out.println("===========================");
            System.out.println("当前线程---->"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("当前Runnable线程处理结果---->"+i);
            System.out.println("===========================");
        }
    }
}

执行结果:
JAVA 多线程、CompleableFuture异步

实现Runnable实现

可以使用FatureTask接收异步线程的执行结果,只有所有线程执行完才能得到结果,是阻塞等待

public class ThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        FutureTask<Integer> futureTask = new FutureTask<>(new Thread1());
        new Thread(futureTask).start();
        //阻塞等待,当所有线程执行完才能得到结果
        //等待整个线程执行完,获取执行结果
        Integer integer = futureTask.get();
        System.out.println("futureTask执行结果:"+integer);
        System.out.println("主线程结束");
    }

    public static class Thread1 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("===========================");
            System.out.println("当前线程---->"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("当前Callable线程处理结果---->"+i);
            System.out.println("===========================");
            return i;
        }
    }
}

执行结果:
JAVA 多线程、CompleableFuture异步

线程池

为什么使用线程池
降低资源消耗:通过重复利用已经创建好的线程,减少线程创建销毁的损耗。

提高响应速度:因为线程池线程没有超过最大线程数时,有的线程处于空闲等待任务状态,当任务进来无需创建新的线程就能执行

提高线程的可管理性:线程池会根据系统的特点会对线程池内线程进行优化处理,减少创建和销毁带来的系统开销,无限的创建和销毁线程不仅损耗系统性能,还降低系统的稳定性,使用线程池来进行统一的管理分配

在我们的业务代码中,都不用以上三种但是,因为可能会导致我们的资源耗尽,这将会是没有更多的资源来处理核心业务。应该将所有的多线程异步任务都交给线程池,这样只有一个执行完才能继续拿到线程资源执行下一个任务,这样能有效的控制我们的资源。

创建线程池的方式

  • 使用JUC包下的Executors来创建一个线程池

		//固定长度的线程池
        ExecutorService service = Executors.newFixedThreadPool(10);
        service.execute(new Thread(()-> System.out.println("异步线程执行")));

这种方式有常见的四种线程池

  1. Executors.newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过需要处理,可灵活的回收空闲线程,没有则创建新线程
	public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  1. Executors.newFixedThreadPool():创建一个固定大小的线程池,可控制线程的最大并发量,超出的将会在队列中等待。核心线程数等于最大线程数
	public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  1. Executors.newScheduledThreadPool():创建一个定时线程池,支持定时或周期性的执行任务
  2. Executors.newSingleThreadExecutor():创建一个单线程的线程池,后台从队列中获取任务,挨个执行
	public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 使用new ThreadPoolExecutor();来创建线程池,

    ThreadPoolExecutor七大参数:

  1. int corePoolSize:核心线程数,线程池创建好就准备就绪的线程数,等待异步任务执行,这些线程将会一直存在,除非allowCoreThreadTimeOut
  2. int maximumPoolSize:最大线程数,用来控制资源并发
  3. long keepAliveTime:存活时间,当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间,超过存活时间将会释放空闲线程
  4. TimeUnit unit:时间单位,存活时间的时间单位
  5. BlockingQueue workQueue:阻塞队列,如果任务有很多,查出最大线程数,多余的任务将会放入队列,等待线程空闲从队列中取出并继续执行
  6. ThreadFactory threadFactory:线程的创建工厂
  7. RejectedExecutionHandler handler:拒绝处理控制器,如果队列的满了,按照我们指定的拒绝策略来拒绝执行任务

工作顺序:

  1. 线程池创建,准备好核心线程数的核心线程,准备接收任务
  2. 如果核心线程已经满了,后续进来的任务将放到阻塞队列中等待,当核心线程执行完以后就会从队列中获取任务执行
  3. 如果阻塞队列满了,将会启动新的线程执行任务,最大数不能超过最大线程数,来进行资源控制
  4. 如果最大线程数也用完,将会执行我们的拒绝策略RejectedExecutionHandler来拒绝任务
  5. 如果最大线程数都执行完,有很多空闲的线程,这些线程将会在执行的存活之间过后进行释放,释放最大线程数-核心线程数,核心线程会一直存在

创建线城池
创建一个核心数为10、最大线程数为100、存活时间为10秒,JUC包下LinkedBlockingDeque长度为1000的阻塞队列,JUC下默认的线程工厂defaultThreadFactory,ThreadPoolExecutor.AbortPolicy()直接丢弃的拒绝策略的线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10,
                100,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(1000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

比如一个线程池,核心线程8、最大线程20、阻塞队列50,如果此时进来100个请求进来将会怎么分配。

首先核心线程直接执行8个,这时候不会直接开其他线程,将会放到队列50个,队列满了以后将会开启12个线程,其他的任务都会被丢弃。

如果不想直接被丢弃,可以采用DiscardPolicy这种拒绝策略,这将会直接执行线程的run()方法,是同步执行
JAVA 多线程、CompleableFuture异步

区别

继承Thread、实现Rannable接口创建出来的线程都不能得到返回值,Callable可以获取返回值
以上三种都不能有效的控制资源
使用线程池的方式来执行多线程任务可以有效控制资源,让性能更加稳定

CompletableFature异步编排

假设有三个异步任务A、 B、 C,异步任务都是同时执行,但是C任务需要A任务的执行结果来进行,这种情况就可以用到异步编排

 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
/**
 * 可以看到CompletableFuture实现了Future,和我们的Callablel实现线程的FutureTask类似,
 * 都实现了Fature接口,可以获取异步任务的结果
 */
public class FutureTask<V> implements RunnableFuture<V> {

public interface RunnableFuture<V> extends Runnable, Future<V> {

CompletableFature提供了四个方法给我们执行异步操作


/**
 * 没有返回值的
 */
public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}
/**
 * 指定线程池的
 */
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

/**
 * 有返回值的
 */
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}

/**
 * 指定线程池的
 */
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}

runAsync测试

public class ThreadTest {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            100,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture.runAsync(()->{
            int i = 10 / 5;
            System.out.println(i);
        },executor);
        System.out.println("主线程结束");
    }

}

JAVA 多线程、CompleableFuture异步

supplyAsync测试

public class ThreadTest {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            100,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 5;
            System.out.println(i);
            return i;
        }, executor);
        Integer integer = future.get();
        System.out.println("异步执行结果:"+integer);
        System.out.println("主线程结束");
    }
}

JAVA 多线程、CompleableFuture异步

线程执行完回调

public class ThreadTest {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            100,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 0;
            return i;
        }, executor).whenComplete((result,exception)->{
            System.out.println("线程执行结果:"+result);
            System.out.println("线程执行异常:"+exception);
        }).exceptionally(throwable -> {
            return 0;
        });
        Integer integer = future.get();
        System.out.println("异步执行结果:"+integer);
        System.out.println("主线程结束");
    }
}

JAVA 多线程、CompleableFuture异步
链式调用whenComplete只能感知是否发生异常,并不能处理异常

使用handle来处理结果

public class ThreadTest {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            100,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor).handle((result,exception)->{
            if (result != null){
                return result;
            }
            if (exception != null){
                return 0;
            }
            return 0;
        });
        Integer integer = future.get();
        System.out.println("异步执行结果:"+integer);
        System.out.println("主线程结束");
    }
}

线程串行化

     /**
      * 当一个线程依赖上一个线程时,获取上一个线程返回结果,并返回当前线程的结果
      *  A B两个线程,B 依赖 A 的返回结果时,B 可以获取 A 的返回结果拿过来计算,并返回 B 线程的执行结果
      */
	 public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

	//===============================================================================

	/**
	 *  当一个线程依赖上一个线程时,获取上一个线程返回结果,并拿来计算,当前线程并无返回结果
	 *  A B 两个线程,B 可以拿到 A 线程的返回结果,并用 A 的返回结果来计算操作,B 没有返回结果
	 */
    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

	//===============================================================================

	/**
	 *  只要上面的任务执行完成,就执行ThenRun()方法,只是一个后续操作
	 */

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

带有Async的都是异步的,就是另外开一个线程,不带的都是和上一个线程公用一个线程

thenRun测试

thenRun:没有返回值,不能过去到上个任务的执行结果

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            System.out.println("任务一执行完成");
            return i;
        }, executor).thenRunAsync(() -> {
            System.out.println("任务二执行完成");
        }, executor);
        System.out.println("主线程结束");
    }

JAVA 多线程、CompleableFuture异步

thenAccept测试

thenAccept:可以获取到上一任务的执行结果,没有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            System.out.println("任务一执行完成");
            return i;
        }, executor).thenAcceptAsync(res -> {
            System.out.println("任务一执行结果:"+res);
        },executor);
        System.out.println("主线程结束");
    }

JAVA 多线程、CompleableFuture异步

thenApply测试

thenApply:可以接收到上一个任务的执行结果,并且也有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            System.out.println("任务一执行完成");
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务一执行结果:" + res);
            return res;
        }, executor);
        System.out.println(future.get());
        System.out.println("主线程结束");
    }

JAVA 多线程、CompleableFuture异步

两个任务组合,都要完成

当两个CompletableFuture组合完成之后,如果还需继续执行其他任务,可以使用任务组合,必须等待前两个任务执行完才能继续执行

runAfterBoth():当两个任务执行完成之后,继续执行第三个任务,不能获取其他任务的返回结果,没有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            System.out.println("任务一执行完成");
            return i;
        }, executor);

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二执行完成");
            return "hello world";
        });
        CompletableFuture<Void> voidCompletableFuture = future.runAfterBothAsync(future1, () -> {
            System.out.println("任务三执行完成");
        }, executor);
        System.out.println(future.get());
        System.out.println("主线程结束");
    }

JAVA 多线程、CompleableFuture异步
thenAcceptBoth():当两个任务执行完成之后,继续执行第三个任务,可以获取前两个任务的执行结果,没有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            System.out.println("任务一执行完成");
            return i;
        }, executor);

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二执行完成");
            return "hello world";
        });
        CompletableFuture<Void> voidCompletableFuture = future.thenAcceptBothAsync(future1, (f1,f2) -> {
            System.out.println("任务一的执行结果:"+f1);
            System.out.println("任务二的执行结果:"+f2);
            System.out.println("任务三执行完成");
        }, executor);
        System.out.println(future.get());
        System.out.println("主线程结束");
    }

JAVA 多线程、CompleableFuture异步
thenCombine():当两个任务执行完成之后,继续执行第三个任务,可以获取前两个任务的执行结果,有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            System.out.println("任务一执行完成");
            return i;
        }, executor);

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二执行完成");
            return "hello world";
        });
        CompletableFuture<String> future2 = future.thenCombineAsync(future1, (f1, f2) -> {
            System.out.println("任务一的执行结果:" + f1);
            System.out.println("任务二的执行结果:" + f2);
            System.out.println("任务三执行完成");
            return f1 + f2;
        }, executor);
        System.out.println("任务三执行结果"+future2.get());
        System.out.println("主线程结束");
    }

JAVA 多线程、CompleableFuture异步

两个任务组合,一个完成

当两个CompletableFuture组合完成之后,如果还需继续执行其他任务,可以使用任务组合,只需等待两个中的其中一个执行完成,就可以执行其他任务

runAfterEither():当两个任务其中一个执行完成之后,就执行第三个任务,不能获取其他任务的返回结果,没有返回值
acceptEither():当两个任务其中一个执行完成之后,就执行第三个任务,可以获取其他任务的返回结果,没有返回值
acceptEither():当两个任务其中一个执行完成之后,就执行第三个任务,可以获取其他任务的返回结果,有返回值

多任务组合

CompletableFuture.allOf(future, future1):等待所有的任务执行完成
CompletableFuture.anyOf():只要有一个任务执行完就结束

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程启动---"+ LocalDateTime.now());
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("任务一执行完成");
        }, executor);

        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务二执行完成");
        }, executor);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future, future1);
        allOf.get();//等待所有任务执行完
        System.out.println("主线程结束---"+ LocalDateTime.now());

    }
上一篇:这么优雅的Java ORM没见过吧!


下一篇:分布式开发(2)-异步线程