前瞻
阅读本篇前,请先阅读smart-retry源码阅读文章
背景
smart-retry有以下缺点
- 只支持入参有且仅有一个
- 每一个重试方法都对应一个定时任务,会造成线程的过度使用
- 不支持抛出指定异常后重试
基于此,对smart-retry做了升级改造
改造后特性
针对smart-retry的确定的改进
- 支持多个入参方法重试
- 只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加
- 支持抛出指定异常后重试
新增的功能
- 支持配置在注解上配置是否在执行方法前入库
- 增加了重试记录表,把每次重试都记录下来
- 重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂
代码跟进改造点
下边会讲述代码如何改造
支持多个入参方法重试
改造前入参,以及参数类型都是单个,改成数组,我列举这两种的前后对比,因为改的地方比较多,我不一一列举
改造前入参
public Object handle(Object arg) {
改造后入参
public Object handle(Object arg[]) {
改造前入参类型(用于序列化和反序列化)
public Class<?> getInputArgsType() {
return inputArgsType;
}
改造后入参类型(用于序列化和反序列化)
public Class<?>[] getInputArgsType() {
return inputArgsType;
}
只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加
我们先看原来的代码
@Override
public void afterSingletonsInstantiated() {
postedClasseCache.clear();
this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);
this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);
boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);
this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);
if (this.retrySerializer == null) {
this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);
} else {
this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);
}
retryHandlers.forEach(this::registerJobBean);
retryHandlers.clear();
}
protected void registerJobBean(RetryHandler retryHandler) {
if (retryHandler.identity().length() > 50) {
throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");
}
RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);
RetryHandlerRegistration.registry(retryHandlerProxy);
RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);
retryRegistry.register(retryHandler, retryProcessor);
}
@Override
public void register(RetryHandler retryHandler, RetryProcessor retryProcessor) {
if (StringUtils.isBlank(retryHandler.cron())) {
throw new IllegalArgumentException("identity=" + retryHandler.identity() + ", 使用Elastic-Job注册器,必须指定RetryHandler/RetryFunction的cron表达式");
}
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
beanDefinitionBuilder.addConstructorArgValue(new RetryJob(retryProcessor));
beanDefinitionBuilder.addConstructorArgValue(registryCenter);
beanDefinitionBuilder.addConstructorArgValue(createLiteJobConfiguration(retryHandler));
if (jobEventConfiguration != null) {
beanDefinitionBuilder.addConstructorArgValue(jobEventConfiguration);
}
beanDefinitionBuilder.addConstructorArgValue(getElasticJobListeners());
beanDefinitionBuilder.setInitMethodName("init");
String jobBeanName = getJobBeanName(retryHandler);
defaultListableBeanFactory.registerBeanDefinition(jobBeanName, beanDefinitionBuilder.getBeanDefinition());
//此处的getBean调用是为了手工触发Bean的初始化
defaultListableBeanFactory.getBean(jobBeanName);
log.info("identity={}已成功注册到Elastic-Job", retryHandler.identity());
}
可以看到,每一个retryHandler会单独注册一个定时任务,并且注册的时候需要指定注册到哪个定时任务
再看改造后代码,先拿到所有retryHandler,封装为可执行的RetryFunctionProcessor
@Override
public void afterSingletonsInstantiated() {
postedClasseCache.clear();
this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);
RetryFunctionProcessor retryFunctionProcessor = defaultListableBeanFactory.getBean(RetryFunctionProcessor.class);
this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);
List<RetryProcessor> retryProcessorList = new ArrayList<>();
for (RetryHandler retryHandler : retryHandlers) {
if (retryHandler.identity().length() > 50) {
throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");
}
RetryHandler retryHandlerProxy =new ImmediatelyRetryHandler(retryHandler, new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, retryRule);
RetryHandlerRegistration.registry(retryHandlerProxy);
RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer, retryRule);
retryProcessorList.add(retryProcessor);
}
retryFunctionProcessor.register(retryProcessorList);
retryHandlers.clear();
}
RetryFunctionProcessor其实就是把retryProcessorList保存为成员变量,提供给用户去调用,调用方式将在如何使用篇章讲解
支持抛出特定异常后重试
smart-retry只能在抛出RuntimeException的时候,进行重试,改造后,可以支持抛出特定异常后重试,这样当自定义异常时,更加灵活,代码改造点如下
RetryFunction注解增加变量
/**
* 如果方法抛出该异常则会创建重试任务
*/
Class<? extends RuntimeException>[] retryException() default {RuntimeException.class};
在执行方法抛出异常时,判断是否在配置的这些异常范围,来决定是否要进行重试
} catch (RuntimeException e) {
boolean isIncludeException = RetryHandlerUtils.isIncludeException(e, retryException);
if (!isIncludeException) {
throw e;
}
支持配置在注解上配置是否在执行方法前入库
smart-retry是支持是否在执行方法前入库的,但是是全局的配置,改造后可以对每一个任务单独配置
/**
* 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|
* @return
*/
boolean beforeTask() default false;
具体逻辑就比较简单了,就是根据配置,来决定是否在抛出异常前插入数据
增加了重试记录表,把每次重试都记录下来
smart-retry只有一张重试表,比如重试过5次,没办法知道这5次重试的具体详情,只能知道最后一次是成功还是失败等,所以增加了重试记录表,来记录每一次重试的记录,利用了spring的切面的原理,对原代码没有任何侵入
@Aspect
@Slf4j
public class RetryTaskRecordAspect implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Pointcut("execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.insert(..)) ||" +
"execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.update(..))")
public void pointCut(){
}
@Around(value = "pointCut()")
public Object insertTaskRecord(ProceedingJoinPoint proceedingJoinPoint) {
Object o = null;
try {
o = proceedingJoinPoint.proceed();
} catch (Throwable throwable) {
log.error("切面insertTaskRecord报错,", throwable);
}
Object[] args = proceedingJoinPoint.getArgs();
RetryTask retryTask = (RetryTask) args[0];
RetryTaskMapper retryTaskMapper = applicationContext.getBean(RetryTaskMapper.class);
retryTaskMapper.insertTaskRecord(retryTask);
return o;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂
改造前:smart-retry原来的重试规则,是为每一条重试任务配置一个cron表达式
改造后:重试规则在application.properties中全局配置,更方便管理,cron表达式改为按时间,或者间隔,更容易配置,不易配置错
/**
* 重试规则,有两种方式,指定时间或者间隔
* 1.时间,格式为:onTime-时间-最大重试次数,例如 onTime-23:59:59-3,onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开
* 2.间隔,格式为:onInterval-间隔重试规则,例如 onInterval-10,20,30,onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒
*/
private String retryRule;
指定时间重试的实现逻辑:获取到下次的执行时间,如果执行时间小于当前时间,则需要加一天,否则,就是该时间
指定间隔重试的实现逻辑:现在的时间,加上重试次数对应的间隔时间即可
代码如下
/**
* 获取下次执行时间
* @param retryRule
* @param retryCount
* @return
*/
public static LocalDateTime getNextExecTime(RetryRule retryRule, int retryCount) {
if (retryRule instanceof OnTimeRetryRule) {
OnTimeRetryRule onTimeRetryRule = (OnTimeRetryRule) retryRule;
LocalTime retryTime = onTimeRetryRule.getRetryTime();
LocalDateTime nextExecTime = LocalDateTime.of(LocalDate.now(), retryTime);
if (nextExecTime.isAfter(LocalDateTime.now())) {
return nextExecTime;
} else {
return nextExecTime.plusDays(1L);
}
}
OnIntervalRetryRule onIntervalRetryRule = (OnIntervalRetryRule) retryRule;
Long[] retryInterval = onIntervalRetryRule.getRetryInterval();
return plusSeconds(retryInterval[retryCount]);
}
如何使用
建表
接口重推依赖两张表重试表以及重试记录表
重试表sql
create table sys_retry_task (
task_id bigint not null primary key auto_increment,
identity_name varchar(50) not null COMMENT '任务的唯一标识',
params text COMMENT '参数',
status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',
retry_count int not null default 0 COMMENT '重试次数',
remark varchar(1000) COMMENT '备注',
create_date datetime not null,
edit_date datetime,
next_date DATETIME COMMENT '下次执行的时间') ENGINE=InnoDB COMMENT='系统重试表';
create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);
重试记录表sql
CREATE TABLE `sys_retry_task_record`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`task_id` BIGINT(20) NULL DEFAULT NULL COMMENT '重试任务id',
`identity_name` VARCHAR(50) NOT NULL COMMENT '任务的唯一标识' COLLATE 'utf8_general_ci',
`params` TEXT NULL DEFAULT NULL COMMENT '参数' COLLATE 'utf8_general_ci',
`status` TINYINT(4) NOT NULL COMMENT '状态。1: 处理中,2: 成功,3: 失败',
`retry_count` INT(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`remark` VARCHAR(1000) NULL DEFAULT NULL COMMENT '备注' COLLATE 'utf8_general_ci',
`next_date` DATETIME NULL DEFAULT NULL COMMENT '下次执行的时间',
`edit_date` DATETIME NULL DEFAULT NULL,
`create_date` DATETIME NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `idx_identityname_status` (`identity_name`, `status`) USING BTREE
)
COMMENT ='系统重试记录表'
COLLATE = 'utf8_general_ci'
ENGINE = InnoDB
引入依赖
<dependency>
<groupId>com.aliyun.gts.bpaas</groupId>
<artifactId>aliyun-gts-retry-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
application.properties配置
# 数据源配置,注意要和建表的数据源和数据库在同一个
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=utf8&autoReconnect=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
# 重试开关,默认为false
gts.retry.enable=true
# 重试的规则
gts.retry.retry-rule=onInterval-10,30,30,30
# 序列化方式,支持fastjson,jackson以及gson,默认为fastjson
gts.retry.serialize-type=fastjson
下边对重试规则配置gts.retry.retry-rule进行详细说明
重试规则,有两种方式,指定时间或者间隔
1. 时间,格式为:onTime-时间-最大重试次数,例如(onTime-23:59:59-3),onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开
2. 间隔,格式为:onInterval-间隔重试规则,例如 (onInterval-10,20,30),onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒
编程界面
接口重推需要的编程界面有两处,第一处是在方法上打上@RetryFunction注解,第二处是在定时任务中调用处理定时任务的方法
在方法上打上@RetryFunction注解
@RetryFunction(identity = "demo.simplest", beforeTask = true,
retryListener = SimpleTestRetryListener.class,
retryException = {RuntimeException.class}, ignoreException = true)
public void simplestWithId(int id) {
log.info("simplestWithId[{}]执行开始", id);
// doSomething()
log.info("simplestWithId[{}]执行完成", id);
}
下面对注解中的每一个属性做详细的解释
属性 |
类型 |
备注 |
默认值 |
identity |
string |
唯一标识,系统内不能重复 长度要小50个字节 |
类的全名称+方法名称 |
beforeTask |
boolean |
是否在执行任务之前插入数据 配置false则表示,只有任务执行报错才插入数据库|,true表示在方法执行前就会插入数据库 |
false |
retryListener |
Class<? extends RetryListener> |
任务监听器。可以在任务重试、任务完成、任务失败时进行回调 onRetry():每次重试时触发(执行后触发) onComplete():任务完成时触发 onError():失败时触发(超过最大重试次数) |
不进行任务监听 |
retryException |
Class<? extends RuntimeException>[] |
如果方法抛出指定异常则会创建重试任务或执行重试动作 |
{RuntimeException.class} |
ignoreException |
boolean |
当重试任务有多个的时候,上一个重试报错,是否忽略错误继续执行下一个任务 |
true |
定时任务中触发重试任务示例
@Slf4j
@Component
public class DistributeRetrySchedule extends AbstractGtsSchedulerTaskProcessor {
@Autowired
private RetryFunctionProcessor retryFunctionProcessor;
@Override
public String getTaskId(){
return "retry-job";
}
@Override
public GtsSchedulerTaskResult process(GtsSchedulerTaskParameter parameter) throws Exception{
DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("执行定时任务,当前时间{}",format.format(new Date()));
retryFunctionProcessor.processTask();
return GtsSchedulerTaskResult.successResult();
}
}