一、初始化线程的4种方式
1)、继承Thread
2)、实现Runnable接口
3)、实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)
4)、线程池
方式1和方式2;主线程无法获取线程的运算结果,不适合当前场景
方式3:主线程可以获取线程的运算结果,但是不利于控制服务器中的线程资源,可以导致服务器耗尽
方式4:
@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
//@EnableConfigurationProperties(ThreadPoolProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
二、开发中为什么使用线程池
1)、降低资源的消耗
通过重复利用已经建好的线程降低线程的创建和销毁带夹得损耗
2)、提高响应速度
因为线程池中得线程数没有超过线程池得最大上限时,有得线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
3)、提高线程的可管理性
线程池根据当前线程的特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销,无线的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
三、CompletableFuture异步编排
1)CompletableFuture 提供了四个静态方法来创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同
a: runAsync方法不支持返回值。
b: supplyAsync可以支持返回值。
c: 可以传入自定义的线程池,否则使用默认的线程池
2) 计算结果完成时的回调方法
//可以处理异常,无返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//可以处理异常,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
a、whenComplete:可以处理正常和异常的计算结果;exceptionally:处理异常情况
b、whenComplete 和 whenCompleteAsync 的区别
whenComplete :是当前线程任务执行完后继续执行 whenComplete 中的任务
whenCompleteAsync :是把 whenCompleteAsync 这个任务继续提交给线程池来执行
c、方法不以 Async 结尾,意味着Actiuon使用的是相同的线程执行,而 Async可能使用其他线程执行【如果使用相同的线程池,也可能会被同一个线程所选中】
四、线程串行化
a、thenRun:不能获取上一步的执行结果
b、thenAcceptAsync:能接受上一步结果,但是无返回值
c、thenApplyAsync:能接受上一步结果,并有返回值,传递给下一步
五、两任务组合
1)、thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor)
2)、thenAcceptBoth:当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
3)、runAfterBoth:两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor
a、thenCombine :组合两个 future,获取两个future 的返回结果,并返回当前任务的返回值
b、thenAcceptBoth :组合两个 future,获取两个future 的返回结果,然后处理任务,没有返回值
c、runAfterBoth :组合两个 future,不需要获取future的结果,只需要两个future处理完后任务后,处理该任务
4)、applyToEither : 当两个任务中,任意一个future完成的时候,执行任务
六、多任务组合
a、allof:等待所有的任务执行完
b、anyof:只要有一个线程完成
七、运用的实例
/**
* 前台查询商品详情页信息 - 加入异步编排
* @param skuId
* @return
*/
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
// supplyAsync 需要返回结果 因为 3 4 5 依赖1
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
// 1、获取sku基本信息 pms_sku_info
SkuInfoEntity skuInfoEntity = getById(skuId);
skuItemVo.setInfo(skuInfoEntity);
return skuInfoEntity;
}, executor);
CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((res) -> {
// 3、获取spu的销售属性组合
List<SkuItemSaleAttrVo> skuItemSaleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
skuItemVo.setSaleAttrs(skuItemSaleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
// 4、获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesp(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
// 5、获取spu的规格参数信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getCatalogId(), res.getSpuId());
skuItemVo.setAttrGroups(attrGroupVos);
}, executor);
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
// 2、获取sku的图片信息 pms_spu_images
List<SkuImagesEntity> skuImagesEntities = skuImagesService.getImageBySkuId(skuId);
skuItemVo.setImages(skuImagesEntities);
}, executor);
// 6、查询当前sku是否参与秒杀优惠
CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
R skuSecKillInfo = secKillFeignService.getSkuSecKillInfo(skuId);
if (skuSecKillInfo.getCode() == 0) {
SecKillInfoVo skuSecKillInfoData = skuSecKillInfo.getData(new TypeReference<SecKillInfoVo>() {
});
skuItemVo.setSecKillInfoVo(skuSecKillInfoData);
}
}, executor);
// 等到所有任务都完成
CompletableFuture.allOf(saleFuture, descFuture, baseAttrFuture, imageFuture, secKillFuture).get();
return skuItemVo;
}