基于smart-retry源码改造升级

简介: 本文主要讲述基于smart-retry进行优化改造思路

前瞻

阅读本篇前,请先阅读smart-retry源码阅读文章

背景

smart-retry有以下缺点

  • 只支持入参有且仅有一个
  • 每一个重试方法都对应一个定时任务,会造成线程的过度使用
  • 不支持抛出指定异常后重试

基于此,对smart-retry做了升级改造

改造后特性

针对smart-retry的确定的改进

  • 支持多个入参方法重试
  • 只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加
  • 支持抛出指定异常后重试


新增的功能

  • 支持配置在注解上配置是否在执行方法前入库
  • 增加了重试记录表,把每次重试都记录下来
  • 重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

代码跟进改造点

下边会讲述代码如何改造

支持多个入参方法重试

改造前入参,以及参数类型都是单个,改成数组,我列举这两种的前后对比,因为改的地方比较多,我不一一列举

改造前入参

   public Object handle(Object arg) {

改造后入参

public Object handle(Object arg[]) {

改造前入参类型(用于序列化和反序列化)

   public Class<?> getInputArgsType() {

       return inputArgsType;

   }

改造后入参类型(用于序列化和反序列化)

   public Class<?>[] getInputArgsType() {

       return inputArgsType;

   }

只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加

我们先看原来的代码

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


       boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

       if (this.retrySerializer == null) {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

       } else {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

       }


       retryHandlers.forEach(this::registerJobBean);


       retryHandlers.clear();

   }

   protected void registerJobBean(RetryHandler retryHandler) {

       if (retryHandler.identity().length() > 50) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

       }


       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);


       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

   }

   @Override

   public void register(RetryHandler retryHandler, RetryProcessor retryProcessor) {

       if (StringUtils.isBlank(retryHandler.cron())) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + ", 使用Elastic-Job注册器,必须指定RetryHandler/RetryFunction的cron表达式");

       }

       BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);

       beanDefinitionBuilder.addConstructorArgValue(new RetryJob(retryProcessor));

       beanDefinitionBuilder.addConstructorArgValue(registryCenter);

       beanDefinitionBuilder.addConstructorArgValue(createLiteJobConfiguration(retryHandler));

       if (jobEventConfiguration != null) {

           beanDefinitionBuilder.addConstructorArgValue(jobEventConfiguration);

       }

       beanDefinitionBuilder.addConstructorArgValue(getElasticJobListeners());

       beanDefinitionBuilder.setInitMethodName("init");


       String jobBeanName = getJobBeanName(retryHandler);

       defaultListableBeanFactory.registerBeanDefinition(jobBeanName, beanDefinitionBuilder.getBeanDefinition());


       //此处的getBean调用是为了手工触发Bean的初始化

       defaultListableBeanFactory.getBean(jobBeanName);

       log.info("identity={}已成功注册到Elastic-Job", retryHandler.identity());

   }

可以看到,每一个retryHandler会单独注册一个定时任务,并且注册的时候需要指定注册到哪个定时任务

再看改造后代码,先拿到所有retryHandler,封装为可执行的RetryFunctionProcessor

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       RetryFunctionProcessor retryFunctionProcessor = defaultListableBeanFactory.getBean(RetryFunctionProcessor.class);


       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);


       List<RetryProcessor> retryProcessorList = new ArrayList<>();

       for (RetryHandler retryHandler : retryHandlers) {

           if (retryHandler.identity().length() > 50) {

               throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

           }


           RetryHandler retryHandlerProxy =new ImmediatelyRetryHandler(retryHandler, new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, retryRule);

           RetryHandlerRegistration.registry(retryHandlerProxy);


           RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer, retryRule);

           retryProcessorList.add(retryProcessor);

       }

       

       retryFunctionProcessor.register(retryProcessorList);


       retryHandlers.clear();

   }

RetryFunctionProcessor其实就是把retryProcessorList保存为成员变量,提供给用户去调用,调用方式将在如何使用篇章讲解

支持抛出特定异常后重试

smart-retry只能在抛出RuntimeException的时候,进行重试,改造后,可以支持抛出特定异常后重试,这样当自定义异常时,更加灵活,代码改造点如下

RetryFunction注解增加变量

   /**

    * 如果方法抛出该异常则会创建重试任务

    */

   Class<? extends RuntimeException>[] retryException() default {RuntimeException.class};

在执行方法抛出异常时,判断是否在配置的这些异常范围,来决定是否要进行重试

       } catch (RuntimeException e) {

           boolean isIncludeException = RetryHandlerUtils.isIncludeException(e, retryException);

           if (!isIncludeException) {

               throw e;

           }

支持配置在注解上配置是否在执行方法前入库

smart-retry是支持是否在执行方法前入库的,但是是全局的配置,改造后可以对每一个任务单独配置

   /**

    * 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

    * @return

    */

   boolean beforeTask() default false;

具体逻辑就比较简单了,就是根据配置,来决定是否在抛出异常前插入数据

增加了重试记录表,把每次重试都记录下来

smart-retry只有一张重试表,比如重试过5次,没办法知道这5次重试的具体详情,只能知道最后一次是成功还是失败等,所以增加了重试记录表,来记录每一次重试的记录,利用了spring的切面的原理,对原代码没有任何侵入

@Aspect

@Slf4j

public class RetryTaskRecordAspect implements ApplicationContextAware {


   private ApplicationContext applicationContext;

   

   @Pointcut("execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.insert(..)) ||" +

           "execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.update(..))")

   public void pointCut(){

       

   }

   

   @Around(value = "pointCut()")

   public Object insertTaskRecord(ProceedingJoinPoint proceedingJoinPoint) {

       Object o = null;

       try {

           o = proceedingJoinPoint.proceed();

       } catch (Throwable throwable) {

           log.error("切面insertTaskRecord报错,", throwable);

       }


       Object[] args = proceedingJoinPoint.getArgs();

       RetryTask retryTask = (RetryTask) args[0];

       RetryTaskMapper retryTaskMapper = applicationContext.getBean(RetryTaskMapper.class);

       retryTaskMapper.insertTaskRecord(retryTask);

       return o;

   }


   @Override

   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

       this.applicationContext = applicationContext;  

   }

}

重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

改造前:smart-retry原来的重试规则,是为每一条重试任务配置一个cron表达式

改造后:重试规则在application.properties中全局配置,更方便管理,cron表达式改为按时间,或者间隔,更容易配置,不易配置错

   /**

    * 重试规则,有两种方式,指定时间或者间隔

    * 1.时间,格式为:onTime-时间-最大重试次数,例如  onTime-23:59:59-3,onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开

    * 2.间隔,格式为:onInterval-间隔重试规则,例如 onInterval-10,20,30,onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

    */

   private String retryRule;

指定时间重试的实现逻辑:获取到下次的执行时间,如果执行时间小于当前时间,则需要加一天,否则,就是该时间

指定间隔重试的实现逻辑:现在的时间,加上重试次数对应的间隔时间即可

代码如下

   /**

    * 获取下次执行时间

    * @param retryRule

    * @param retryCount

    * @return

    */

   public static LocalDateTime getNextExecTime(RetryRule retryRule, int retryCount) {

       

       if (retryRule instanceof OnTimeRetryRule) {

           OnTimeRetryRule onTimeRetryRule = (OnTimeRetryRule) retryRule;


           LocalTime retryTime = onTimeRetryRule.getRetryTime();


           LocalDateTime nextExecTime = LocalDateTime.of(LocalDate.now(), retryTime);

           

           if (nextExecTime.isAfter(LocalDateTime.now())) {

               return nextExecTime;

           } else {

               return nextExecTime.plusDays(1L);

           }

       }


       OnIntervalRetryRule onIntervalRetryRule = (OnIntervalRetryRule) retryRule;


       Long[] retryInterval = onIntervalRetryRule.getRetryInterval();

       

       return plusSeconds(retryInterval[retryCount]);

   }

如何使用


建表

接口重推依赖两张表重试表以及重试记录表

重试表sql

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime,

next_date DATETIME COMMENT '下次执行的时间') ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

重试记录表sql

CREATE TABLE `sys_retry_task_record`

(

   `id`            BIGINT(20)    NOT NULL AUTO_INCREMENT,

   `task_id`       BIGINT(20)    NULL     DEFAULT NULL COMMENT '重试任务id',

   `identity_name` VARCHAR(50)   NOT NULL COMMENT '任务的唯一标识' COLLATE 'utf8_general_ci',

   `params`        TEXT          NULL     DEFAULT NULL COMMENT '参数' COLLATE 'utf8_general_ci',

   `status`        TINYINT(4)    NOT NULL COMMENT '状态。1: 处理中,2: 成功,3: 失败',

   `retry_count`   INT(11)       NOT NULL DEFAULT '0' COMMENT '重试次数',

   `remark`        VARCHAR(1000) NULL     DEFAULT NULL COMMENT '备注' COLLATE 'utf8_general_ci',

   `next_date`     DATETIME      NULL     DEFAULT NULL COMMENT '下次执行的时间',

   `edit_date`     DATETIME      NULL     DEFAULT NULL,

   `create_date`   DATETIME      NOT NULL,

   PRIMARY KEY (`id`) USING BTREE,

   INDEX `idx_identityname_status` (`identity_name`, `status`) USING BTREE

)

   COMMENT ='系统重试记录表'

   COLLATE = 'utf8_general_ci'

   ENGINE = InnoDB


引入依赖

       <dependency>

           <groupId>com.aliyun.gts.bpaas</groupId>

           <artifactId>aliyun-gts-retry-starter</artifactId>

           <version>1.0.0-SNAPSHOT</version>

       </dependency>

application.properties配置

# 数据源配置,注意要和建表的数据源和数据库在同一个

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=utf8&autoReconnect=true&useSSL=false

spring.datasource.username=root

spring.datasource.password=123456


# 重试开关,默认为false

gts.retry.enable=true

# 重试的规则

gts.retry.retry-rule=onInterval-10,30,30,30

# 序列化方式,支持fastjson,jackson以及gson,默认为fastjson

gts.retry.serialize-type=fastjson

下边对重试规则配置gts.retry.retry-rule进行详细说明

重试规则,有两种方式,指定时间或者间隔
1. 时间,格式为:onTime-时间-最大重试次数,例如(onTime-23:59:59-3),onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开
2. 间隔,格式为:onInterval-间隔重试规则,例如 (onInterval-10,20,30),onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

编程界面

接口重推需要的编程界面有两处,第一处是在方法上打上@RetryFunction注解,第二处是在定时任务中调用处理定时任务的方法

在方法上打上@RetryFunction注解

   @RetryFunction(identity = "demo.simplest", beforeTask = true,

           retryListener = SimpleTestRetryListener.class,

           retryException = {RuntimeException.class}, ignoreException = true)

   public void simplestWithId(int id) {

       

       log.info("simplestWithId[{}]执行开始", id);

       // doSomething()

       log.info("simplestWithId[{}]执行完成", id);

   }

下面对注解中的每一个属性做详细的解释

属性

类型

备注

默认值

identity

string

唯一标识,系统内不能重复

长度要小50个字节

类的全名称+方法名称

beforeTask

boolean

是否在执行任务之前插入数据

配置false则表示,只有任务执行报错才插入数据库|,true表示在方法执行前就会插入数据库

false

retryListener

Class<? extends RetryListener>

任务监听器。可以在任务重试、任务完成、任务失败时进行回调

onRetry():每次重试时触发(执行后触发)

onComplete():任务完成时触发

onError():失败时触发(超过最大重试次数)

不进行任务监听

retryException

Class<? extends RuntimeException>[]

如果方法抛出指定异常则会创建重试任务或执行重试动作

{RuntimeException.class}

ignoreException

boolean

当重试任务有多个的时候,上一个重试报错,是否忽略错误继续执行下一个任务

true

定时任务中触发重试任务示例

@Slf4j

@Component

public class DistributeRetrySchedule extends AbstractGtsSchedulerTaskProcessor {


   

   @Autowired

   private RetryFunctionProcessor retryFunctionProcessor;

   

   @Override

   public String getTaskId(){

       return "retry-job";

   }


   @Override

   public GtsSchedulerTaskResult process(GtsSchedulerTaskParameter parameter) throws Exception{

       DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

       log.info("执行定时任务,当前时间{}",format.format(new Date()));

       retryFunctionProcessor.processTask();

       return GtsSchedulerTaskResult.successResult();

   }

}

重试任务持久化示例

重试表

重试记录表

相关文章
Java版本对比:特性、升级改动与优势分析
Java版本对比:特性、升级改动与优势分析
181 0
将良田高拍仪的EloamViewJavaDemo改造为ActiveMQ网络调用
与客户端的硬件驱动交互一般要通过独立的消息服务客户端来完成,本文以良田高拍仪的自动裁剪摄像调用与ActiveMQ消息服务的交互为例,展开这一过程。
227 0
|
10月前
|
总结反思 持续进步-开源即时通讯(IM)项目OpenIM 新版本release-v3.7发布
背景 过去,我们团队对开源项目的认知较浅,过分追求进度,而忽视了代码的质量和规范。这导致了一些问题,例如部署流程设计不当:流程复杂、不规范,以及Mac与Windows部署的明显缺陷。这些问题不仅给开发者带来了困扰,也增加了社区维护的难度。 针对这些挑战,我们团队进行了深刻的反思并总结出了相关问题。目前,我们正在专注于提高代码质量和规范化工作,并在完善我们的开源贡献流程,以吸引更多贡献者参与。我们相信这是关键步骤,以扩大社区的影响力并将OpenIM发展成为一流的开源项目。 为此,我们已规划推出release-v3.7版本,该版本将全面改造部署、规范和流程等方面,并且与3.6版本数据完全兼容。
232 0
低版本SpringBoot Redis缓存旁路设计改造方案实践
低版本SpringBoot Redis缓存旁路设计改造方案实践
412 0
Zookeeper的服务器的log4j升级为log4j2的升级方案(忽略配置化兼容问题)
Zookeeper的服务器的log4j升级为log4j2的升级方案(忽略配置化兼容问题)
506 0
Zookeeper的服务器的log4j升级为log4j2的升级方案(忽略配置化兼容问题)
【第五篇-完结篇】XiaoZaiMultiAutoAiDevices之改造扩展
在前面系列文章中有讲到,使用configparser,ini格式的文件作为配置文件,在新增或者删除其中的值时,会丢失所有注释,所以在框架源码注释中我有写到,如果对这方面比较介意或者是有需求的话,可以进行更改配置文件。
162 0
SpringCloud升级之路2020.0.x版-42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷(下)
SpringCloud升级之路2020.0.x版-42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷(下)
SpringCloud升级之路2020.0.x版-42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷(上)
SpringCloud升级之路2020.0.x版-42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷(上)
SpringCloud升级之路2020.0.x版-42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷(上)