使用redis的zset实现简单的延时队列

一、需求切入点
   在公司做的一个系统业务需要有个定时提醒的功能(数据在mysql中),要求提醒的时间差精准到分钟
解决方案有:

  1. 使用定时器,每分钟执行一次,查符合提醒的数据,发起提醒(数据库连接与系统的负载都承受不住的!!)
  2. 将待提醒数据提前查出存进redis中,根据提醒时间设置过期时间,做redis的过期监听,监听到过期的数据再做业务处理(优点 : 不用实时查数据库,一定程度上减少系统压力 缺点: 一旦系统重启或者系统出现异常,可能导致一些过期的数据没有监听到,造成数据没有推送)
  3. 使用一个延时队列,利用redis的zset(sort set,有序不重复集合,关联分数score进行排序),将提醒时间作为分数,提取符合条件的score对应的集合发起提醒(本文所述也是围绕这个方案)
    二、延时队列的基本操作流程
    1. 基本流程图
      使用redis的zset实现简单的延时队列
    2. 代码实现
      生产者,只关心数据进队列
public class MessageProvider {
    // 延时队列的服务 通过这个bean来统一管理数据
    private final DelayingQueueService delayingQueueService;

    private static String APPOINTMENT_REMIND = "APPOINTMENT_REMIND";

    /**
     * 往队列中添加消息
     * @param messageContent
     */
    public void sendMessage(String messageContent, long delay) {
        try {
            //业务代码 ……
              //  将分装好的数据写进队列
                delayingQueueService.push(queueMessage);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 撤回消息 业务延伸
     * @param members
     */
    public  void withdrawMessage(Long members){
        delayingQueueService.removeByMembersId(members);
    }


}

消费者,只关心需要消费的数据

// 从延伸队列拉取符合消费的数据
 List<QueueMessage> msgList = delayingQueueService.pull();

   ```
msgList.stream().forEach(msg -> {
                        // 拿出已经到期的预约提示 发起提醒
                        if (current >= msg.getDelayTime()) {
                            try {
                            //  进行业务消费  ……   
                            //成功消费后移除消息
                            delayingQueueService.remove(msg);
                        }
```
                

延时队列实现

public class DelayingQueueService {

    private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
    //key:membersId value:message
    private static ConcurrentHashMap <Long,String> membersMap= new ConcurrentHashMap<>();

    private final StringRedisTemplate redisTemplate;

    /**
     * 可以不同业务用不同的key
     */
    public static final String QUEUE_NAME = "message:queue";
    /**
     * 锁key
     */
    public static final String LOCK_KEY="message_lock_key";


    /**
     * 插入消息
     *
     * @param queueMessage
     * @return
     */
    @SneakyThrows
    public Boolean push(QueueMessage queueMessage) {
 
        String messageStr = mapper.writeValueAsString(queueMessage);
        Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, messageStr, queueMessage.getDelayTime());
        membersMap.put(membersId,messageStr);
        return addFlag;
    }

    /**
     * 移除消息
     *
     * @param queueMessage
     * @return
     */
    @SneakyThrows
    public Boolean remove(QueueMessage queueMessage) {
  
        Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(queueMessage));
        if(remove>0){
            membersMap.remove(membersId);
        }
        return remove > 0 ? true : false;
    }




    /**
     * 拉取最新需要
     * 被消费的消息
     * rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
     *
     * @return
     */
    public List<QueueMessage> pull() {

        List<QueueMessage> msgList  =new ArrayList<>();
            try {
                Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
                if (strings == null) {
                    return null;
                }
                msgList = strings.stream().map(msg -> {
                    QueueMessage message = null;
                    try {
                        message = mapper.readValue(msg, QueueMessage.class);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    return message;
                }).collect(Collectors.toList());
            } catch (Exception e) {
                log.error(e.toString());
            }
        return msgList;

    }

    //获得锁
    public Boolean getLock(){
         boolean lock = false;
        //获得锁
        lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS);
        return lock;
    }

    public void releaseLock(){
        redisTemplate.delete(LOCK_KEY);
    }


}

三、结束语

本文所述的方法也是存在一些小的缺点,比如,数据的正常操作依赖于第三方组件,如果redis挂掉了,这个服务就down掉了,实现延时队列的方法有很多种,基于业务与系统本身的情况,兼容利弊去做一些取舍,以达到最好的效果

上一篇:redis复习笔记(二)


下一篇:【Redis】——常用五大数据类型之Zset