老寇云-java技术栈进阶-武侠篇-redis分布式锁实现抢红包(文章持续调优中)

小伙伴们,你们好呀!我是老寇!废话不多说,跟我一起抢红包

目录

1.运行效果图(b站-地址)

2.技术架构

3.技术选型

4.业务逻辑(重点)

5.分布式锁的设计(个人理解)

6.分布式锁的实现(核心代码)

7.websocket消息推送(核心代码)

1.运行效果图(b站-地址

<iframe allowfullscreen="true" data-mediaembed="bilibili" id="SQMBxci6-1625893173770" src="https://player.bilibili.com/player.html?aid=804078092"></iframe>

redis分布式锁实现抢红包

2.技术架构

基础框架:springboot + springcloud

消息队列:rabbitMQ

数据缓存:redis

消息推送:websocket

3.技术选型

为什么要用redis实现分布式锁的方案,放弃synchronized的方案,采用synchronized在分布式系统无法做到持有的锁是同一把锁(单体应用适用,不适用于微服务应用),因此这个方案被丢弃啦

4.业务逻辑(重点)

* 进阶-> 采用gateway网关限流 -> 令牌桶算法 + redis,限制并发量,请求过多放入队列->rabbitmq
* 1.判断红包是否还有剩余的(套路:先在redis里面读,redis没有数据,然后去mysql查询放到redis)
* 2.判断该用户是否已经抢了红包
* 3.获取红包金额,并剩余红包数和红包金额写入redis
* 4.保存抢红包的记录(记录放入mysql)
* 5.响应给前端(调用消息服务,用websocket推送消息到前端->用户点进来的时候就必须初始化连接)

5.分布式锁的设计(个人理解)

在分布式系统中,会遇到服务宕机的情况,服务宕机有很多情况,比如说内存不足,流量暴增等等都会让服务宕机,因此设计分布式锁就要考虑两点:

1.原子性:理解为一个线程持有锁的时候,其他线程只能等待,当该锁释放时,其他线程才有去抢占该锁的机会,最终只会有一个线程持有该锁,我举个例子:大家都有乘坐电梯或摩天轮的经历,排除满员的情况,在同一时刻,按下电梯按钮,每个人做到电梯的概率是相同的,停到某一层,就是说该人可以做电梯,也就该层的人都持有这个电梯。这一栋的楼层可以看做一个个的线程,电梯大致可以看成锁,特别是吃饭或下班的时候,都想快点做到电梯,这个也是一个高并发的场景。

2.服务宕机:服务宕机导致锁无法释放,然后就造成了死锁的情况,设置过期时间,服务重启后,会自动清除过期的数据

6.分布式锁的实现(核心代码)

基于原子性设计,因此就用lua脚本语句,获取锁,不要忘了释放锁(大牛二虎上代码)

    private static final RedisScript<String> SCRIPT_LOCK = new DefaultRedisScript<>(
            //当这个key不存在,则设置一个值,并设置过期时间(px表示毫秒)
            "return redis.call('set',KEYS[1],ARGV[1],'NX','PX',ARGV[2])",String.class
    );
    private static final RedisScript<String> SCRIPT_UNLOCK = new DefaultRedisScript<>(
            //判断有这个key没有返回'false',有则删除 并返回'true'
            "if redis.call('get',KEYS[1]) == ARGV[1] then return tostring(redis.call('del',KEYS[1]) == 1) else return 'false' end",String.class
    );
    private static final String LOCK_SUCCESS = "OK";

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public boolean acquireLock(String lockKey, String lockValue) {
        Object lockResult = redisTemplate.execute(SCRIPT_LOCK,
                redisTemplate.getStringSerializer(),
                redisTemplate.getStringSerializer(),
                Collections.singletonList(lockKey),
                lockValue);
        return LOCK_SUCCESS.equals(lockResult);
    }

    @Override
    public boolean releaseLock(String lockKey,String lockValue) {
        Object releaseResult = redisTemplate.execute(SCRIPT_UNLOCK,
                redisTemplate.getStringSerializer(),
                redisTemplate.getStringSerializer(),
                Collections.singletonList(lockKey),
                lockValue);
        return Boolean.valueOf(releaseResult.toString());
    }

7.websocket消息推送(核心代码)

package io.laokou.chat.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 寇申海
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
package io.laokou.chat.websocket;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.validation.constraints.NotNull;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author 寇申海
 */
@Component
@Data
@Slf4j
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {

    /**
     * 静态变量,用来记录当前在线连接数。设计成线程安全
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的websocketserver对象
     */
    private static CopyOnWriteArraySet<WebSocketServer> webSocketServerCopyOnWriteArraySet = new CopyOnWriteArraySet<>();
    /**
     * 与某些客户端的连接会话,需要通过它来给客户打发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private Long userId;

    /**
     * 连接成功后回调方法
     * @param session
     * @param userId
     * @throws IOException
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId")Long userId) throws IOException {
        this.session = session;
        //先设置在添加
        this.userId = userId;
        boolean addFlag = webSocketServerCopyOnWriteArraySet.add(this);
        if (addFlag) {
            addOnlineCount();
        }
        log.info("新加入:{}",userId,",在线人数:{}",getOnlineCount());
    }

    /**
     * 连接关闭调用
     * @throws IOException
     */
    @OnClose
    public void onClose() throws IOException {
        boolean removeFlag = webSocketServerCopyOnWriteArraySet.remove(this);
        if (removeFlag) {
            subOnlineCount();
        }
         log.info("当前在线人数:{}",getOnlineCount());
    }

    /**
     * 收到客户端消息后调用
     * @param message
     * @param session
     * @throws IOException
     */
    @OnMessage
    public void onMessage(String message,Session session) throws IOException {
        log.info("收到来自:{}",this.userId,"的消息:{}",message);
        for (WebSocketServer webSocketServer:webSocketServerCopyOnWriteArraySet) {
                log.info("在线用户:{}" , webSocketServer.userId);
                webSocketServer.SendMessage(message);
        }
    }

    /**
     * 发生错误时调用
     * @param session
     * @param throwable
     */
    @OnError
    public void one rror(Session session, @NotNull Throwable throwable){
        log.error("发生错误:{}",throwable.getMessage());
        throwable.printStackTrace();
    }

    /**
     * 发送消息
     * @param message
     * @throws IOException
     */
   private void SendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
   }

    /**
     * 发送自定义消息
     * @param message
     * @param userId
     * @throws IOException
     */
    public   void  sendMessages(String message,Long userId)throws IOException{
        for (WebSocketServer webSocketServer:webSocketServerCopyOnWriteArraySet){
                if (userId == null) {
                    log.info("推送消息给:{}" , webSocketServer.userId + ",推送内容:{}" , message);
                    webSocketServer.SendMessage(message);
                } else if (userId.equals(webSocketServer.userId)) {
                    log.info("推送消息给:{}" , webSocketServer.userId + ",推送内容:{}" , message);
                    webSocketServer.SendMessage(message);
                }
        }
    }

    /**
     * 返回在线数
     * @return
     */
    private static synchronized int getOnlineCount(){
        return onlineCount;
    }

    /**
     * 连接人数增加时
     */
    private static synchronized void addOnlineCount(){
        WebSocketServer.onlineCount++;
    }

    /**
     * 连接人数减少时
     */
    private static synchronized void subOnlineCount(){
        WebSocketServer.onlineCount--;
    }

}

如果这都还没理解,那就一键三连,深夜扣我私聊 (咳咳,最近加班严重 那必须得深夜,安排)

上一篇:JavaIO编程


下一篇:RabbitMQ基础理解