问题:下单操作需要多次调用数据库,查询商品信息,用户信息,修改库存数据,造成性能瓶颈。
优化方向:读取数据改为从缓存读取,修改库存数据改为修改缓存数据在用消息队列异步修改数据库。可以用rocketmq的异步事务型消息来保证redis和数据库数据同步,在缓存异常情况可以用数据库数据来恢复。
1.交易验证优化
(1)校验商品是否存在,改为缓存
之前是直接从数据库根据itemId获取商品数据
现在改为从redis缓存中获取,没有在从数据库获取
@Override public ItemModel getItemByIdInCache(Integer id) { ItemModel itemModel = (ItemModel)redisTemplate.opsForValue().get("item_validate_"+id); if(itemModel == null) { itemModel = this.getItemById(id); redisTemplate.opsForValue().set("item_validate_"+id, itemModel); } return itemModel; }
(2)校验用户信息是否存在,改为缓存
之前是直接从数据库根据userId获取用户数据
现在改为从redis缓存中获取,没有在从数据库获取
@Override public UserModel getUserByIdInCache(Integer id) { UserModel userModel = (UserModel)redisTemplate.opsForValue().get("user_"+id); if(userModel == null) { userModel = this.getUserById(id); redisTemplate.opsForValue().getAndSet("user_"+id, userModel); } return userModel; }
2. 修改库存数据改为修改缓存
库存修改表数据因为itemId为库存表主键,所以会有行锁,防止并发,但会影响效率
所以在抢购活动发布时,将库存存入redis中,下单时先修改redis缓存数据,再用rocketmq同步修改库存来保证可靠性
a.发布活动,缓存库存
//发布活动,这个应该是运维操作
@RequestMapping(value = "/publishpromo",method = {RequestMethod.GET}) @ResponseBody public CommonReturnType publisPromo(Integer id) { promoService.publishPromo(id); return CommonReturnType.create(null); }
@Override
public void publishPromo(Integer promoId) {
// 通过活动ID获取活动信息
PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
if(promoDO == null || promoDO.getItemId().intValue() == 0) {
return;
}
ItemModel itemModel = itemService.getItemById(promoDO.getItemId());
//将库存同步到redis中
redisTemplate.opsForValue().getAndSet("promo_item_stock_"+promoDO.getItemId(), itemModel.getStock());
}
b.下单减库存
@Override @Transactional public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException { /*int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount); if(affectedRow > 0){ //更新库存成功 return true; }else{ //更新库存失败 return false; }*/ Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId, amount.intValue()*-1); // 发送消息队列修改数据库库存
boolean sendResult = producer.asyncReduceStock(itemId, amount); // 还剩的数据 if(result > 0 && sendResult){ //更新库存成功 return true; }else { //更新库存失败 redisTemplate.opsForValue().increment("promo_item_stock_"+itemId, amount.intValue()); return false; } }
消息队列生产者发送
@Component public class MQProducer { Log log = LogFactory.getLog(getClass()); @Value("${mq.nameserver.addr}") private String nameServer; @Value("${mq.topicname}") private String topicName; DefaultMQProducer producer; @PostConstruct public void init() throws MQClientException { producer = new DefaultMQProducer("producer"); producer.setNamesrvAddr(nameServer); producer.start(); } // 同步扣减库存消息 public boolean asyncReduceStock(Integer itemId, Integer amount) { Map<String,Object> bodyMap = new HashMap<String,Object>(); bodyMap.put("itemId", itemId); bodyMap.put("amount", amount); Message msg = new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); try { log.error("begin to send msg"); producer.send(msg); log.error("finish send msg"); } catch (MQClientException e) { e.printStackTrace(); return false; } catch (RemotingException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } catch (MQBrokerException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } return true; } }
消息队列消费者减库存数据库
@Component public class MQConsumer { @Value("${mq.nameserver.addr}") private String nameServer; @Value("${mq.topicname}") private String topicName; DefaultMQPushConsumer consumer; @Autowired private ItemStockDOMapper itemStockDOMapper; private Log log = LogFactory.getLog(getClass()); @PostConstruct public void init() throws MQClientException { consumer = new DefaultMQPushConsumer("stock_consumer_group"); consumer.setNamesrvAddr(nameServer); consumer.subscribe(topicName, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.error("begin to cosume msg"); Message msg = msgs.get(0); String jsonStr = new String(msg.getBody()); Map<String,Object> map = JSON.parseObject(jsonStr, Map.class); Integer itemId= (Integer)map.get("itemId"); Integer amount= (Integer)map.get("amount"); log.error("itemId:"+itemId+",amount:"+amount); int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount); if(affectedRow > 0){ //更新库存成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else{ //更新库存失败 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); } }