多线程

一、初始化线程的4种方式

  1)、继承Thread

  2)、实现Runnable接口

  3)、实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)

  4)、线程池

方式1和方式2;主线程无法获取线程的运算结果,不适合当前场景

方式3:主线程可以获取线程的运算结果,但是不利于控制服务器中的线程资源,可以导致服务器耗尽

方式4:

        @ConfigurationProperties(prefix = "gulimall.thread")
        @Component
        @Data
        public class ThreadPoolProperties {
              private Integer coreSize;
              private Integer maxSize;
              private Integer keepAliveTime;
        }

        //@EnableConfigurationProperties(ThreadPoolProperties.class)
        @Configuration
        public class MyThreadConfig {

               @Bean
               public ThreadPoolExecutor threadPoolExecutor(ThreadPoolProperties pool) {
                      return new ThreadPoolExecutor(
                      pool.getCoreSize(),
                      pool.getMaxSize(),
                      pool.getKeepAliveTime(),
                      TimeUnit.SECONDS,
                      new LinkedBlockingDeque<>(100000),
                      Executors.defaultThreadFactory(),
                      new ThreadPoolExecutor.AbortPolicy()
                      );
            }
       }

二、开发中为什么使用线程池

      1)、降低资源的消耗

                  通过重复利用已经建好的线程降低线程的创建和销毁带夹得损耗

       2)、提高响应速度

                  因为线程池中得线程数没有超过线程池得最大上限时,有得线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行

        3)、提高线程的可管理性

                 线程池根据当前线程的特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销,无线的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

三、CompletableFuture异步编排

多线程

        1)CompletableFuture 提供了四个静态方法来创建一个异步操作

                  public static CompletableFuture<Void> runAsync(Runnable runnable)
                  public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
                  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
                  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
      
            没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同
                 a: runAsync方法不支持返回值。
                 b: supplyAsync可以支持返回值。
                 c: 可以传入自定义的线程池,否则使用默认的线程池

         2) 计算结果完成时的回调方法

                  //可以处理异常,无返回值
                  public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
                  public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
                  public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
                  //可以处理异常,有返回值
                  public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

                  a、whenComplete:可以处理正常和异常的计算结果;exceptionally:处理异常情况
                  b、whenComplete 和 whenCompleteAsync 的区别
                          whenComplete :是当前线程任务执行完后继续执行 whenComplete 中的任务
                          whenCompleteAsync :是把 whenCompleteAsync 这个任务继续提交给线程池来执行

                  c、方法不以 Async 结尾,意味着Actiuon使用的是相同的线程执行,而 Async可能使用其他线程执行【如果使用相同的线程池,也可能会被同一个线程所选中】

四、线程串行化

      a、thenRun:不能获取上一步的执行结果
      b、thenAcceptAsync:能接受上一步结果,但是无返回值
      c、thenApplyAsync:能接受上一步结果,并有返回值,传递给下一步

五、两任务组合

        1)、thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理
        
        public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
        public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
        public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor)

        2)、thenAcceptBoth:当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗

        public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

        3)、runAfterBoth:两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)

        public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor

        a、thenCombine :组合两个 future,获取两个future 的返回结果,并返回当前任务的返回值

        b、thenAcceptBoth :组合两个 future,获取两个future 的返回结果,然后处理任务,没有返回值

        c、runAfterBoth :组合两个 future,不需要获取future的结果,只需要两个future处理完后任务后,处理该任务

       4)、applyToEither : 当两个任务中,任意一个future完成的时候,执行任务

六、多任务组合

      a、allof:等待所有的任务执行完

      b、anyof:只要有一个线程完成

七、运用的实例

/**
 * 前台查询商品详情页信息 - 加入异步编排
 * @param skuId
 * @return
 */
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {

    SkuItemVo skuItemVo = new SkuItemVo();

    // supplyAsync 需要返回结果  因为 3 4 5 依赖1
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        // 1、获取sku基本信息 pms_sku_info
        SkuInfoEntity skuInfoEntity = getById(skuId);
        skuItemVo.setInfo(skuInfoEntity);
        return skuInfoEntity;
    }, executor);

    CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((res) -> {
        // 3、获取spu的销售属性组合
        List<SkuItemSaleAttrVo> skuItemSaleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
        skuItemVo.setSaleAttrs(skuItemSaleAttrVos);
    }, executor);

    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
        // 4、获取spu的介绍 pms_spu_info_desc
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesp(spuInfoDescEntity);
    }, executor);

    CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        // 5、获取spu的规格参数信息
        List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getCatalogId(), res.getSpuId());
        skuItemVo.setAttrGroups(attrGroupVos);
    }, executor);

    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
        // 2、获取sku的图片信息 pms_spu_images
        List<SkuImagesEntity> skuImagesEntities = skuImagesService.getImageBySkuId(skuId);
        skuItemVo.setImages(skuImagesEntities);
    }, executor);

    // 6、查询当前sku是否参与秒杀优惠
    CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
        R skuSecKillInfo = secKillFeignService.getSkuSecKillInfo(skuId);
        if (skuSecKillInfo.getCode() == 0) {
            SecKillInfoVo skuSecKillInfoData = skuSecKillInfo.getData(new TypeReference<SecKillInfoVo>() {
            });
            skuItemVo.setSecKillInfoVo(skuSecKillInfoData);
        }
    }, executor);


    // 等到所有任务都完成
    CompletableFuture.allOf(saleFuture, descFuture, baseAttrFuture, imageFuture, secKillFuture).get();

    return skuItemVo;
}
上一篇:NumPy:数组和标量列表中的二维数组


下一篇:Java 8 - CompletableFuture组合式异步编程