smart-retry源代码阅读

背景

基础技术组的接口重推组件基于smart-retry源码进行了改造

smart-retry信息


仓库地址

https://gitee.com/hack3389/smart-retry/

阅读分支

分支:master

commit

主要功能

Smart Retry主要是用来进行方法重试的。和Guava Retry、Spring Retry相比,Smart Retry最大的特点是异步重试,支持持久化,系统重启之后可以继续重试。

功能特点

  • 方法重试持久化,系统重启之后可以继续重试
  • 异步重试(不支持同步重试)
  • 支持接口实现和声明式方式

架构图


smart-retry源代码阅读

如何使用

引入依赖


 

      com.github.hadoop002.smartretry

      retry-spring4

      使用最新版本

 


初始化表

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime) ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

编写业务逻辑

  @RetryFunction(identity = "order.payment")

  public void payOrderAndUpdateStatus(Order order) {

      boolean success = paymentBusiness.doPayment(order);

      if (success) {

          orderBusiness.updateOrderPayStatus(order);

      } else {

          orderBusiness.updateOrderPayFail(order);

      }

  }

或者

  @Slf4j

  @Service("orderPaymentBusiness")

  public class OrderPaymentBusiness implements RetryHandler {

  

      @Autowired

      private PaymentBusiness paymentBusiness;

  

      @Autowired

      private OrderBusiness orderBusiness;

  

      @Override

      public String identity() {

          return "order.payment";

      }

  

      @Override

      public Void handle(Order order) {

          boolean success = paymentBusiness.doPayment(order);

          if (success) {

              orderBusiness.updateOrderPayStatus(order);

          } else {

              orderBusiness.updateOrderPayFail(order);

          }

          return null;

      }

  }

打开开关

在启动入口上加上@EnableRetrying 注解

源码阅读

源码结构

  • retry-cpre:重试模块的核心,定义了一系列的接口和扩展点
  • retry-spring4:基于spring4实现的重试模块
  • retry-serializer-jackson2:使用jackson2来实现参数的序列化和反序列化
  • retry-serializer-gson:使用gson来实现参数的序列化和反序列化
  • retry-serializer-fastjson:使用fastjson来实现参数的序列化和反序列化
  • retry-samples:配套的示例demo,可直接使用

大致流程

  • 系统启动后,把所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法注册为定时任务。
  • 所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法都会被Spring进行代理,执行的时候,会先把参数序列化,然后把执行任务插入到数据库。最后根据任务执行的成功与否,更新任务的相应状态。
  • 定时任务定时从表里面获取未成功的任务,进行重试

根据流程走读代码

根据整个流程走读代码应该会对代码有更清晰的认识

系统启动

系统启动的核心处理逻辑主要是在类RetryAnnotationBeanPostProcessor中,下面通过流程仔细分析该类


  • 扫描所有带有@RetryFunction注解和实现RetryHandler接口的类

    @Override

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (bean instanceof AopInfrastructureBean) {

            // Ignore AOP infrastructure such as scoped proxies.

            return bean;

        }


        Class targetClass = AopProxyUtils.ultimateTargetClass(bean);

        if (!this.postedClasseCache.contains(targetClass)) {

            Object targetObject = AopProxyUtils.getSingletonTarget(bean);

            if (RetryHandler.class.isAssignableFrom(targetClass)) {

                RetryHandlerUtils.validateRetryHandler(targetClass);

                log.info("发现RetryHandler的实例:{},准备注册", targetClass);

                retryHandlers.add((RetryHandler) targetObject);

                return bean;

            }

            ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

            Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

            methods.forEach(method -> processRetryFunction(targetObject, method));


            postedClasseCache.add(targetClass);

        }

        return bean;

    }

改类实现了BeanPostProcessor接口,重写了postProcessAfterInitialization方法(每个bean初始化之后执行)

主要为两处判断/过滤

1.判断是否实现了RetryHandler

 if (RetryHandler.class.isAssignableFrom(targetClass)) {

2.过滤打了@RetryFunction的方法

ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

  • 把打了@RetryFunction注解的都转化为RetryHandler,即最终都是走的RetryHandler

    protected void processRetryFunction(Object bean, Method method) {

        log.info("发现@RetryFunction的实例:{},准备注册", method.toString());

        Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());

        RetryHandlerUtils.validateRetryFunction(method);


        RetryFunction retryFunction = method.getAnnotation(RetryFunction.class);

        Supplier retryListenerSupplier = () -> {

            RetryListener retryListener = null;

            String retryListenerName = retryFunction.retryListener();

            if (StringUtils.isNotBlank(retryListenerName)) {

                retryListener = defaultListableBeanFactory.getBean(retryListenerName, RetryListener.class);

            }

            return retryListener;

        };

        retryHandlers.add(new MethodRetryHandler(bean, invocableMethod, retryFunction, retryListenerSupplier));

    }

  • 把所有的retryHandlers遍历注册为定时任务,默认用的quartz

    @Override

    public void afterSingletonsInstantiated() {

        postedClasseCache.clear();


        this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

        this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


        boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

        this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

        if (this.retrySerializer == null) {

            this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

        } else {

            this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

        }


        retryHandlers.forEach(this::registerJobBean);


        retryHandlers.clear();

    }


    protected void registerJobBean(RetryHandler retryHandler) {

        if (retryHandler.identity().length() > 50) {

            throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

        }


        RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

        RetryHandlerRegistration.registry(retryHandlerProxy);


        RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


        retryRegistry.register(retryHandler, retryProcessor);

    }

至此,系统启动所做的任务就完成了

打有@RetryFunction的注解和实现RetryHandler的接口的方法都会被Spring代理

  • @RetryFunction注解的方法如何被代理

public class RetryHandlerMethodPointcut implements Pointcut {


    @Override

    public ClassFilter getClassFilter() {

        return ClassFilter.TRUE;

    }


    @Override

    public MethodMatcher getMethodMatcher() {

        return new StaticMethodMatcher() {


            @Override

            public boolean matches(Method method, Class targetClass) {

                return RetryHandlerUtils.isRetryFunctionMethod(method);

            }

        };

    }

}


    public static boolean isRetryFunctionMethod(Method method) {

        if (method.getAnnotation(RetryFunction.class) != null && method.getParameterCount() == 1) {

            return !Object.class.equals(method.getParameterTypes()[0]);

        }

        return false;

    }

实现Pointcut接口通过isRetryFunctionMethod(Method method)方法判断是否是需要代理的方法

  • 实现RetryHandler接口的方法如何被代理

public class RetryHandlerClassPointcut implements Pointcut {


    @Override

    public ClassFilter getClassFilter() {

        return RetryHandler.class::isAssignableFrom;

    }


    @Override

    public MethodMatcher getMethodMatcher() {

        return new StaticMethodMatcher() {


            @Override

            public boolean matches(Method method, Class targetClass) {

                return RetryHandlerUtils.isRetryHandlerMethod(targetClass, method);

            }

        };

    }

}

    public static boolean isRetryHandlerMethod(Class targetClass, Method method) {

        if ("handle".equals(method.getName()) && method.getParameterCount() == 1 && method.isBridge() && method.isSynthetic()) {

            //RetryHandler接口有泛型,需要特殊处理

            return true;

        }

        Type interfaceType = getRetryHandlerGenericInterface(targetClass);

        if (interfaceType == null) {

            return false;

        }

        Class argsInputType = Object.class;

        if (interfaceType instanceof ParameterizedType) {

            argsInputType = (Class) ((ParameterizedType) interfaceType).getActualTypeArguments()[0];

        }

        Class parameterType = argsInputType;

        return "handle".equals(method.getName()) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(parameterType);

    }

先对类进行过滤,要求实现RetryHandler

    @Override

   public ClassFilter getClassFilter() {

       return RetryHandler.class::isAssignableFrom;

   }

再对方法进行过滤,详细请看isRetryHandlerMethod(Class targetClass, Method method)方法


打有@RetryFunction注解的方法被调用时

当执行到带有@RetryFunction方法时(实现了RetryHandler也差不多的逻辑,就不再赘述了),会被方法拦截器拦截,

public class RetryHandlerMethodInterceptor implements MethodInterceptor {


    @Override

    public Object invoke(MethodInvocation invocation) {

        RetryFunction retryFunction = invocation.getMethod().getAnnotation(RetryFunction.class);

        Object[] args = invocation.getArguments();

        String identity = retryFunction.identity();

        if (StringUtils.isBlank(identity)) {

            identity = RetryHandlerUtils.getMethodIdentity(invocation.getMethod());

        }

        Optional optional = RetryHandlerRegistration.get(identity);

        if (optional.isPresent()) {

            return optional.get().handle(ArrayUtils.isEmpty(args) ? null : args[0]);

        }

        throw new IllegalArgumentException("找不到对应的RetryHandler代理,identity=" + identity);

    }

}

因为RetryHandlerRegistration中注册的是ImmediatelyRetryHandler,所以执行的是ImmediatelyRetryHandler的handle方法

        RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

        RetryHandlerRegistration.registry(retryHandlerProxy);

doPost方法创建的是ImmediatelyRetryHandler

    @Override

    public RetryHandler doPost(RetryHandler retryHandler) {

        if (retryHandler instanceof GenericRetryHandler) {

            return new ImmediatelyRetryHandler((GenericRetryHandler) retryHandler, retryTaskFactory, retryTaskMapper, beforeTask);

        }

        return new ImmediatelyRetryHandler(new DefaultRetryHandler(retryHandler), retryTaskFactory, retryTaskMapper, beforeTask);

    }

接下来我们看看ImmediatelyRetryHandler.handle方法做了什么

    @Override

    public Object handle(Object arg) {

        RetryContext retryContext = new RetryContext(genericRetryHandler, arg);

        Object result;

        RetryTask retryTask;

        // 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

        if (beforeTask) {

            retryTask = retryTaskFactory.create(genericRetryHandler, arg);

            retryTaskMapper.insert(retryTask);

            try {

                result = genericRetryHandler.handle(arg);

                retryContext.setResult(result);

                completeTask(retryTask);

                onRetry(retryContext);

                onComplete(retryContext);

            } catch (NoRetryException e) {

                retryContext.setException(e);

                failureTask(retryTask, retryContext);


                onRetry(retryContext);

                onError(retryContext);

                throw e;

            } catch (RuntimeException e) {

                retryContext.setException(e);


                if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                    //只有最大可重试次数为0,才会执行到这里

                    failureTask(retryTask, retryContext);


                    onRetry(retryContext);

                    onError(retryContext);

                } else {

                    updateRemark(retryTask, e);

                    onRetry(retryContext);

                }


                throw e;

            }

            return result;

        } else {

            try {

                result = genericRetryHandler.handle(arg);

                retryContext.setResult(result);

                onRetry(retryContext);

                onComplete(retryContext);

            } catch (NoRetryException e) {

                retryContext.setException(e);


                onRetry(retryContext);

                onError(retryContext);


                throw e;

            } catch (RuntimeException e) {

                retryContext.setException(e);

                if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                    //只有最大可重试次数为0,才会执行到这里

                    onRetry(retryContext);

                    onError(retryContext);

                } else {

                    //等待重试

                    retryTask = retryTaskFactory.create(genericRetryHandler, arg);

                    retryTask.setRemark(StringUtils.left(e.getMessage(), 1000));

                    retryTaskMapper.insert(retryTask);

                    onRetry(retryContext);

                }


                throw e;

            }

        }

        return result;

    }

这个方法有点长,不过代码还算简单,简而言之就是重试的方法发生异常后入库( retryTaskFactory.create(genericRetryHandler, arg);)的操作,当然里边有序列化参数,修改重试表的状态等操作,就不再详细讲了(比较简单,相信大家都看得懂(*╹▽╹*)

至此,调用带有@RetryFunction注解的方法第一被调用,以及如何把重试任务入库的操作就完成了,下面讲解重试的逻辑

定时重试逻辑

上边有讲到把重试任务注册为定时任务的逻辑,再看一下代码吧

        RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


        retryRegistry.register(retryHandler, retryProcessor);

可以看到,注册的是一个DefaultRetryProcessor,就是说,每次定时任务调用的是该类的doRetry方法,以quartz为例

public class RetryJob implements Job {


    private RetryProcessor retryProcessor;


    public RetryJob() {

    }


    public RetryJob(RetryProcessor retryProcessor) {

        this.retryProcessor = retryProcessor;

    }


    @Override

    public void execute(JobExecutionContext context) {

        retryProcessor.doRetry();

    }

}

下边我们看看doRetry都做了些什么

    @Override

    public void doRetry() {

        log.info("开始执行Identity={}的重试,maxRetryCount={}, initialDelay={}", genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

        List tasks = retryTaskMapper.queryNeedRetryTaskList(genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

        if (tasks == null) {

            return;

        }

        log.info("Identity={}当前有{}个任务准备重试", genericRetryHandler.identity(), tasks.size());

        if (genericRetryHandler.ignoreException()) {

            tasks.forEach(this::doRetryWithIgnoreException);

        } else {

            tasks.forEach(this::doRetry);

        }

    }

相信聪明的你肯定猜到了,没错,取出之前入库的数据开始进行重试

    private void doRetry(RetryTask retryTask) {

        log.info("开始重试Identity={},Id={}的任务", retryTask.getIdentity(), retryTask.getTaskId());

        retryedRetryHandler.setRetryTask(retryTask);

        String json = retryTask.getParams();

        if (StringUtils.isBlank(json)) {

            retryedRetryHandler.handle(null);

        } else {

            retryedRetryHandler.parseArgsAndhandle(json);

        }

    }

重试调用的是retryedRetryHandler.handle()的方法

    @Override

    public Object handle(Object arg) {

        retryTask.setRetryCount(retryTask.getRetryCount() + 1);

        RetryContext retryContext = new RetryContext(genericRetryHandler, arg, retryTask.getRetryCount());

        Object result;

        try {

            result = genericRetryHandler.handle(arg);

            retryContext.setResult(result);

            completeTask(retryTask);

            onRetry(retryContext);

            onComplete(retryContext);

        } catch (NoRetryException e) {

            retryContext.setException(e);


            failureTask(retryTask, retryContext);

            onRetry(retryContext);

            onError(retryContext);

            throw e;

        } catch (RuntimeException e) {

            retryContext.setException(e);


            if (retryTask.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                failureTask(retryTask, retryContext);

            } else {

                update(retryTask, retryContext);

            }


            onRetry(retryContext);


            if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                //重试次数达到最大,触发失败回调

                onError(retryContext);

            }


            throw e;

        }


        return result;

    }

retryedRetryHandler.handle方法主要是调用目标方法后,如果目标方法没报错,则把表中的状态修改成功,发生异常后更新表中的异常信息,达到最大重试次数后,把表中的状态改为失败,当然其中有把数据库中的参数反序列的操作.


到这里,smart-retry的大致流程,源码解读就完成了,当然,这并不是全部代码,只是主流程的代码,有兴趣的同学可以把代码拉下来,详细阅读以下

总结

smart-retry支持异步重试,支持重试持久化,用着还是相当不错的,但是还是有缺点的,比如,1. 只支持有且仅有一个参数2.每一个重试方法都对应一个定时任务,会造成线程的过度使用

所以,我在该源码的基础上,对smart-retry进行了改造,改造点如下

  • 支持重试的方法有多个参数
  • 支持指定抛出哪些异常后重试
  • 支持配置在注解上是否在执行方法前入库
  • 只提供重试的接口给用户,具体定时任务让用户自己去实现,比如改造后例子中的定时任务用的是xxl-job


如果想详细了解改造后的smart-retry,请参照smart-retry改造升级文档




上一篇:5月安全新品播课(2)|企业主机安全面临的三大风险如何解?


下一篇:最新前端初中级面试题合集一,你确定不看一看嘛