背景
基础技术组的接口重推组件基于smart-retry源码进行了改造
smart-retry信息
仓库地址
https://gitee.com/hack3389/smart-retry/
阅读分支
分支:master
commit
主要功能
Smart Retry主要是用来进行方法重试的。和Guava Retry、Spring Retry相比,Smart Retry最大的特点是异步重试,支持持久化,系统重启之后可以继续重试。
功能特点
- 方法重试持久化,系统重启之后可以继续重试
- 异步重试(不支持同步重试)
- 支持接口实现和声明式方式
架构图
如何使用
引入依赖
com.github.hadoop002.smartretry
retry-spring4
使用最新版本
初始化表
create table sys_retry_task (
task_id bigint not null primary key auto_increment,
identity_name varchar(50) not null COMMENT '任务的唯一标识',
params text COMMENT '参数',
status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',
retry_count int not null default 0 COMMENT '重试次数',
remark varchar(1000) COMMENT '备注',
create_date datetime not null,
edit_date datetime) ENGINE=InnoDB COMMENT='系统重试表';
create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);
编写业务逻辑
@RetryFunction(identity = "order.payment")
public void payOrderAndUpdateStatus(Order order) {
boolean success = paymentBusiness.doPayment(order);
if (success) {
orderBusiness.updateOrderPayStatus(order);
} else {
orderBusiness.updateOrderPayFail(order);
}
}
或者
@Slf4j
@Service("orderPaymentBusiness")
public class OrderPaymentBusiness implements RetryHandler {
@Autowired
private PaymentBusiness paymentBusiness;
@Autowired
private OrderBusiness orderBusiness;
@Override
public String identity() {
return "order.payment";
}
@Override
public Void handle(Order order) {
boolean success = paymentBusiness.doPayment(order);
if (success) {
orderBusiness.updateOrderPayStatus(order);
} else {
orderBusiness.updateOrderPayFail(order);
}
return null;
}
}
打开开关
在启动入口上加上@EnableRetrying 注解
源码阅读
源码结构
- retry-cpre:重试模块的核心,定义了一系列的接口和扩展点
- retry-spring4:基于spring4实现的重试模块
- retry-serializer-jackson2:使用jackson2来实现参数的序列化和反序列化
- retry-serializer-gson:使用gson来实现参数的序列化和反序列化
- retry-serializer-fastjson:使用fastjson来实现参数的序列化和反序列化
- retry-samples:配套的示例demo,可直接使用
大致流程
- 系统启动后,把所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法注册为定时任务。
- 所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法都会被Spring进行代理,执行的时候,会先把参数序列化,然后把执行任务插入到数据库。最后根据任务执行的成功与否,更新任务的相应状态。
- 定时任务定时从表里面获取未成功的任务,进行重试
根据流程走读代码
根据整个流程走读代码应该会对代码有更清晰的认识
系统启动
系统启动的核心处理逻辑主要是在类RetryAnnotationBeanPostProcessor中,下面通过流程仔细分析该类
- 扫描所有带有@RetryFunction注解和实现RetryHandler接口的类
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.postedClasseCache.contains(targetClass)) {
Object targetObject = AopProxyUtils.getSingletonTarget(bean);
if (RetryHandler.class.isAssignableFrom(targetClass)) {
RetryHandlerUtils.validateRetryHandler(targetClass);
log.info("发现RetryHandler的实例:{},准备注册", targetClass);
retryHandlers.add((RetryHandler) targetObject);
return bean;
}
ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;
Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);
methods.forEach(method -> processRetryFunction(targetObject, method));
postedClasseCache.add(targetClass);
}
return bean;
}
改类实现了BeanPostProcessor接口,重写了postProcessAfterInitialization方法(每个bean初始化之后执行)
主要为两处判断/过滤
1.判断是否实现了RetryHandler
if (RetryHandler.class.isAssignableFrom(targetClass)) {
2.过滤打了@RetryFunction的方法
ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;
Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);
- 把打了@RetryFunction注解的都转化为RetryHandler,即最终都是走的RetryHandler
protected void processRetryFunction(Object bean, Method method) {
log.info("发现@RetryFunction的实例:{},准备注册", method.toString());
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
RetryHandlerUtils.validateRetryFunction(method);
RetryFunction retryFunction = method.getAnnotation(RetryFunction.class);
Supplier retryListenerSupplier = () -> {
RetryListener retryListener = null;
String retryListenerName = retryFunction.retryListener();
if (StringUtils.isNotBlank(retryListenerName)) {
retryListener = defaultListableBeanFactory.getBean(retryListenerName, RetryListener.class);
}
return retryListener;
};
retryHandlers.add(new MethodRetryHandler(bean, invocableMethod, retryFunction, retryListenerSupplier));
}
- 把所有的retryHandlers遍历注册为定时任务,默认用的quartz
@Override
public void afterSingletonsInstantiated() {
postedClasseCache.clear();
this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);
this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);
boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);
this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);
if (this.retrySerializer == null) {
this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);
} else {
this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);
}
retryHandlers.forEach(this::registerJobBean);
retryHandlers.clear();
}
protected void registerJobBean(RetryHandler retryHandler) {
if (retryHandler.identity().length() > 50) {
throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");
}
RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);
RetryHandlerRegistration.registry(retryHandlerProxy);
RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);
retryRegistry.register(retryHandler, retryProcessor);
}
至此,系统启动所做的任务就完成了
打有@RetryFunction的注解和实现RetryHandler的接口的方法都会被Spring代理
- @RetryFunction注解的方法如何被代理
public class RetryHandlerMethodPointcut implements Pointcut {
@Override
public ClassFilter getClassFilter() {
return ClassFilter.TRUE;
}
@Override
public MethodMatcher getMethodMatcher() {
return new StaticMethodMatcher() {
@Override
public boolean matches(Method method, Class targetClass) {
return RetryHandlerUtils.isRetryFunctionMethod(method);
}
};
}
}
public static boolean isRetryFunctionMethod(Method method) {
if (method.getAnnotation(RetryFunction.class) != null && method.getParameterCount() == 1) {
return !Object.class.equals(method.getParameterTypes()[0]);
}
return false;
}
实现Pointcut接口通过isRetryFunctionMethod(Method method)方法判断是否是需要代理的方法
- 实现RetryHandler接口的方法如何被代理
public class RetryHandlerClassPointcut implements Pointcut {
@Override
public ClassFilter getClassFilter() {
return RetryHandler.class::isAssignableFrom;
}
@Override
public MethodMatcher getMethodMatcher() {
return new StaticMethodMatcher() {
@Override
public boolean matches(Method method, Class targetClass) {
return RetryHandlerUtils.isRetryHandlerMethod(targetClass, method);
}
};
}
}
public static boolean isRetryHandlerMethod(Class targetClass, Method method) {
if ("handle".equals(method.getName()) && method.getParameterCount() == 1 && method.isBridge() && method.isSynthetic()) {
//RetryHandler接口有泛型,需要特殊处理
return true;
}
Type interfaceType = getRetryHandlerGenericInterface(targetClass);
if (interfaceType == null) {
return false;
}
Class argsInputType = Object.class;
if (interfaceType instanceof ParameterizedType) {
argsInputType = (Class) ((ParameterizedType) interfaceType).getActualTypeArguments()[0];
}
Class parameterType = argsInputType;
return "handle".equals(method.getName()) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(parameterType);
}
先对类进行过滤,要求实现RetryHandler
@Override
public ClassFilter getClassFilter() {
return RetryHandler.class::isAssignableFrom;
}
再对方法进行过滤,详细请看isRetryHandlerMethod(Class targetClass, Method method)方法
打有@RetryFunction注解的方法被调用时
当执行到带有@RetryFunction方法时(实现了RetryHandler也差不多的逻辑,就不再赘述了),会被方法拦截器拦截,
public class RetryHandlerMethodInterceptor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) {
RetryFunction retryFunction = invocation.getMethod().getAnnotation(RetryFunction.class);
Object[] args = invocation.getArguments();
String identity = retryFunction.identity();
if (StringUtils.isBlank(identity)) {
identity = RetryHandlerUtils.getMethodIdentity(invocation.getMethod());
}
Optional optional = RetryHandlerRegistration.get(identity);
if (optional.isPresent()) {
return optional.get().handle(ArrayUtils.isEmpty(args) ? null : args[0]);
}
throw new IllegalArgumentException("找不到对应的RetryHandler代理,identity=" + identity);
}
}
因为RetryHandlerRegistration中注册的是ImmediatelyRetryHandler,所以执行的是ImmediatelyRetryHandler的handle方法
RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);
RetryHandlerRegistration.registry(retryHandlerProxy);
doPost方法创建的是ImmediatelyRetryHandler
@Override
public RetryHandler doPost(RetryHandler retryHandler) {
if (retryHandler instanceof GenericRetryHandler) {
return new ImmediatelyRetryHandler((GenericRetryHandler) retryHandler, retryTaskFactory, retryTaskMapper, beforeTask);
}
return new ImmediatelyRetryHandler(new DefaultRetryHandler(retryHandler), retryTaskFactory, retryTaskMapper, beforeTask);
}
接下来我们看看ImmediatelyRetryHandler.handle方法做了什么
@Override
public Object handle(Object arg) {
RetryContext retryContext = new RetryContext(genericRetryHandler, arg);
Object result;
RetryTask retryTask;
// 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|
if (beforeTask) {
retryTask = retryTaskFactory.create(genericRetryHandler, arg);
retryTaskMapper.insert(retryTask);
try {
result = genericRetryHandler.handle(arg);
retryContext.setResult(result);
completeTask(retryTask);
onRetry(retryContext);
onComplete(retryContext);
} catch (NoRetryException e) {
retryContext.setException(e);
failureTask(retryTask, retryContext);
onRetry(retryContext);
onError(retryContext);
throw e;
} catch (RuntimeException e) {
retryContext.setException(e);
if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {
//只有最大可重试次数为0,才会执行到这里
failureTask(retryTask, retryContext);
onRetry(retryContext);
onError(retryContext);
} else {
updateRemark(retryTask, e);
onRetry(retryContext);
}
throw e;
}
return result;
} else {
try {
result = genericRetryHandler.handle(arg);
retryContext.setResult(result);
onRetry(retryContext);
onComplete(retryContext);
} catch (NoRetryException e) {
retryContext.setException(e);
onRetry(retryContext);
onError(retryContext);
throw e;
} catch (RuntimeException e) {
retryContext.setException(e);
if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {
//只有最大可重试次数为0,才会执行到这里
onRetry(retryContext);
onError(retryContext);
} else {
//等待重试
retryTask = retryTaskFactory.create(genericRetryHandler, arg);
retryTask.setRemark(StringUtils.left(e.getMessage(), 1000));
retryTaskMapper.insert(retryTask);
onRetry(retryContext);
}
throw e;
}
}
return result;
}
这个方法有点长,不过代码还算简单,简而言之就是重试的方法发生异常后入库( retryTaskFactory.create(genericRetryHandler, arg);)的操作,当然里边有序列化参数,修改重试表的状态等操作,就不再详细讲了(比较简单,相信大家都看得懂(*╹▽╹*)
至此,调用带有@RetryFunction注解的方法第一被调用,以及如何把重试任务入库的操作就完成了,下面讲解重试的逻辑
定时重试逻辑
上边有讲到把重试任务注册为定时任务的逻辑,再看一下代码吧
RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);
retryRegistry.register(retryHandler, retryProcessor);
可以看到,注册的是一个DefaultRetryProcessor,就是说,每次定时任务调用的是该类的doRetry方法,以quartz为例
public class RetryJob implements Job {
private RetryProcessor retryProcessor;
public RetryJob() {
}
public RetryJob(RetryProcessor retryProcessor) {
this.retryProcessor = retryProcessor;
}
@Override
public void execute(JobExecutionContext context) {
retryProcessor.doRetry();
}
}
下边我们看看doRetry都做了些什么
@Override
public void doRetry() {
log.info("开始执行Identity={}的重试,maxRetryCount={}, initialDelay={}", genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());
List tasks = retryTaskMapper.queryNeedRetryTaskList(genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());
if (tasks == null) {
return;
}
log.info("Identity={}当前有{}个任务准备重试", genericRetryHandler.identity(), tasks.size());
if (genericRetryHandler.ignoreException()) {
tasks.forEach(this::doRetryWithIgnoreException);
} else {
tasks.forEach(this::doRetry);
}
}
相信聪明的你肯定猜到了,没错,取出之前入库的数据开始进行重试
private void doRetry(RetryTask retryTask) {
log.info("开始重试Identity={},Id={}的任务", retryTask.getIdentity(), retryTask.getTaskId());
retryedRetryHandler.setRetryTask(retryTask);
String json = retryTask.getParams();
if (StringUtils.isBlank(json)) {
retryedRetryHandler.handle(null);
} else {
retryedRetryHandler.parseArgsAndhandle(json);
}
}
重试调用的是retryedRetryHandler.handle()的方法
@Override
public Object handle(Object arg) {
retryTask.setRetryCount(retryTask.getRetryCount() + 1);
RetryContext retryContext = new RetryContext(genericRetryHandler, arg, retryTask.getRetryCount());
Object result;
try {
result = genericRetryHandler.handle(arg);
retryContext.setResult(result);
completeTask(retryTask);
onRetry(retryContext);
onComplete(retryContext);
} catch (NoRetryException e) {
retryContext.setException(e);
failureTask(retryTask, retryContext);
onRetry(retryContext);
onError(retryContext);
throw e;
} catch (RuntimeException e) {
retryContext.setException(e);
if (retryTask.getRetryCount() == genericRetryHandler.maxRetryCount()) {
failureTask(retryTask, retryContext);
} else {
update(retryTask, retryContext);
}
onRetry(retryContext);
if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {
//重试次数达到最大,触发失败回调
onError(retryContext);
}
throw e;
}
return result;
}
retryedRetryHandler.handle方法主要是调用目标方法后,如果目标方法没报错,则把表中的状态修改成功,发生异常后更新表中的异常信息,达到最大重试次数后,把表中的状态改为失败,当然其中有把数据库中的参数反序列的操作.
到这里,smart-retry的大致流程,源码解读就完成了,当然,这并不是全部代码,只是主流程的代码,有兴趣的同学可以把代码拉下来,详细阅读以下
总结
smart-retry支持异步重试,支持重试持久化,用着还是相当不错的,但是还是有缺点的,比如,1. 只支持有且仅有一个参数2.每一个重试方法都对应一个定时任务,会造成线程的过度使用
所以,我在该源码的基础上,对smart-retry进行了改造,改造点如下
- 支持重试的方法有多个参数
- 支持指定抛出哪些异常后重试
- 支持配置在注解上是否在执行方法前入库
- 只提供重试的接口给用户,具体定时任务让用户自己去实现,比如改造后例子中的定时任务用的是xxl-job
如果想详细了解改造后的smart-retry,请参照smart-retry改造升级文档