我的第一个开源项目:基于Redis的延迟队列

redis-delay

背景

我们先看看以下业务场景:

当订单一直处于未支付状态时,如何在半小时后自动取消?

如何定期检查处于退款状态的订单是否已经退款成功?

实现方案

  1. 定时任务扫表

为了解决以上问题,最简单直接的办法就是定时去扫表。每个业务都要维护一个自己的扫表逻辑。

  • 优点:简单
  • 缺点:每分钟全局扫表,浪费资源,损耗数据库性能
  1. 消息中间件

使用消息中间件可以实现延迟消息,像RocketMQ就自带这个功能

  • 优点:成熟稳定,使用方便
  • 缺点:针对那些项目里没有使用消息中间件的,需要额外部署,成本大。

3.基于Redis

根据Redis的zset、list的特性,我们可以利用Redis来实现一个延迟队列。

  • 优点:性能高,容易实现
  • 缺点:消息持久化依赖于redis

方案参考有赞延迟队列:https://tech.youzan.com/queuing_delay/

软件架构

我的第一个开源项目:基于Redis的延迟队列

消息流程:

  • 用户对某个商品下单,系统创建订单成功,同时往延迟队列里put一个job。
  • 延迟队列收到该job后,先往job pool中存入job信息,然后根据delay计算出绝对执行时间,放入delay bucket.
  • 如果不是延迟job,直接放入ready set中。
  • DelayMoveToReadyTimer定时器循环取出delay中延迟时间达到的数据,放入ready set里
  • ReadyQueueTimer定时器循环取出消息消费

接入流程

在项目里的redis-delay-test测试项目中有完整的接入示例

大概流程如下:

  1. 引入pom
        <dependency>
            <groupId>com.mmc</groupId>
            <artifactId>redis-delay-core</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
  1. 注入两个bean
@Configuration
public class DelayConfig {

    @Autowired
    private RedisTemplate redisTemplate;

    @Bean
    public DelayFacade delayFacade(){
        return new DelayFacadeImpl(redisTemplate);
    }

    @Bean
    public RedisDelayService redisDelayService(){
        return new RedisDelayServiceImpl(delayFacade());
    }

}
  1. 建议创建一个topic枚举类
public enum  TopicEnum {

    ORDER,
    LOG;
}
  1. 继承AbstractTopicRegister,实现不同的topic的消息消费的业务方法
@Component
public class OrderTopicRegister extends AbstractTopicRegister {

    @Override
    public String getTopic() {
        return ORDER.name();
    }

    @Override
    public boolean handle(Job job) {
        System.out.println("处理订单:"+ JSON.toJSONString(job));
        return true;
    }


}
  1. 使用api

目前提供三个api:

  • asyncPutMessage 异步放入消息
  • syncPutMessage 同步放入消息
  • queryAllMessage 查询所有消息
 @RequestMapping("/testDelay")
    public String testDelay(Long id, @RequestParam(required = false) Long delayTime, String topic){
        JobParam jobParam=new JobParam();
        jobParam.setBody("");
        jobParam.setJobId(id);
        jobParam.setTopic(topic);
        jobParam.setDelayTime(delayTime==null?0:delayTime);
        jobParam.setRetries(5);
        redisDelayService.syncPutMessage(jobParam);
        return "success";
    }

项目地址

https://gitee.com/mmcLine/redis-delay/

只完成了第一版,需要优化的地方还很多。欢迎大家一起来完善。

我的第一个开源项目:基于Redis的延迟队列

上一篇:[IOI2021D2T1] dna 简要题解


下一篇:linux 系统如何删除特定的行