线程池
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
线程池的好处:线程复用,可以控制最大并发数,管理线程
1、降低资源的消耗;
2、提高响应速度;
3、方便管理;
三大方法、七大参数、四种拒绝策略
三大方法:
1、ExecutorService threadPool = Executors.newSingleThreadExecutor();// 创建单个线程
2、ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
3、ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱
源码分析:
//三大方法
//请求队列LinkedBlockingQueue长度为默认为 Integer.MAX_VALUE
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {//21亿
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 本质ThreadPoolExecutor()的方法调用
// 七大参数
public ThreadPoolExecutor(int corePoolSize,// 1. 核心线程池大小
int maximumPoolSize,// 2. 最大核心线程池大小
long keepAliveTime,// 3. 超时了没有人调用就会释放
TimeUnit unit,// 4. 超时单位
BlockingQueue<Runnable> workQueue,// 5. 阻塞队列,候客区
ThreadFactory threadFactory,// 6. 线程工厂:创建线程的,一般不用动
RejectedExecutionHandler handler// 7. 拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
//四种拒绝策略
new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!
阿里巴巴开发规约:
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列LinkedBlockingQueue长度为默认为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
手动创建线程池
package com.zgq.pool;
import java.util.concurrent.*;
public class Demo02 {
public static void main(String[] args) {
// 自定义线程池!工作 ThreadPoolExecutor
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试去和最早的竞争,也不会抛出异常!
try {
// 最大承载:Deque + max
// 超过最大承载,就会出发拒绝策略,引发异常 RejectedExecutionException
for (int i = 1; i <= 9; i++) {
// 使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
池的最大的大小如何去设置!调优
1、CPU密集型:
尽量使用较小的线程池,一般为CPU核心数+1,因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换;
2、IO密集型:
可以使用稍大的线程池,一般为2*CPU核心数。IO密集型任务对CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。
3、混合型任务:
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行起来高效。因为如果划分之后两个任务执行时间存在有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
// 最大线程到底该如何定义
// 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!一般为cpu核数+1
// 2、IO 密集型 > 判断你程序中十分耗IO的线程,
// 程序 15个大型任务 io十分占用资源!
// 获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());
实际项目中的使用
以谷粒商城的购物车系统为例,因为购物车需要用到CompletableFuture进行异步编排的优化,所以需要引入线程池来执行异步任务;
1、写配置文件的属性配置,因为线程池的参数尽量在配置文件中配置,这样后面易于修改与维护
package com.atguigu.gulimall.cart.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
在配置文件中设置
gulimall:
thread:
core-size: 20
max-size: 200
keep-alive-time: 10
2、写线程池的配置类,记住一定要使用ThreadPoolExecutor来进行创建,原因上面详细说了
package com.atguigu.gulimall.cart.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// 开启这个属性配置
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties){
return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize(),
threadPoolConfigProperties.getMaxSize(),
threadPoolConfigProperties.getKeepAliveTime() ,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
3、业务代码中使用,直接注入即可,然后就可以直接在CompletableFuture中使用了;至于添加到购物车的代码这里就不详细说了,以后应该会专门写谷粒商城的博客;
@Slf4j
@Service
public class CartServiceImpl implements CartService {
@Autowired
private ThreadPoolExecutor executor;
@Override // CartServiceImpl
public CartItem addToCart(Long skuId, Integer num) throws ExecutionException, InterruptedException {
// 获取当前用户的map
BoundHashOperations<String, Object, Object> cartOps = getCartOps();
// 查看该用户购物车里是否有指定的skuId
String res = (String) cartOps.get(skuId.toString());
// 查看用户购物车里是否已经有了该sku项
if (StringUtils.isEmpty(res)) {
CartItem cartItem = new CartItem();
// 异步编排
CompletableFuture<Void> getSkuInfo = CompletableFuture.runAsync(() -> {
// 1. 远程查询当前要添加的商品的信息
R skuInfo = productFeignService.SkuInfo(skuId);
SkuInfoVo sku = skuInfo.getData("skuInfo", new TypeReference<SkuInfoVo>() {
});
// 2. 填充购物项
cartItem.setCount(num);
cartItem.setCheck(true);
cartItem.setImage(sku.getSkuDefaultImg());
cartItem.setPrice(sku.getPrice());
cartItem.setTitle(sku.getSkuTitle());
cartItem.setSkuId(skuId);
}, executor);
// 3. 远程查询sku销售属性,销售属性是个list
CompletableFuture<Void> getSkuSaleAttrValues = CompletableFuture.runAsync(() -> {
List<String> values = productFeignService.getSkuSaleAttrValues(skuId);
cartItem.setSkuAttr(values);
}, executor);
// 等待执行完成
CompletableFuture.allOf(getSkuInfo, getSkuSaleAttrValues).get();
// sku放到用户购物车redis中
cartOps.put(skuId.toString(), JSON.toJSONString(cartItem));
return cartItem;
} else {//购物车里已经有该sku了,数量+1即可
CartItem cartItem = JSON.parseObject(res, CartItem.class);
// 不太可能并发,无需加锁
cartItem.setCount(cartItem.getCount() + num);
cartOps.put(skuId.toString(), JSON.toJSONString(cartItem));
return cartItem;
}
}
}
希望大家看我的文章能够有所帮助,有些细微的提升。