smart-retry源代码阅读

简介: 本文主要讲述smart-retry主流程源码阅读

背景

基础技术组的接口重推组件基于smart-retry源码进行了改造

smart-retry信息


仓库地址

https://gitee.com/hack3389/smart-retry/

阅读分支

分支:master

commit

主要功能

Smart Retry主要是用来进行方法重试的。和Guava Retry、Spring Retry相比,Smart Retry最大的特点是异步重试,支持持久化,系统重启之后可以继续重试。

功能特点

  • 方法重试持久化,系统重启之后可以继续重试
  • 异步重试(不支持同步重试)
  • 支持接口实现和声明式方式

架构图


如何使用

引入依赖


 

     com.github.hadoop002.smartretry

     retry-spring4

     使用最新版本

 


初始化表

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime) ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

编写业务逻辑

 @RetryFunction(identity = "order.payment")

 public void payOrderAndUpdateStatus(Order order) {

     boolean success = paymentBusiness.doPayment(order);

     if (success) {

         orderBusiness.updateOrderPayStatus(order);

     } else {

         orderBusiness.updateOrderPayFail(order);

     }

 }

或者

 @Slf4j

 @Service("orderPaymentBusiness")

 public class OrderPaymentBusiness implements RetryHandler {

 

     @Autowired

     private PaymentBusiness paymentBusiness;

 

     @Autowired

     private OrderBusiness orderBusiness;

 

     @Override

     public String identity() {

         return "order.payment";

     }

 

     @Override

     public Void handle(Order order) {

         boolean success = paymentBusiness.doPayment(order);

         if (success) {

             orderBusiness.updateOrderPayStatus(order);

         } else {

             orderBusiness.updateOrderPayFail(order);

         }

         return null;

     }

 }

打开开关

在启动入口上加上@EnableRetrying 注解

源码阅读

源码结构

  • retry-cpre:重试模块的核心,定义了一系列的接口和扩展点
  • retry-spring4:基于spring4实现的重试模块
  • retry-serializer-jackson2:使用jackson2来实现参数的序列化和反序列化
  • retry-serializer-gson:使用gson来实现参数的序列化和反序列化
  • retry-serializer-fastjson:使用fastjson来实现参数的序列化和反序列化
  • retry-samples:配套的示例demo,可直接使用

大致流程

  • 系统启动后,把所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法注册为定时任务。
  • 所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法都会被Spring进行代理,执行的时候,会先把参数序列化,然后把执行任务插入到数据库。最后根据任务执行的成功与否,更新任务的相应状态。
  • 定时任务定时从表里面获取未成功的任务,进行重试

根据流程走读代码

根据整个流程走读代码应该会对代码有更清晰的认识

系统启动

系统启动的核心处理逻辑主要是在类RetryAnnotationBeanPostProcessor中,下面通过流程仔细分析该类


  • 扫描所有带有@RetryFunction注解和实现RetryHandler接口的类

   @Override

   public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

       if (bean instanceof AopInfrastructureBean) {

           // Ignore AOP infrastructure such as scoped proxies.

           return bean;

       }


       Class targetClass = AopProxyUtils.ultimateTargetClass(bean);

       if (!this.postedClasseCache.contains(targetClass)) {

           Object targetObject = AopProxyUtils.getSingletonTarget(bean);

           if (RetryHandler.class.isAssignableFrom(targetClass)) {

               RetryHandlerUtils.validateRetryHandler(targetClass);

               log.info("发现RetryHandler的实例:{},准备注册", targetClass);

               retryHandlers.add((RetryHandler) targetObject);

               return bean;

           }

           ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

           Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

           methods.forEach(method -> processRetryFunction(targetObject, method));


           postedClasseCache.add(targetClass);

       }

       return bean;

   }

改类实现了BeanPostProcessor接口,重写了postProcessAfterInitialization方法(每个bean初始化之后执行)

主要为两处判断/过滤

1.判断是否实现了RetryHandler

if (RetryHandler.class.isAssignableFrom(targetClass)) {

2.过滤打了@RetryFunction的方法

ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

  • 把打了@RetryFunction注解的都转化为RetryHandler,即最终都是走的RetryHandler

   protected void processRetryFunction(Object bean, Method method) {

       log.info("发现@RetryFunction的实例:{},准备注册", method.toString());

       Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());

       RetryHandlerUtils.validateRetryFunction(method);


       RetryFunction retryFunction = method.getAnnotation(RetryFunction.class);

       Supplier retryListenerSupplier = () -> {

           RetryListener retryListener = null;

           String retryListenerName = retryFunction.retryListener();

           if (StringUtils.isNotBlank(retryListenerName)) {

               retryListener = defaultListableBeanFactory.getBean(retryListenerName, RetryListener.class);

           }

           return retryListener;

       };

       retryHandlers.add(new MethodRetryHandler(bean, invocableMethod, retryFunction, retryListenerSupplier));

   }

  • 把所有的retryHandlers遍历注册为定时任务,默认用的quartz

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


       boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

       if (this.retrySerializer == null) {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

       } else {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

       }


       retryHandlers.forEach(this::registerJobBean);


       retryHandlers.clear();

   }


   protected void registerJobBean(RetryHandler retryHandler) {

       if (retryHandler.identity().length() > 50) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

       }


       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);


       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

   }

至此,系统启动所做的任务就完成了

打有@RetryFunction的注解和实现RetryHandler的接口的方法都会被Spring代理

  • @RetryFunction注解的方法如何被代理

public class RetryHandlerMethodPointcut implements Pointcut {


   @Override

   public ClassFilter getClassFilter() {

       return ClassFilter.TRUE;

   }


   @Override

   public MethodMatcher getMethodMatcher() {

       return new StaticMethodMatcher() {


           @Override

           public boolean matches(Method method, Class targetClass) {

               return RetryHandlerUtils.isRetryFunctionMethod(method);

           }

       };

   }

}


   public static boolean isRetryFunctionMethod(Method method) {

       if (method.getAnnotation(RetryFunction.class) != null && method.getParameterCount() == 1) {

           return !Object.class.equals(method.getParameterTypes()[0]);

       }

       return false;

   }

实现Pointcut接口通过isRetryFunctionMethod(Method method)方法判断是否是需要代理的方法

  • 实现RetryHandler接口的方法如何被代理

public class RetryHandlerClassPointcut implements Pointcut {


   @Override

   public ClassFilter getClassFilter() {

       return RetryHandler.class::isAssignableFrom;

   }


   @Override

   public MethodMatcher getMethodMatcher() {

       return new StaticMethodMatcher() {


           @Override

           public boolean matches(Method method, Class targetClass) {

               return RetryHandlerUtils.isRetryHandlerMethod(targetClass, method);

           }

       };

   }

}

   public static boolean isRetryHandlerMethod(Class targetClass, Method method) {

       if ("handle".equals(method.getName()) && method.getParameterCount() == 1 && method.isBridge() && method.isSynthetic()) {

           //RetryHandler接口有泛型,需要特殊处理

           return true;

       }

       Type interfaceType = getRetryHandlerGenericInterface(targetClass);

       if (interfaceType == null) {

           return false;

       }

       Class argsInputType = Object.class;

       if (interfaceType instanceof ParameterizedType) {

           argsInputType = (Class) ((ParameterizedType) interfaceType).getActualTypeArguments()[0];

       }

       Class parameterType = argsInputType;

       return "handle".equals(method.getName()) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(parameterType);

   }

先对类进行过滤,要求实现RetryHandler

   @Override

   public ClassFilter getClassFilter() {

       return RetryHandler.class::isAssignableFrom;

   }

再对方法进行过滤,详细请看isRetryHandlerMethod(Class targetClass, Method method)方法


打有@RetryFunction注解的方法被调用时

当执行到带有@RetryFunction方法时(实现了RetryHandler也差不多的逻辑,就不再赘述了),会被方法拦截器拦截,

public class RetryHandlerMethodInterceptor implements MethodInterceptor {


   @Override

   public Object invoke(MethodInvocation invocation) {

       RetryFunction retryFunction = invocation.getMethod().getAnnotation(RetryFunction.class);

       Object[] args = invocation.getArguments();

       String identity = retryFunction.identity();

       if (StringUtils.isBlank(identity)) {

           identity = RetryHandlerUtils.getMethodIdentity(invocation.getMethod());

       }

       Optional optional = RetryHandlerRegistration.get(identity);

       if (optional.isPresent()) {

           return optional.get().handle(ArrayUtils.isEmpty(args) ? null : args[0]);

       }

       throw new IllegalArgumentException("找不到对应的RetryHandler代理,identity=" + identity);

   }

}

因为RetryHandlerRegistration中注册的是ImmediatelyRetryHandler,所以执行的是ImmediatelyRetryHandler的handle方法

       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);

doPost方法创建的是ImmediatelyRetryHandler

   @Override

   public RetryHandler doPost(RetryHandler retryHandler) {

       if (retryHandler instanceof GenericRetryHandler) {

           return new ImmediatelyRetryHandler((GenericRetryHandler) retryHandler, retryTaskFactory, retryTaskMapper, beforeTask);

       }

       return new ImmediatelyRetryHandler(new DefaultRetryHandler(retryHandler), retryTaskFactory, retryTaskMapper, beforeTask);

   }

接下来我们看看ImmediatelyRetryHandler.handle方法做了什么

   @Override

   public Object handle(Object arg) {

       RetryContext retryContext = new RetryContext(genericRetryHandler, arg);

       Object result;

       RetryTask retryTask;

       // 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

       if (beforeTask) {

           retryTask = retryTaskFactory.create(genericRetryHandler, arg);

           retryTaskMapper.insert(retryTask);

           try {

               result = genericRetryHandler.handle(arg);

               retryContext.setResult(result);

               completeTask(retryTask);

               onRetry(retryContext);

               onComplete(retryContext);

           } catch (NoRetryException e) {

               retryContext.setException(e);

               failureTask(retryTask, retryContext);


               onRetry(retryContext);

               onError(retryContext);

               throw e;

           } catch (RuntimeException e) {

               retryContext.setException(e);


               if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                   //只有最大可重试次数为0,才会执行到这里

                   failureTask(retryTask, retryContext);


                   onRetry(retryContext);

                   onError(retryContext);

               } else {

                   updateRemark(retryTask, e);

                   onRetry(retryContext);

               }


               throw e;

           }

           return result;

       } else {

           try {

               result = genericRetryHandler.handle(arg);

               retryContext.setResult(result);

               onRetry(retryContext);

               onComplete(retryContext);

           } catch (NoRetryException e) {

               retryContext.setException(e);


               onRetry(retryContext);

               onError(retryContext);


               throw e;

           } catch (RuntimeException e) {

               retryContext.setException(e);

               if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                   //只有最大可重试次数为0,才会执行到这里

                   onRetry(retryContext);

                   onError(retryContext);

               } else {

                   //等待重试

                   retryTask = retryTaskFactory.create(genericRetryHandler, arg);

                   retryTask.setRemark(StringUtils.left(e.getMessage(), 1000));

                   retryTaskMapper.insert(retryTask);

                   onRetry(retryContext);

               }


               throw e;

           }

       }

       return result;

   }

这个方法有点长,不过代码还算简单,简而言之就是重试的方法发生异常后入库( retryTaskFactory.create(genericRetryHandler, arg);)的操作,当然里边有序列化参数,修改重试表的状态等操作,就不再详细讲了(比较简单,相信大家都看得懂(*╹▽╹*)

至此,调用带有@RetryFunction注解的方法第一被调用,以及如何把重试任务入库的操作就完成了,下面讲解重试的逻辑

定时重试逻辑

上边有讲到把重试任务注册为定时任务的逻辑,再看一下代码吧

       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

可以看到,注册的是一个DefaultRetryProcessor,就是说,每次定时任务调用的是该类的doRetry方法,以quartz为例

public class RetryJob implements Job {


   private RetryProcessor retryProcessor;


   public RetryJob() {

   }


   public RetryJob(RetryProcessor retryProcessor) {

       this.retryProcessor = retryProcessor;

   }


   @Override

   public void execute(JobExecutionContext context) {

       retryProcessor.doRetry();

   }

}

下边我们看看doRetry都做了些什么

   @Override

   public void doRetry() {

       log.info("开始执行Identity={}的重试,maxRetryCount={}, initialDelay={}", genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

       List tasks = retryTaskMapper.queryNeedRetryTaskList(genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

       if (tasks == null) {

           return;

       }

       log.info("Identity={}当前有{}个任务准备重试", genericRetryHandler.identity(), tasks.size());

       if (genericRetryHandler.ignoreException()) {

           tasks.forEach(this::doRetryWithIgnoreException);

       } else {

           tasks.forEach(this::doRetry);

       }

   }

相信聪明的你肯定猜到了,没错,取出之前入库的数据开始进行重试

   private void doRetry(RetryTask retryTask) {

       log.info("开始重试Identity={},Id={}的任务", retryTask.getIdentity(), retryTask.getTaskId());

       retryedRetryHandler.setRetryTask(retryTask);

       String json = retryTask.getParams();

       if (StringUtils.isBlank(json)) {

           retryedRetryHandler.handle(null);

       } else {

           retryedRetryHandler.parseArgsAndhandle(json);

       }

   }

重试调用的是retryedRetryHandler.handle()的方法

   @Override

   public Object handle(Object arg) {

       retryTask.setRetryCount(retryTask.getRetryCount() + 1);

       RetryContext retryContext = new RetryContext(genericRetryHandler, arg, retryTask.getRetryCount());

       Object result;

       try {

           result = genericRetryHandler.handle(arg);

           retryContext.setResult(result);

           completeTask(retryTask);

           onRetry(retryContext);

           onComplete(retryContext);

       } catch (NoRetryException e) {

           retryContext.setException(e);


           failureTask(retryTask, retryContext);

           onRetry(retryContext);

           onError(retryContext);

           throw e;

       } catch (RuntimeException e) {

           retryContext.setException(e);


           if (retryTask.getRetryCount() == genericRetryHandler.maxRetryCount()) {

               failureTask(retryTask, retryContext);

           } else {

               update(retryTask, retryContext);

           }


           onRetry(retryContext);


           if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

               //重试次数达到最大,触发失败回调

               onError(retryContext);

           }


           throw e;

       }


       return result;

   }

retryedRetryHandler.handle方法主要是调用目标方法后,如果目标方法没报错,则把表中的状态修改成功,发生异常后更新表中的异常信息,达到最大重试次数后,把表中的状态改为失败,当然其中有把数据库中的参数反序列的操作.


到这里,smart-retry的大致流程,源码解读就完成了,当然,这并不是全部代码,只是主流程的代码,有兴趣的同学可以把代码拉下来,详细阅读以下

总结

smart-retry支持异步重试,支持重试持久化,用着还是相当不错的,但是还是有缺点的,比如,1. 只支持有且仅有一个参数2.每一个重试方法都对应一个定时任务,会造成线程的过度使用

所以,我在该源码的基础上,对smart-retry进行了改造,改造点如下

  • 支持重试的方法有多个参数
  • 支持指定抛出哪些异常后重试
  • 支持配置在注解上是否在执行方法前入库
  • 只提供重试的接口给用户,具体定时任务让用户自己去实现,比如改造后例子中的定时任务用的是xxl-job


如果想详细了解改造后的smart-retry,请参照smart-retry改造升级文档




相关文章
|
4月前
|
Java 网络安全 Maven
简记:一个flutter构建错误A problem occurred configuring project ‘:smart_auth‘. > Could not res
简记:一个flutter构建错误A problem occurred configuring project ‘:smart_auth‘. > Could not res
92 0
|
12月前
|
Oracle 关系型数据库 Linux
Autonomous Health Framework (AHF)笔记
Oracle Autonomous Health包括Oracle ORAchk或 Oracle EXAchk 和 Oracle Trace File Analyzer (TFA)
111 0
|
关系型数据库 MySQL 数据库
Communications link failure的解决办法
Communications link failure的解决办法
553 0
|
监控 人机交互 芯片
西门子S7-200 SMART的功能和特点有哪些?STEP7-Micro/WIN SMART编程软件简介
西门子S7-200 SMART是西门子公司针对中国小型自动化市场客户需求设计研发的一款高性价比小型PLC产品。S7-200 SMART CPU将微处理器、集成电源、输入输出电路组合到一个设计紧凑的外壳中,已形成功能强大的小型plc。面板包含电源接线端子、直流24V电源输出端子、数字量输入输出接线端子、CPU状态指示灯、IO状态指示灯、存储卡插槽、以太网接口、RS485接口等。
西门子S7-200 SMART的功能和特点有哪些?STEP7-Micro/WIN SMART编程软件简介
【VCS】PCIe Native Protocol Analyzer 使用方法
【VCS】PCIe Native Protocol Analyzer 使用方法
301 0
【VCS】PCIe Native Protocol Analyzer 使用方法
SAP WM初阶Interim Storage Type不好启用Storage Unit Management
SAP WM初阶Interim Storage Type不好启用Storage Unit Management
SAP WM初阶Interim Storage Type不好启用Storage Unit Management
SAP MM在ML81N事务代码界面报错- Customizing incorrectly maintained –之对策
SAP MM在ML81N事务代码界面报错- Customizing incorrectly maintained –之对策
SAP MM在ML81N事务代码界面报错- Customizing incorrectly maintained –之对策
|
SQL fastjson 关系型数据库
基于smart-retry源码改造升级
本文主要讲述基于smart-retry进行优化改造思路
艾伟_转载:.NET Discovery 系列之六--Me JIT(下)
本系列文章导航 .NET Discovery 系列之一--string从入门到精通(上) .NET Discovery 系列之二--string从入门到精通(勘误版下) .NET Discovery 系列之三--深入理解.
1040 0
|
.NET C#
艾伟_转载:.Net Discovery 系列之五--Me JIT(上)
本系列文章导航 .NET Discovery 系列之一--string从入门到精通(上) .NET Discovery 系列之二--string从入门到精通(勘误版下) .NET Discovery 系列之三--深入理解.
1005 0