基于smart-retry源码改造升级

前瞻

阅读本篇前,请先阅读smart-retry源码阅读文章

背景

smart-retry有以下缺点

  • 只支持入参有且仅有一个
  • 每一个重试方法都对应一个定时任务,会造成线程的过度使用
  • 不支持抛出指定异常后重试

基于此,对smart-retry做了升级改造

改造后特性

针对smart-retry的确定的改进

  • 支持多个入参方法重试
  • 只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加
  • 支持抛出指定异常后重试


新增的功能

  • 支持配置在注解上配置是否在执行方法前入库
  • 增加了重试记录表,把每次重试都记录下来
  • 重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

代码跟进改造点

下边会讲述代码如何改造

支持多个入参方法重试

改造前入参,以及参数类型都是单个,改成数组,我列举这两种的前后对比,因为改的地方比较多,我不一一列举

改造前入参

    public Object handle(Object arg) {

改造后入参

 public Object handle(Object arg[]) {

改造前入参类型(用于序列化和反序列化)

    public Class<?> getInputArgsType() {

        return inputArgsType;

    }

改造后入参类型(用于序列化和反序列化)

    public Class<?>[] getInputArgsType() {

        return inputArgsType;

    }

只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加

我们先看原来的代码

    @Override

    public void afterSingletonsInstantiated() {

        postedClasseCache.clear();


        this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

        this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


        boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

        this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

        if (this.retrySerializer == null) {

            this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

        } else {

            this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

        }


        retryHandlers.forEach(this::registerJobBean);


        retryHandlers.clear();

    }

    protected void registerJobBean(RetryHandler retryHandler) {

        if (retryHandler.identity().length() > 50) {

            throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

        }


        RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

        RetryHandlerRegistration.registry(retryHandlerProxy);


        RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


        retryRegistry.register(retryHandler, retryProcessor);

    }

    @Override

    public void register(RetryHandler retryHandler, RetryProcessor retryProcessor) {

        if (StringUtils.isBlank(retryHandler.cron())) {

            throw new IllegalArgumentException("identity=" + retryHandler.identity() + ", 使用Elastic-Job注册器,必须指定RetryHandler/RetryFunction的cron表达式");

        }

        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);

        beanDefinitionBuilder.addConstructorArgValue(new RetryJob(retryProcessor));

        beanDefinitionBuilder.addConstructorArgValue(registryCenter);

        beanDefinitionBuilder.addConstructorArgValue(createLiteJobConfiguration(retryHandler));

        if (jobEventConfiguration != null) {

            beanDefinitionBuilder.addConstructorArgValue(jobEventConfiguration);

        }

        beanDefinitionBuilder.addConstructorArgValue(getElasticJobListeners());

        beanDefinitionBuilder.setInitMethodName("init");


        String jobBeanName = getJobBeanName(retryHandler);

        defaultListableBeanFactory.registerBeanDefinition(jobBeanName, beanDefinitionBuilder.getBeanDefinition());


        //此处的getBean调用是为了手工触发Bean的初始化

        defaultListableBeanFactory.getBean(jobBeanName);

        log.info("identity={}已成功注册到Elastic-Job", retryHandler.identity());

    }

可以看到,每一个retryHandler会单独注册一个定时任务,并且注册的时候需要指定注册到哪个定时任务

再看改造后代码,先拿到所有retryHandler,封装为可执行的RetryFunctionProcessor

    @Override

    public void afterSingletonsInstantiated() {

        postedClasseCache.clear();


        this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

        RetryFunctionProcessor retryFunctionProcessor = defaultListableBeanFactory.getBean(RetryFunctionProcessor.class);


        this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);


        List<RetryProcessor> retryProcessorList = new ArrayList<>();

        for (RetryHandler retryHandler : retryHandlers) {

            if (retryHandler.identity().length() > 50) {

                throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

            }


            RetryHandler retryHandlerProxy =new ImmediatelyRetryHandler(retryHandler, new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, retryRule);

            RetryHandlerRegistration.registry(retryHandlerProxy);


            RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer, retryRule);

            retryProcessorList.add(retryProcessor);

        }

        

        retryFunctionProcessor.register(retryProcessorList);


        retryHandlers.clear();

    }

RetryFunctionProcessor其实就是把retryProcessorList保存为成员变量,提供给用户去调用,调用方式将在如何使用篇章讲解

支持抛出特定异常后重试

smart-retry只能在抛出RuntimeException的时候,进行重试,改造后,可以支持抛出特定异常后重试,这样当自定义异常时,更加灵活,代码改造点如下

RetryFunction注解增加变量

    /**

     * 如果方法抛出该异常则会创建重试任务

     */

    Class<? extends RuntimeException>[] retryException() default {RuntimeException.class};

在执行方法抛出异常时,判断是否在配置的这些异常范围,来决定是否要进行重试

        } catch (RuntimeException e) {

            boolean isIncludeException = RetryHandlerUtils.isIncludeException(e, retryException);

            if (!isIncludeException) {

                throw e;

            }

支持配置在注解上配置是否在执行方法前入库

smart-retry是支持是否在执行方法前入库的,但是是全局的配置,改造后可以对每一个任务单独配置

    /**

     * 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

     * @return

     */

    boolean beforeTask() default false;

具体逻辑就比较简单了,就是根据配置,来决定是否在抛出异常前插入数据

增加了重试记录表,把每次重试都记录下来

smart-retry只有一张重试表,比如重试过5次,没办法知道这5次重试的具体详情,只能知道最后一次是成功还是失败等,所以增加了重试记录表,来记录每一次重试的记录,利用了spring的切面的原理,对原代码没有任何侵入

@Aspect

@Slf4j

public class RetryTaskRecordAspect implements ApplicationContextAware {


    private ApplicationContext applicationContext;

    

    @Pointcut("execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.insert(..)) ||" +

            "execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.update(..))")

    public void pointCut(){

        

    }

    

    @Around(value = "pointCut()")

    public Object insertTaskRecord(ProceedingJoinPoint proceedingJoinPoint) {

        Object o = null;

        try {

            o = proceedingJoinPoint.proceed();

        } catch (Throwable throwable) {

            log.error("切面insertTaskRecord报错,", throwable);

        }


        Object[] args = proceedingJoinPoint.getArgs();

        RetryTask retryTask = (RetryTask) args[0];

        RetryTaskMapper retryTaskMapper = applicationContext.getBean(RetryTaskMapper.class);

        retryTaskMapper.insertTaskRecord(retryTask);

        return o;

    }


    @Override

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        this.applicationContext = applicationContext;   

    }

}

重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

改造前:smart-retry原来的重试规则,是为每一条重试任务配置一个cron表达式

改造后:重试规则在application.properties中全局配置,更方便管理,cron表达式改为按时间,或者间隔,更容易配置,不易配置错

    /**

     * 重试规则,有两种方式,指定时间或者间隔

     * 1.时间,格式为:onTime-时间-最大重试次数,例如  onTime-23:59:59-3,onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开

     * 2.间隔,格式为:onInterval-间隔重试规则,例如 onInterval-10,20,30,onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

     */

    private String retryRule;

指定时间重试的实现逻辑:获取到下次的执行时间,如果执行时间小于当前时间,则需要加一天,否则,就是该时间

指定间隔重试的实现逻辑:现在的时间,加上重试次数对应的间隔时间即可

代码如下

    /**

     * 获取下次执行时间

     * @param retryRule

     * @param retryCount

     * @return

     */

    public static LocalDateTime getNextExecTime(RetryRule retryRule, int retryCount) {

        

        if (retryRule instanceof OnTimeRetryRule) {

            OnTimeRetryRule onTimeRetryRule = (OnTimeRetryRule) retryRule;


            LocalTime retryTime = onTimeRetryRule.getRetryTime();


            LocalDateTime nextExecTime = LocalDateTime.of(LocalDate.now(), retryTime);

            

            if (nextExecTime.isAfter(LocalDateTime.now())) {

                return nextExecTime;

            } else {

                return nextExecTime.plusDays(1L);

            }

        }


        OnIntervalRetryRule onIntervalRetryRule = (OnIntervalRetryRule) retryRule;


        Long[] retryInterval = onIntervalRetryRule.getRetryInterval();

        

        return plusSeconds(retryInterval[retryCount]);

    }

如何使用


建表

接口重推依赖两张表重试表以及重试记录表

重试表sql

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime,

next_date DATETIME COMMENT '下次执行的时间') ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

重试记录表sql

CREATE TABLE `sys_retry_task_record`

(

    `id`            BIGINT(20)    NOT NULL AUTO_INCREMENT,

    `task_id`       BIGINT(20)    NULL     DEFAULT NULL COMMENT '重试任务id',

    `identity_name` VARCHAR(50)   NOT NULL COMMENT '任务的唯一标识' COLLATE 'utf8_general_ci',

    `params`        TEXT          NULL     DEFAULT NULL COMMENT '参数' COLLATE 'utf8_general_ci',

    `status`        TINYINT(4)    NOT NULL COMMENT '状态。1: 处理中,2: 成功,3: 失败',

    `retry_count`   INT(11)       NOT NULL DEFAULT '0' COMMENT '重试次数',

    `remark`        VARCHAR(1000) NULL     DEFAULT NULL COMMENT '备注' COLLATE 'utf8_general_ci',

    `next_date`     DATETIME      NULL     DEFAULT NULL COMMENT '下次执行的时间',

    `edit_date`     DATETIME      NULL     DEFAULT NULL,

    `create_date`   DATETIME      NOT NULL,

    PRIMARY KEY (`id`) USING BTREE,

    INDEX `idx_identityname_status` (`identity_name`, `status`) USING BTREE

)

    COMMENT ='系统重试记录表'

    COLLATE = 'utf8_general_ci'

    ENGINE = InnoDB


引入依赖

        <dependency>

            <groupId>com.aliyun.gts.bpaas</groupId>

            <artifactId>aliyun-gts-retry-starter</artifactId>

            <version>1.0.0-SNAPSHOT</version>

        </dependency>

application.properties配置

# 数据源配置,注意要和建表的数据源和数据库在同一个

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=utf8&autoReconnect=true&useSSL=false

spring.datasource.username=root

spring.datasource.password=123456


# 重试开关,默认为false

gts.retry.enable=true

# 重试的规则

gts.retry.retry-rule=onInterval-10,30,30,30

# 序列化方式,支持fastjson,jackson以及gson,默认为fastjson

gts.retry.serialize-type=fastjson

下边对重试规则配置gts.retry.retry-rule进行详细说明

重试规则,有两种方式,指定时间或者间隔
1. 时间,格式为:onTime-时间-最大重试次数,例如(onTime-23:59:59-3),onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开
2. 间隔,格式为:onInterval-间隔重试规则,例如 (onInterval-10,20,30),onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

编程界面

接口重推需要的编程界面有两处,第一处是在方法上打上@RetryFunction注解,第二处是在定时任务中调用处理定时任务的方法

在方法上打上@RetryFunction注解

    @RetryFunction(identity = "demo.simplest", beforeTask = true, 

            retryListener = SimpleTestRetryListener.class, 

            retryException = {RuntimeException.class}, ignoreException = true)

    public void simplestWithId(int id) {

        

        log.info("simplestWithId[{}]执行开始", id);

        // doSomething()

        log.info("simplestWithId[{}]执行完成", id);

    }

下面对注解中的每一个属性做详细的解释

属性

类型

备注

默认值

identity

string

唯一标识,系统内不能重复

长度要小50个字节

类的全名称+方法名称

beforeTask

boolean

是否在执行任务之前插入数据

配置false则表示,只有任务执行报错才插入数据库|,true表示在方法执行前就会插入数据库

false

retryListener

Class<? extends RetryListener>

任务监听器。可以在任务重试、任务完成、任务失败时进行回调

onRetry():每次重试时触发(执行后触发)

onComplete():任务完成时触发

onError():失败时触发(超过最大重试次数)

不进行任务监听

retryException

Class<? extends RuntimeException>[]

如果方法抛出指定异常则会创建重试任务或执行重试动作

{RuntimeException.class}

ignoreException

boolean

当重试任务有多个的时候,上一个重试报错,是否忽略错误继续执行下一个任务

true

定时任务中触发重试任务示例

@Slf4j

@Component

public class DistributeRetrySchedule extends AbstractGtsSchedulerTaskProcessor {


    

    @Autowired

    private RetryFunctionProcessor retryFunctionProcessor;

    

    @Override

    public String getTaskId(){

        return "retry-job";

    }


    @Override

    public GtsSchedulerTaskResult process(GtsSchedulerTaskParameter parameter) throws Exception{

        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        log.info("执行定时任务,当前时间{}",format.format(new Date()));

        retryFunctionProcessor.processTask();

        return GtsSchedulerTaskResult.successResult();

    }

}

重试任务持久化示例

重试表

基于smart-retry源码改造升级

重试记录表

基于smart-retry源码改造升级

上一篇:ABAP,Java, nodejs和go语言的web server编程


下一篇:高可用集群