- 首先注入 Spring 自带的 ThreadPoolTaskExecutor 线程池:
// Field-inject the ThreadPoolTaskExecutor bean from the Spring context.
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
也可以自己写一个配置类配置线程池:
/**
 * Spring configuration class that registers a {@link ThreadPoolTaskExecutor} bean
 * named {@code threadPoolTaskExecutor}, sized by the constants declared below.
 * The class also carries {@code @EnableAsync}.
 */
@EnableAsync
@Configuration
public class ThreadPoolTaskExecutorConfig {

    /** Core pool size passed to the executor. */
    private static final int CORE_POOL_SIZE = 20;

    /** Maximum pool size passed to the executor. */
    private static final int MAX_POOL_SIZE = 20;

    /** Keep-alive time, in seconds, passed to the executor. */
    private static final int KEEP_ALIVE_SECONDS = 60;

    /** Capacity of the executor's task queue. */
    private static final int BLOCKING_QUEUE_CAPACITY = 1200;

    /**
     * Creates the executor and applies the pool sizes, keep-alive time and queue capacity.
     *
     * @return the configured executor, exposed as a Spring bean
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(CORE_POOL_SIZE);
        pool.setMaxPoolSize(MAX_POOL_SIZE);
        pool.setQueueCapacity(BLOCKING_QUEUE_CAPACITY);
        pool.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        return pool;
    }
}
然后再使用 @Qualifier() 注解,指定注入自己在配置类中定义的线程池:
// Inject the executor bean named "threadPoolTaskExecutor"; @Qualifier selects the bean by name.
@Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
- 调用threadPoolTaskExecutor的api:
threadPoolTaskExecutor.execute(Runnable task);
task 就是你要处理的业务代码逻辑(封装在一个类中),该类需要实现 Runnable 的 run 方法。
以下例子:同步一批数据到ES
/**
 * Runnable task that converts a batch of {@link GoodsInfo580DTO} records into
 * {@link GoodsInfo580Document} objects and saves them through the repository in a
 * single {@code saveAll} call.
 *
 * <p>NOTE(review): {@code log} is not declared in this class; it is presumably provided
 * elsewhere (e.g. a logger annotation) — confirm.
 */
public class GoodsInfo580Task implements Runnable {

    /** Source records to convert; stays {@code null} when the no-arg constructor is used. */
    private List<GoodsInfo580DTO> data;

    /** Repository that receives the converted documents. */
    private GoodsInfo580Repository goodsInfo580Repository;

    /** Creates an empty task; {@link #run()} is a no-op for such an instance. */
    public GoodsInfo580Task() {
    }

    /**
     * @param data                   records to convert and save
     * @param goodsInfo580Repository repository used to save the converted documents
     */
    public GoodsInfo580Task(List<GoodsInfo580DTO> data, GoodsInfo580Repository goodsInfo580Repository) {
        this.data = data;
        this.goodsInfo580Repository = goodsInfo580Repository;
    }

    /**
     * Converts every non-null DTO and saves the resulting documents in one call.
     * Failures while saving are logged and not rethrown.
     */
    @Override
    public void run() {
        // The no-arg constructor leaves data null; bail out instead of dying with an NPE
        // on the worker thread.
        if (data == null) {
            log.warn("580商品同步任务未提供数据,跳过执行");
            return;
        }

        List<GoodsInfo580Document> dtos = new ArrayList<>(data.size());
        for (GoodsInfo580DTO dto : data) {
            if (Func.isNull(dto)) {
                continue;
            }
            GoodsInfo580Document goodsInfo580Document = new GoodsInfo580Document();
            HealthGoodsInfoConstructHelper.setHealthGoodsInfo(dto, goodsInfo580Document);
            dtos.add(goodsInfo580Document);
        }

        log.info("插入ES数量:{}", dtos.size());
        // Nothing was converted: skip the repository call entirely.
        if (dtos.isEmpty()) {
            return;
        }
        try {
            goodsInfo580Repository.saveAll(dtos);
        } catch (Exception e) {
            log.error("全量同步580商品到ES异常:{},错误信息:{}", JSONObject.toJSONString(dtos),e);
        }
    }
}
- 也可以不创建单独的 task 类,直接调用 execute 方法(传入 lambda 表达式)
例子:计算一批数据的商品价格
/**
 * Loads the 580 prescription records and recalculates each prescription's total price
 * asynchronously on {@code threadPoolTaskExecutor}.
 *
 * <p>The whole batch is submitted as ONE task, so the prescriptions are processed
 * sequentially on a single pool thread, not in parallel.
 */
public void calculate580Pres() {
    // Load the 580 prescription drug records.
    List<PrescriptionDTO> prescriptionDTOList = this.baseMapper.selectPresInfo();
    if (Func.isEmpty(prescriptionDTOList)) {
        log.error("查询580处方信息为空:{}", prescriptionDTOList);
        return;
    }
    // Hand the batch to the pool so the caller does not wait for the price calculation.
    threadPoolTaskExecutor.execute(() -> {
        for (PrescriptionDTO prescriptionDTO : prescriptionDTOList) {
            refreshPrescriptionPrice(prescriptionDTO);
        }
    });
}

/**
 * Recalculates and stores the total price of a single prescription.
 *
 * <p>The price is written via {@code updatePriceById} only when every drug line was priced
 * and the total is positive; in every other case (including any exception) only
 * {@code updatePriceTimeById} is called.
 *
 * @param prescriptionDTO the prescription to price
 */
private void refreshPrescriptionPrice(PrescriptionDTO prescriptionDTO) {
    // Skip prescriptions that still have a Redis marker from a recent price refresh.
    Object lock = redisUtils.get(RedisConstant.REDIS_580LOCK + prescriptionDTO.getId());
    if (Func.isNotEmpty(lock)) {
        return;
    }
    Integer storeId = prescriptionDTO.getStoreId();
    // Exact decimal accumulation; summing doubles drifts (0.1 + 0.2 != 0.3).
    BigDecimal total = BigDecimal.ZERO;
    try {
        // Parsing happens inside the try so one malformed record cannot abort the batch.
        List<DrugRspListDTO> listDTOS =
                JSONArray.parseArray(prescriptionDTO.getDrugRspList(), DrugRspListDTO.class);
        if (listDTOS == null || listDTOS.isEmpty()) {
            // No drug lines, so nothing can be priced.
            this.baseMapper.updatePriceTimeById(prescriptionDTO.getId());
            return;
        }
        // Number of drug lines that produced a positive line total.
        int pricedCount = 0;
        // A prescription can hold several drugs; their line totals are summed.
        for (DrugRspListDTO listDTO : listDTOS) {
            if (listDTO.getAmout() > 0) {
                // NOTE(review): 1000001L is a hard-coded id whose meaning is not visible here.
                GoodsUnionIdDTO byGoodsId = iSkuGoodsMapperClient.getByGoodsId(1000001L, listDTO.getCommodityCode());
                log.info("商品参数:{}", byGoodsId);
                String goodsId = byGoodsId.getGoodsId().toString();
                BigDecimal priceByParams = iPriceDeptGoodsClient.getPriceByParams(1000001L, storeId.toString(), goodsId);
                log.info(goodsId + "商品价格:{}", priceByParams);
                if (Func.isNotEmpty(priceByParams) && priceByParams.compareTo(BigDecimal.ZERO) > 0) {
                    // Multiply in BigDecimal and round explicitly: setScale(2) without a
                    // rounding mode throws ArithmeticException when digits must be dropped.
                    BigDecimal count = priceByParams
                            .multiply(BigDecimal.valueOf(listDTO.getAmout()))
                            .setScale(2, java.math.RoundingMode.HALF_UP);
                    if (count.compareTo(BigDecimal.ZERO) > 0) {
                        ++pricedCount;
                    }
                    total = total.add(count);
                    log.info("计算处方单总价:{}", prescriptionDTO.getId() + ":" + total);
                }
            }
        }
        if (total.compareTo(BigDecimal.ZERO) > 0 && pricedCount == listDTOS.size()) {
            // Keep the stored string in Double.toString format, as before.
            this.baseMapper.updatePriceById(Double.toString(total.doubleValue()), prescriptionDTO.getId());
            // Marker expiry argument is 60*3; presumably seconds — confirm against redisUtils.set.
            redisUtils.set(RedisConstant.REDIS_580LOCK + prescriptionDTO.getId(), "1", 60 * 3);
        } else {
            this.baseMapper.updatePriceTimeById(prescriptionDTO.getId());
        }
    } catch (Exception e) {
        this.baseMapper.updatePriceTimeById(prescriptionDTO.getId());
        // Pass the exception as the last argument so the cause is not lost.
        log.error("计算处方单总价失败:{}", prescriptionDTO.getId() + ":" + total, e);
    }
}