线程池与异步编排

线程池的七大参数

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @param handler the handler to use when execution is blocked
*        because the thread bounds and queue capacities are reached
  1. corePoolSize: 池中一直保持的线程的数量,即使线程空闲。除非设置了 allowCoreThreadTimeOut
  2. maximumPoolSize: 池中允许的最大的线程数
  1. keepAliveTime: 当线程数大于核心线程数的时候,超出核心线程数的线程在最大多长时间没有接到新任务就会终止释放 ,最终线程池维持在 corePoolSize 大小
  2. unit: 时间单位
  1. workQueue: 阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了 corePoolSize大小, 就 会放在这里 等待空闲线程执行.
  2. threadFactory:创建线程的工厂,比如指定线程名等
  1. handler:拒绝策略,如果线程满了,线程池就会使用拒绝策略。

运行流程

1、线程池创建,准备好 core 数量的核心线程,准备接受任务

2、新的任务进来,用 core 准备好的空闲线程执行。

(1) 、core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行

(2) 、阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量

(3) 、max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终保持到core 大小

(4) 、如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策略进行处理

3、所有的线程创建都是由指定的 factory 创建的。

面试:

一个线程池 core 7; ; max 20 ,queue :50 ,100 并发进来怎么分配的;

先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在 70 个

被安排上了。剩下 30 个使用拒绝策略。

创建异步操作

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的

2、可以传入自定义的线程池,否则就用默认的线程池

计算完成时回调方法

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。 whenComplete 和 whenCompleteAsync 的区别: whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。 whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池 来进行执行。 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行 )

handle 方法

和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

线程串行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前 任务的返回值。 thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。 thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后续操作

带有 Async 默认是异步执行的。同之前。 以上都要前置任务成功完成。

Function<?super T,?extends U>

T:上一个任务返回结果的类型 U:当前任务的返回值类型

两任务组合 - 都要完成

两个任务必须都完成,触发该任务。 thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值 thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有 返回值。 runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后, 处理该任务。

两任务组合 - 一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。 applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。 acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。 runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返 回值

示例代码:

@Override
    public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {

        //构建OrderConfirmVo
        OrderConfirmVo confirmVo = new OrderConfirmVo();

        //获取当前用户登录的信息
        MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();

        //TODO :获取当前线程请求头信息(解决Feign异步调用丢失请求头问题)
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

        //开启第一个异步任务
        CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> {

            //每一个线程都来共享之前的请求数据
            RequestContextHolder.setRequestAttributes(requestAttributes);

            //1、远程查询所有的收获地址列表
            List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVo.getId());
            confirmVo.setMemberAddressVos(address);
        }, threadPoolExecutor);

        //开启第二个异步任务
        CompletableFuture<Void> cartInfoFuture = CompletableFuture.runAsync(() -> {

            //每一个线程都来共享之前的请求数据
            RequestContextHolder.setRequestAttributes(requestAttributes);

            //2、远程查询购物车所有选中的购物项
            List<OrderItemVo> currentCartItems = cartFeignService.getCurrentCartItems();
            confirmVo.setItems(currentCartItems);
            //feign在远程调用之前要构造请求,调用很多的拦截器
        }, threadPoolExecutor).thenRunAsync(() -> {
            List<OrderItemVo> items = confirmVo.getItems();
            //获取全部商品的id
            List<Long> skuIds = items.stream()
                    .map((itemVo -> itemVo.getSkuId()))
                    .collect(Collectors.toList());

            //远程查询商品库存信息
            R skuHasStock = wmsFeignService.getSkuHasStock(skuIds);
            List<SkuStockVo> skuStockVos = skuHasStock.getData("data", new TypeReference<List<SkuStockVo>>() {});

            if (skuStockVos != null && skuStockVos.size() > 0) {
                //将skuStockVos集合转换为map
                Map<Long, Boolean> skuHasStockMap = skuStockVos.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock));
                confirmVo.setStocks(skuHasStockMap);
            }
        },threadPoolExecutor);

        //3、查询用户积分
        Integer integration = memberResponseVo.getIntegration();
        confirmVo.setIntegration(integration);

        //4、价格数据自动计算

        //TODO 5、防重令牌(防止表单重复提交)
        //为用户设置一个token,三十分钟过期时间(存在redis)
        String token = UUID.randomUUID().toString().replace("-", "");
        redisTemplate.opsForValue().set(USER_ORDER_TOKEN_PREFIX+memberResponseVo.getId(),token,30, TimeUnit.MINUTES);
        confirmVo.setOrderToken(token);


        CompletableFuture.allOf(addressFuture,cartInfoFuture).get();

        return confirmVo;
    }

上一篇:彻底理解Java的Future模式


下一篇:springboot定时任务的启停