小伙伴们,你们好呀!我是老寇!废话不多说,跟我一起抢红包
目录
1.运行效果图(b站-地址)
<iframe allowfullscreen="true" data-mediaembed="bilibili" id="SQMBxci6-1625893173770" src="https://player.bilibili.com/player.html?aid=804078092"></iframe>redis分布式锁实现抢红包
2.技术架构
基础框架:springboot + springcloud
消息队列:rabbitMQ
数据缓存:redis
消息推送:websocket
3.技术选型
为什么要用redis实现分布式锁的方案,放弃synchronized的方案,采用synchronized在分布式系统无法做到持有的锁是同一把锁(单体应用适用,不适用于微服务应用),因此这个方案被丢弃啦
4.业务逻辑(重点)
* 进阶-> 采用gateway网关限流 -> 令牌桶算法 + redis,限制并发量,请求过多放入队列->rabbitmq
* 1.判断红包是否还有剩余的(套路:先在redis里面读,redis没有数据,然后去mysql查询放到redis)
* 2.判断该用户是否已经抢了红包
* 3.获取红包金额,并剩余红包数和红包金额写入redis
* 4.保存抢红包的记录(记录放入mysql)
* 5.响应给前端(调用消息服务,用websocket推送消息到前端->用户点进来的时候就必须初始化连接)
5.分布式锁的设计(个人理解)
在分布式系统中,会遇到服务宕机的情况,服务宕机有很多情况,比如说内存不足,流量暴增等等都会让服务宕机,因此设计分布式锁就要考虑两点:
1.原子性:理解为一个线程持有锁的时候,其他线程只能等待,当该锁释放时,其他线程才有去抢占该锁的机会,最终只会有一个线程持有该锁,我举个例子:大家都有乘坐电梯或摩天轮的经历,排除满员的情况,在同一时刻,按下电梯按钮,每个人做到电梯的概率是相同的,停到某一层,就是说该人可以做电梯,也就该层的人都持有这个电梯。这一栋的楼层可以看做一个个的线程,电梯大致可以看成锁,特别是吃饭或下班的时候,都想快点做到电梯,这个也是一个高并发的场景。
2.服务宕机:服务宕机导致锁无法释放,然后就造成了死锁的情况,设置过期时间,服务重启后,会自动清除过期的数据
6.分布式锁的实现(核心代码)
基于原子性设计,因此就用lua脚本语句,获取锁,不要忘了释放锁(大牛二虎上代码)
private static final RedisScript<String> SCRIPT_LOCK = new DefaultRedisScript<>(
//当这个key不存在,则设置一个值,并设置过期时间(px表示毫秒)
"return redis.call('set',KEYS[1],ARGV[1],'NX','PX',ARGV[2])",String.class
);
private static final RedisScript<String> SCRIPT_UNLOCK = new DefaultRedisScript<>(
//判断有这个key没有返回'false',有则删除 并返回'true'
"if redis.call('get',KEYS[1]) == ARGV[1] then return tostring(redis.call('del',KEYS[1]) == 1) else return 'false' end",String.class
);
private static final String LOCK_SUCCESS = "OK";
@Autowired
private RedisTemplate redisTemplate;
@Override
public boolean acquireLock(String lockKey, String lockValue) {
Object lockResult = redisTemplate.execute(SCRIPT_LOCK,
redisTemplate.getStringSerializer(),
redisTemplate.getStringSerializer(),
Collections.singletonList(lockKey),
lockValue);
return LOCK_SUCCESS.equals(lockResult);
}
@Override
public boolean releaseLock(String lockKey,String lockValue) {
Object releaseResult = redisTemplate.execute(SCRIPT_UNLOCK,
redisTemplate.getStringSerializer(),
redisTemplate.getStringSerializer(),
Collections.singletonList(lockKey),
lockValue);
return Boolean.valueOf(releaseResult.toString());
}
7.websocket消息推送(核心代码)
package io.laokou.chat.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author 寇申海
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
package io.laokou.chat.websocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.validation.constraints.NotNull;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author 寇申海
*/
@Component
@Data
@Slf4j
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。设计成线程安全
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的websocketserver对象
*/
private static CopyOnWriteArraySet<WebSocketServer> webSocketServerCopyOnWriteArraySet = new CopyOnWriteArraySet<>();
/**
* 与某些客户端的连接会话,需要通过它来给客户打发送数据
*/
private Session session;
/**
* 接收userId
*/
private Long userId;
/**
* 连接成功后回调方法
* @param session
* @param userId
* @throws IOException
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId")Long userId) throws IOException {
this.session = session;
//先设置在添加
this.userId = userId;
boolean addFlag = webSocketServerCopyOnWriteArraySet.add(this);
if (addFlag) {
addOnlineCount();
}
log.info("新加入:{}",userId,",在线人数:{}",getOnlineCount());
}
/**
* 连接关闭调用
* @throws IOException
*/
@OnClose
public void onClose() throws IOException {
boolean removeFlag = webSocketServerCopyOnWriteArraySet.remove(this);
if (removeFlag) {
subOnlineCount();
}
log.info("当前在线人数:{}",getOnlineCount());
}
/**
* 收到客户端消息后调用
* @param message
* @param session
* @throws IOException
*/
@OnMessage
public void onMessage(String message,Session session) throws IOException {
log.info("收到来自:{}",this.userId,"的消息:{}",message);
for (WebSocketServer webSocketServer:webSocketServerCopyOnWriteArraySet) {
log.info("在线用户:{}" , webSocketServer.userId);
webSocketServer.SendMessage(message);
}
}
/**
* 发生错误时调用
* @param session
* @param throwable
*/
@OnError
public void one rror(Session session, @NotNull Throwable throwable){
log.error("发生错误:{}",throwable.getMessage());
throwable.printStackTrace();
}
/**
* 发送消息
* @param message
* @throws IOException
*/
private void SendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
* @param message
* @param userId
* @throws IOException
*/
public void sendMessages(String message,Long userId)throws IOException{
for (WebSocketServer webSocketServer:webSocketServerCopyOnWriteArraySet){
if (userId == null) {
log.info("推送消息给:{}" , webSocketServer.userId + ",推送内容:{}" , message);
webSocketServer.SendMessage(message);
} else if (userId.equals(webSocketServer.userId)) {
log.info("推送消息给:{}" , webSocketServer.userId + ",推送内容:{}" , message);
webSocketServer.SendMessage(message);
}
}
}
/**
* 返回在线数
* @return
*/
private static synchronized int getOnlineCount(){
return onlineCount;
}
/**
* 连接人数增加时
*/
private static synchronized void addOnlineCount(){
WebSocketServer.onlineCount++;
}
/**
* 连接人数减少时
*/
private static synchronized void subOnlineCount(){
WebSocketServer.onlineCount--;
}
}
如果这都还没理解,那就一键三连,深夜扣我私聊 (咳咳,最近加班严重 那必须得深夜,安排)