畅购商城文章系列
畅购商城:分布式文件系统FastDFS
畅购商城:商品的SPU和SKU概念
畅购商城:Lua、OpenResty、Canal实现广告缓存
畅购商城:微服务网关和JWT令牌(上)
畅购商城:微服务网关和JWT令牌(下)
畅购商城:Spring Security Oauth2 JWT(上)
畅购商城:Spring Security Oauth2 JWT(下)
畅购商城:购物车
畅购商城:订单
畅购商城:微信支付
畅购商城:秒杀(上)
畅购商城:秒杀(下)
目录
学习目标
- 防止秒杀重复排队
重复排队:一个人抢购商品,如果没有支付,不允许重复排队抢购
- 并发超卖问题解决
1个商品卖给多个人:1商品多订单
- 秒杀订单支付
秒杀支付:支付流程需要调整
- 超时支付订单库存回滚
1.RabbitMQ延时队列
2.利用延时队列实现支付订单的监听,根据订单支付状况进行订单数据库回滚
1. 防止秒杀重复排队
用户每次抢单的时候,一旦排队,我们设置一个自增值,让该值的初始值为1,每次进入抢单的时候,对它进行递增,如果值>1,则表明已经排队,不允许重复排队,如果重复排队,则对外抛出异常,并抛出异常信息100表示已经正在排队。
1.1 后台排队记录
修改SeckillOrderServiceImpl的add方法,新增递增值判断是否排队中,代码如下:
//递增,判断是否排队
Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);
if(userQueueCount>1){
//100:表示有重复抢单
throw new RuntimeException(String.valueOf(StatusCode.REPERROR));
}
2. 并发超卖问题解决
超卖问题,这里是指多人抢购同一商品的时候,多人同时判断是否有库存,如果只剩一个,则都会判断有库存,此时会导致超卖现象产生,也就是一个商品下了多个订单的现象。
2.1 思路分析
解决超卖问题,可以利用Redis队列实现,给每件商品创建一个独立的商品个数队列,例如:A商品有2个,A商品的ID为1001,则可以创建一个队列,key=SeckillGoodsCountList_1001,往该队列中塞2次该商品ID。
每次给用户下单的时候,先从队列中取数据,如果能取到数据,则表明有库存,如果取不到,则表明没有库存,这样就可以防止超卖问题产生了。
方法一:在我们队Redis进行操作的时候,很多时候,都是先将数据查询出来,在内存中修改,然后存入到Redis,在并发场景,会出现数据错乱问题,为了控制数量准确,我们单独将商品数量整一个自增键,自增键是线程安全的,所以不担心并发场景的问题。
方法二:给每个sku创建一个队列,比如id为399的商品数量为4,那么就在399的队列里放入4件商品。然后每次查询就从队列里去取,假如现在有五个线程去查库存,因为只有4件商品,所以5个线程只有4个线程能够查询出库存。因为Redis是单线程的,所以不会出现多个线程同时访问数据出错的情况,这样就可以避免并发超卖的问题。
2.2 代码实现
每次将商品压入Redis缓存的时候,另外多创建一个商品的队列。
修改SeckillGoodsPushTask,添加一个pushIds方法,用于将指定商品ID放入到指定的数字中,代码如下:
/***
* 将商品ID存入到数组中
* @param len:长度
* @param id :值
* @return
*/
public Long[] pushIds(int len,Long id){
Long[] ids = new Long[len];
for (int i = 0; i <ids.length ; i++) {
ids[i]=id;
}
return ids;
}
修改SeckillGoodsPushTask的loadGoodsPushRedis方法,添加队列操作,代码如下:
//商品数据队列存储,防止高并发超卖
Long[] ids = pushIds(seckillGood.getStockCount(), seckillGood.getId());
redisTemplate.boundListOps("SeckillGoodsCountList_"+seckillGood.getId()).leftPushAll(ids);
//自增计数器
redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillGood.getId(),seckillGood.getStockCount());
2.3 超卖控制
修改多线程下单方法,分别修改数量控制,以及售罄后用户抢单排队信息的清理,
/***
* 多线程下单操作
*/
@Async
public void createOrder(){
//从队列中获取排队信息
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundListOps("SeckillOrderQueue").rightPop();
try {
//从队列中获取一个商品
Object sgood = redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).rightPop();
if(sgood==null){
//清理当前用户的排队信息
clearQueue(seckillStatus);
return;
}
//时间区间
String time = seckillStatus.getTime();
//用户登录名
String username=seckillStatus.getUsername();
//用户抢购商品
Long id = seckillStatus.getGoodsId();
//获取商品数据
SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_" + time).get(id);
//如果有库存,则创建秒杀商品订单
SeckillOrder seckillOrder = new SeckillOrder();
seckillOrder.setId(idWorker.nextId());
seckillOrder.setSeckillId(id);
seckillOrder.setMoney(goods.getCostPrice());
seckillOrder.setUserId(username);
seckillOrder.setCreateTime(new Date());
seckillOrder.setStatus("0");
//将秒杀订单存入到Redis中
redisTemplate.boundHashOps("SeckillOrder").put(username,seckillOrder);
//商品库存-1
Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(id, -1);//商品数量递减
goods.setStockCount(surplusCount.intValue()); //根据计数器统计
//判断当前商品是否还有库存
if(surplusCount<=0){
//并且将商品数据同步到MySQL中
seckillGoodsMapper.updateByPrimaryKeySelective(goods);
//如果没有库存,则清空Redis缓存中该商品
redisTemplate.boundHashOps("SeckillGoods_" + time).delete(id);
}else{
//如果有库存,则直数据重置到Reids中
redisTemplate.boundHashOps("SeckillGoods_" + time).put(id,goods);
}
//抢单成功,更新抢单状态,排队->等待支付
seckillStatus.setStatus(2);
seckillStatus.setOrderId(seckillOrder.getId());
seckillStatus.setMoney(seckillOrder.getMoney().floatValue());
redisTemplate.boundHashOps("UserQueueStatus").put(username,seckillStatus);
} catch (Exception e) {
e.printStackTrace();
}
}
/***
* 清理用户排队信息
* @param seckillStatus
*/
public void clearQueue(SeckillStatus seckillStatus){
//清理排队标示
redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());
//清理抢单标示
redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
}
3. 订单支付
3.1 秒杀支付实现思路
完成秒杀下订单后,进入支付页面,此时前端会每3秒中向后台发送一次请求用于判断当前用户订单是否完成支付,如果完成了支付,则需要清理掉排队信息,并且需要修改订单状态信息。
秒杀支付的流程和之前做的类似,只不过现在秒杀订单的支付状态发送到Queue2中,普通订单还是发送到queue1中,但是我们怎么知道该将订单的支付状态发送给queue1还是queue2呢?如果微信服务器可以将MQ队列的exchange和routingKey返回给我们就好了,这样我们就可以动态地指定要发送的MQ。
3.2 秒杀流程分析
步骤如下:
1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。
3.3 支付回调更新
支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给MQ,指定了对应的队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。
3.3.1 支付回调队列指定
关于指定队列如下:
1.创建支付二维码需要指定队列
2.回调地址回调的时候,获取支付二维码指定的队列,将支付信息发送到指定队列中
在微信支付统一下单API中,有一个附加参数,如下:
attach:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。
我们可以在创建二维码的时候,指定该参数,该参数用于指定回调支付信息的对应队列,每次回调的时候,会获取该参数,然后将回调信息发送到该参数对应的队列去。
3.3.2 支付状态监听
支付状态通过回调地址发送给MQ之后,我们需要在秒杀系统中监听支付信息,如果用户已支付,则修改用户订单状态,如果支付失败,则直接删除订单,回滚库存。
在秒杀工程中创建com.changgou.seckill.consumer.SeckillOrderPayMessageListener,实现监听消息,代码如下:
@Component
@RabbitListener(queues = "${mq.pay.queue.seckillorder}")
public class SeckillOrderPayMessageListener {
/**
* 监听消费消息
* @param message
*/
@RabbitHandler
public void consumeMessage(@Payload String message){
System.out.println(message);
//将消息转换成Map对象
Map<String,String> resultMap = JSON.parseObject(message,Map.class);
System.out.println("监听到的消息:"+resultMap);
}
}
修改SeckillApplication创建对应的队列以及绑定对应交换机。
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@MapperScan(basePackages = {"com.changgou.seckill.dao"})
@EnableScheduling
@EnableAsync
public class SeckillApplication {
public static void main(String[] args) {
SpringApplication.run(SeckillApplication.class,args);
}
@Bean
public IdWorker idWorker(){
return new IdWorker(1,1);
}
@Autowired
private Environment env;
/***
* 创建DirectExchange交换机
* @return
*/
@Bean
public DirectExchange basicExchange(){
return new DirectExchange(env.getProperty("mq.pay.exchange.order"), true,false);
}
/***
* 创建队列
* @return
*/
@Bean(name = "queueOrder")
public Queue queueOrder(){
return new Queue(env.getProperty("mq.pay.queue.order"), true);
}
/***
* 创建秒杀队列
* @return
*/
@Bean(name = "queueSeckillOrder")
public Queue queueSeckillOrder(){
return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true);
}
/****
* 队列绑定到交换机上
* @return
*/
@Bean
public Binding basicBindingOrder(){
return BindingBuilder
.bind(queueOrder())
.to(basicExchange())
.with(env.getProperty("mq.pay.routing.orderkey"));
}
/****
* 队列绑定到交换机上
* @return
*/
@Bean
public Binding basicBindingSeckillOrder(){
return BindingBuilder
.bind(queueSeckillOrder())
.to(basicExchange())
.with(env.getProperty("mq.pay.routing.seckillorderkey"));
}
}
修改application.yml文件,添加如下配置:
#位置支付交换机和队列
mq:
pay:
exchange:
order: exchange.order
seckillorder: exchange.seckillorder
queue:
order: queue.order
seckillorder: queue.seckillorder
routing:
key: queue.order
seckillkey: queue.seckillorder
3.3.3 修改订单状态
监听到支付信息后,根据支付信息判断,如果用户支付成功,则修改订单信息,并将订单入库,删除用户排队信息,如果用户支付失败,则删除订单信息,回滚库存,删除用户排队信息。
3.3.3.1 业务层
修改SeckillOrderService,添加修改订单方法,代码如下
/***
* 更新订单状态
* @param out_trade_no
* @param transaction_id
* @param username
*/
void updatePayStatus(String out_trade_no, String transaction_id,String username);
修改SeckillOrderServiceImpl,添加修改订单方法实现,代码如下:
/***
* 更新订单状态
* @param out_trade_no
* @param transaction_id
* @param username
*/
@Override
public void updatePayStatus(String out_trade_no, String transaction_id,String username) {
//订单数据从Redis数据库查询出来
SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//修改状态
seckillOrder.setStatus("1");
//支付时间
seckillOrder.setPayTime(new Date());
//同步到MySQL中
seckillOrderMapper.insertSelective(seckillOrder);
//清空Redis缓存
redisTemplate.boundHashOps("SeckillOrder").delete(username);
//清空用户排队数据
redisTemplate.boundHashOps("UserQueueCount").delete(username);
//删除抢购状态信息
redisTemplate.boundHashOps("UserQueueStatus").delete(username);
}
3.3.3.2 修改订单对接
修改微信支付状态监听的代码,当用户支付成功后,修改订单状态,也就是调用上面的方法,代码如下:
3.3.4 删除订单回滚库存
如果用户支付失败,我们需要删除用户订单数据,并回滚库存。
3.3.4.1 业务层实现
修改SeckillOrderService,创建一个关闭订单方法,代码如下:
/***
* 关闭订单,回滚库存
*/
void closeOrder(String username);
修改SeckillOrderServiceImpl,创建一个关闭订单实现方法,代码如下:
/***
* 关闭订单,回滚库存
* @param username
*/
@Override
public void closeOrder(String username) {
//将消息转换成SeckillStatus
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps("UserQueueStatus").get(username);
//获取Redis中订单信息
SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//如果Redis中有订单信息,说明用户未支付
if(seckillStatus!=null && seckillOrder!=null){
//删除订单
redisTemplate.boundHashOps("SeckillOrder").delete(username);
//回滚库存
//1)从Redis中获取该商品
SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).get(seckillStatus.getGoodsId());
//2)如果Redis中没有,则从数据库中加载
if(seckillGoods==null){
seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillStatus.getGoodsId());
}
//3)数量+1 (递增数量+1,队列数量+1)
Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillStatus.getGoodsId(), 1);
seckillGoods.setStockCount(surplusCount.intValue());
redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).leftPush(seckillStatus.getGoodsId());
//4)数据同步到Redis中
redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);
//清理排队标示
redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());
//清理抢单标示
redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
}
}
3.3.4.2 调用删除订单
修改SeckillOrderPayMessageListener,在用户支付失败后调用关闭订单方法,代码如下:
//支付失败,删除订单
seckillOrderService.closeOrder(attachMap.get("username"));
4. RabbitMQ延时消息队列
4.1 延时队列介绍
延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]
第二种方式:利用rabbitmq中的插件x-delay-message
4.2 TTL DLX实现延时队列
TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
4.3 DLX延时队列实现
4.3.1 队列创建
创建2个队列,用于接收消息的叫延时队列queue.message.delay,用于转发消息的队列叫queue.message,同时创建一个交换机,
@Configuration
public class QueueConfig {
/** 短信发送队列 */
public static final String QUEUE_MESSAGE = "queue.message";
/** 交换机 */
public static final String DLX_EXCHANGE = "dlx.exchange";
/** 短信发送队列 延迟缓冲(按消息) */
public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay";
/**
* 短信发送队列
* @return
*/
@Bean
public Queue messageQueue() {
return new Queue(QUEUE_MESSAGE, true);
}
/**
* 短信发送队列
* @return
*/
@Bean
public Queue delayMessageQueue() {
return QueueBuilder.durable(QUEUE_MESSAGE_DELAY)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 消息超时进入死信队列,绑定死信队列交换机
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE) // 绑定指定的routing-key
.build();
}
/***
* 创建交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DLX_EXCHANGE);
}
/***
* 交换机与队列绑定
* @param messageQueue
* @param directExchange
* @return
*/
@Bean
public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) {
return BindingBuilder.bind(messageQueue)
.to(directExchange)
.with(QUEUE_MESSAGE);
}
}
4.3.2 消息监听
创建MessageListener用于监听消息,
@Component
@RabbitListener(queues = QueueConfig.QUEUE_MESSAGE)
public class MessageListener {
/***
* 监听消息
* @param msg
*/
@RabbitHandler
public void msg(@Payload Object msg){
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("当前时间:"+dateFormat.format(new Date()));
System.out.println("收到信息:"+msg);
}
}
4.3.3 创建启动类
@SpringBootApplication
@EnableRabbit
public class SpringRabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(SpringRabbitMQApplication.class,args);
}
}
5. 库存回滚
5.1 秒杀流程
1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。
延时队列实现订单关闭回滚库存:
1.创建一个过期队列 Queue1
2.接收消息的队列 Queue2
3.中转交换机
4.监听Queue2
1)SeckillStatus->检查Redis中是否有订单信息
2)如果有订单信息,调用删除订单回滚库存->[需要先关闭微信支付,防止中途用户支付]
3)如果关闭订单时,用于已支付,修改订单状态即可
4)如果关闭订单时,发生了别的错误,记录日志,人工处理