利用Redisson实现分布式延时任务调度功能

 定时任务

定时任务是在编码世界中经常遇到的问题,比如定时备份数据库、定时刷新缓存等,可以通过Linux定时任务完成,也可以通过框架如Spring完成,但是在分布式场景中传统单机可以完成功能就不太行了,所以需要借助其他工具来实现任务调度的功能

 场景:在一些订单场景中,用户下单后会锁定一些资源,然后用户非正常退出(没有触发取消订单操作),导致订单资源占用无法释放的问题。

借助工具:redisson分布式服务中的分布式调度任务服务(Scheduler Service)

代码

关单任务

定时执行具体任务,主要实现关单,释放相关资源(优惠券等),设置相关状态标志位

注意:Runnable、Callable接口二选一,必须实现序列化字段,因为任务最终要被序列化存储在Redis中

@Slf4j
@Data
public class CloseOrderTask implements Runnable, Serializable {
    private static final long serialVersionUID = -8193920383968460660L;

    /**
     * 订单id
     */
    private String orderId;

    @Override
    @SneakyThrows
    public void run() {
        // 获取SpringBean,因为此对象不能为Spring所管理,所以需要通过工具获取SpringBean
        RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
        OrderService orderService = SpringUtils.getBean(OrderService.class);
        // 任务必须加锁,因为同一任务可能被多个实例所执行
        RLock lock = redissonClient.getLock(ProductProperties.ORDER_TASK_LOCK + orderId);
        boolean lockFlag = false;
        try {
            //尝试获取锁
            lockFlag = lock.tryLock(0L, TimeUnit.SECONDS);
            if (!lockFlag){ return; }
            //获取到锁,正常执行关单操作
            log.info("get lock order:{} ", orderId);
            OrderDTO order = orderService.getOrder(orderId);
            if(orderDetails.getStatus() == OrderStatus.NEW){
                // 自动关单操作
                orderService.autoCloseOrder(orderDetails);
            }
        }catch (Exception e){
            //TODO 异常情况应添加邮件通知
        }finally {
            if(lockFlag) {
                lock.unlock();
            }
        }

    }
}

关单订单任务调度器

注册一个任务调度器的Bean

注意:Bean的destory方法必须重写,否则在进行关闭Spring容器时,任务调度中心会被关闭,再次启动后不会唤醒

    /**
     * 关单定时任务
     */
    @Bean(destroyMethod = "")
    public RScheduledExecutorService rScheduledExecutorService(@Autowired RedissonClient redissonClient){
        WorkerOptions workerOptions = WorkerOptions.defaults().workers(CPU_NUM + 1);
        ExecutorOptions executorOptions = ExecutorOptions.defaults()
                .taskRetryInterval(10 * 60, TimeUnit.SECONDS);
        RScheduledExecutorService executorService = redissonClient
                .getExecutorService(ProductProperties.CLOSE_ORDER_TASK_EXECUTOR, executorOptions);
        executorService.registerWorkers(workerOptions);
        return executorService;
    }

服务层

例子实现了两个方法,开启一个定时任务和取消一个定时任务。

因为定时任务的取消时通过taskId取消的,所以在提交任务或获取taskId,并对orderId和taskId做了一下映射,在取消订单的时候就比较容易了

@Slf4j
@Service
public class CloseOrderServiceImpl implements CloseOrderService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RScheduledExecutorService executorService;

    @Override
    public void submitScheduleCloseTask(String orderId) {
        CloseOrderTask closeOrderTask = new CloseOrderTask();
        closeOrderTask.setOrderId(orderId);
        RScheduledFuture<?> schedule = executorService.schedule(closeOrderTask, ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
        redisTemplate.opsForValue().set(ProductProperties.ORDER_TASK_MAPPING + orderId, schedule.getTaskId(), ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
        log.info("submit automatic close task. orderId: {}, taskId : {}", orderId, schedule.getTaskId());
    }

    @Override
    public void cancelScheduleCloseTask(String orderId) {
        String taskId = (String) redisTemplate.opsForValue().get(ProductProperties.ORDER_TASK_MAPPING + orderId);
        log.info("cancel automatic close task. orderId: {}, taskId : {}", orderId, taskId);
        if (!StringUtils.isEmptyStr(taskId)){
            executorService.cancelTask(taskId);
        }

    }
}

具体使用地点

在用户进行下单操作时可以提交一个定时任务

在用户取消订单时可以取消orderId对应的定时任务

原理分析

数据结构

提交一个任务后,Redis中会添加4个键值对

利用Redisson实现分布式延时任务调度功能

  • {任务队列名:包路径}:counter

    • string类型

    • 记录当前有多少任务待执行

  • {任务队列名:包路径}:shceduler

    • zset类型

    • 通过score排名,任务队列。通过value关联tasks具体任务

  • {任务队列名:包路径}:retry-interval

    • string类型

    • 任务重试时间,默认5000

  • {任务队列名:包路径}:tasks

    • Hash类型

    • 所有具体任务,通过key与shceduler关联执行顺序

    • value序列化的任务

执行流程

提交一个延时任务调度任务,会在:scheduler中产生两条数据,分别是任务下一次执行时间和任务下一次执行时间+重试时间

Spring在注册ExecutorService时指定了工人(worker)的数量,会在本地起线程来执行这些待执行的任务。

问题探究

1、任务过期,客户端挂了,然后过一度时间重启,任务是否还会执行。

客户端重启后,过期的任务都会被拿到客户端里面进行消费。

存在同时启动多个客户端,是否会发生任务抢占问题,内部是否有锁机制?答案是会的

2、同一个任务是否会被多个客户端执行

 通过两台服务器,每台提交10个任务,多轮测试没有发生同一个任务被多次执行的情况。

加上线程执行sleep后同样进行测试,发现同一任务被不同客户端消费,所以需要有锁机制

利用Redisson实现分布式延时任务调度功能

3、任务(schedule 单次)执行完毕后,Redis是否会删除任务队列中的任务

任务执行完毕后,会删除tasks、scheduler中的任务,同时counter-1

4、任务队列的过期策略

所有任务的过期时间都是-1(永不过期),保证任务不会丢

5、任务执行异常情况

能够正常捕捉到异常,并进行处理,同时任务会从Redis中删除

上一篇:flowable实战(六)flowable的意见表和附件表应用


下一篇:Maven基础知识