基于smart-retry源码改造升级

简介: 本文主要讲述基于smart-retry进行优化改造思路

前瞻

阅读本篇前,请先阅读smart-retry源码阅读文章

背景

smart-retry有以下缺点

  • 只支持入参有且仅有一个
  • 每一个重试方法都对应一个定时任务,会造成线程的过度使用
  • 不支持抛出指定异常后重试

基于此,对smart-retry做了升级改造

改造后特性

针对smart-retry的确定的改进

  • 支持多个入参方法重试
  • 只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加
  • 支持抛出指定异常后重试


新增的功能

  • 支持配置在注解上配置是否在执行方法前入库
  • 增加了重试记录表,把每次重试都记录下来
  • 重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

代码跟进改造点

下边会讲述代码如何改造

支持多个入参方法重试

改造前入参,以及参数类型都是单个,改成数组,我列举这两种的前后对比,因为改的地方比较多,我不一一列举

改造前入参

   public Object handle(Object arg) {

改造后入参

public Object handle(Object arg[]) {

改造前入参类型(用于序列化和反序列化)

   public Class<?> getInputArgsType() {

       return inputArgsType;

   }

改造后入参类型(用于序列化和反序列化)

   public Class<?>[] getInputArgsType() {

       return inputArgsType;

   }

只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加

我们先看原来的代码

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


       boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

       if (this.retrySerializer == null) {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

       } else {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

       }


       retryHandlers.forEach(this::registerJobBean);


       retryHandlers.clear();

   }

   protected void registerJobBean(RetryHandler retryHandler) {

       if (retryHandler.identity().length() > 50) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

       }


       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);


       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

   }

   @Override

   public void register(RetryHandler retryHandler, RetryProcessor retryProcessor) {

       if (StringUtils.isBlank(retryHandler.cron())) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + ", 使用Elastic-Job注册器,必须指定RetryHandler/RetryFunction的cron表达式");

       }

       BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);

       beanDefinitionBuilder.addConstructorArgValue(new RetryJob(retryProcessor));

       beanDefinitionBuilder.addConstructorArgValue(registryCenter);

       beanDefinitionBuilder.addConstructorArgValue(createLiteJobConfiguration(retryHandler));

       if (jobEventConfiguration != null) {

           beanDefinitionBuilder.addConstructorArgValue(jobEventConfiguration);

       }

       beanDefinitionBuilder.addConstructorArgValue(getElasticJobListeners());

       beanDefinitionBuilder.setInitMethodName("init");


       String jobBeanName = getJobBeanName(retryHandler);

       defaultListableBeanFactory.registerBeanDefinition(jobBeanName, beanDefinitionBuilder.getBeanDefinition());


       //此处的getBean调用是为了手工触发Bean的初始化

       defaultListableBeanFactory.getBean(jobBeanName);

       log.info("identity={}已成功注册到Elastic-Job", retryHandler.identity());

   }

可以看到,每一个retryHandler会单独注册一个定时任务,并且注册的时候需要指定注册到哪个定时任务

再看改造后代码,先拿到所有retryHandler,封装为可执行的RetryFunctionProcessor

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       RetryFunctionProcessor retryFunctionProcessor = defaultListableBeanFactory.getBean(RetryFunctionProcessor.class);


       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);


       List<RetryProcessor> retryProcessorList = new ArrayList<>();

       for (RetryHandler retryHandler : retryHandlers) {

           if (retryHandler.identity().length() > 50) {

               throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

           }


           RetryHandler retryHandlerProxy =new ImmediatelyRetryHandler(retryHandler, new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, retryRule);

           RetryHandlerRegistration.registry(retryHandlerProxy);


           RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer, retryRule);

           retryProcessorList.add(retryProcessor);

       }

       

       retryFunctionProcessor.register(retryProcessorList);


       retryHandlers.clear();

   }

RetryFunctionProcessor其实就是把retryProcessorList保存为成员变量,提供给用户去调用,调用方式将在如何使用篇章讲解

支持抛出特定异常后重试

smart-retry只能在抛出RuntimeException的时候,进行重试,改造后,可以支持抛出特定异常后重试,这样当自定义异常时,更加灵活,代码改造点如下

RetryFunction注解增加变量

   /**

    * 如果方法抛出该异常则会创建重试任务

    */

   Class<? extends RuntimeException>[] retryException() default {RuntimeException.class};

在执行方法抛出异常时,判断是否在配置的这些异常范围,来决定是否要进行重试

       } catch (RuntimeException e) {

           boolean isIncludeException = RetryHandlerUtils.isIncludeException(e, retryException);

           if (!isIncludeException) {

               throw e;

           }

支持配置在注解上配置是否在执行方法前入库

smart-retry是支持是否在执行方法前入库的,但是是全局的配置,改造后可以对每一个任务单独配置

   /**

    * 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

    * @return

    */

   boolean beforeTask() default false;

具体逻辑就比较简单了,就是根据配置,来决定是否在抛出异常前插入数据

增加了重试记录表,把每次重试都记录下来

smart-retry只有一张重试表,比如重试过5次,没办法知道这5次重试的具体详情,只能知道最后一次是成功还是失败等,所以增加了重试记录表,来记录每一次重试的记录,利用了spring的切面的原理,对原代码没有任何侵入

@Aspect

@Slf4j

public class RetryTaskRecordAspect implements ApplicationContextAware {


   private ApplicationContext applicationContext;

   

   @Pointcut("execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.insert(..)) ||" +

           "execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.update(..))")

   public void pointCut(){

       

   }

   

   @Around(value = "pointCut()")

   public Object insertTaskRecord(ProceedingJoinPoint proceedingJoinPoint) {

       Object o = null;

       try {

           o = proceedingJoinPoint.proceed();

       } catch (Throwable throwable) {

           log.error("切面insertTaskRecord报错,", throwable);

       }


       Object[] args = proceedingJoinPoint.getArgs();

       RetryTask retryTask = (RetryTask) args[0];

       RetryTaskMapper retryTaskMapper = applicationContext.getBean(RetryTaskMapper.class);

       retryTaskMapper.insertTaskRecord(retryTask);

       return o;

   }


   @Override

   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

       this.applicationContext = applicationContext;  

   }

}

重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

改造前:smart-retry原来的重试规则,是为每一条重试任务配置一个cron表达式

改造后:重试规则在application.properties中全局配置,更方便管理,cron表达式改为按时间,或者间隔,更容易配置,不易配置错

   /**

    * 重试规则,有两种方式,指定时间或者间隔

    * 1.时间,格式为:onTime-时间-最大重试次数,例如  onTime-23:59:59-3,onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开

    * 2.间隔,格式为:onInterval-间隔重试规则,例如 onInterval-10,20,30,onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

    */

   private String retryRule;

指定时间重试的实现逻辑:获取到下次的执行时间,如果执行时间小于当前时间,则需要加一天,否则,就是该时间

指定间隔重试的实现逻辑:现在的时间,加上重试次数对应的间隔时间即可

代码如下

   /**

    * 获取下次执行时间

    * @param retryRule

    * @param retryCount

    * @return

    */

   public static LocalDateTime getNextExecTime(RetryRule retryRule, int retryCount) {

       

       if (retryRule instanceof OnTimeRetryRule) {

           OnTimeRetryRule onTimeRetryRule = (OnTimeRetryRule) retryRule;


           LocalTime retryTime = onTimeRetryRule.getRetryTime();


           LocalDateTime nextExecTime = LocalDateTime.of(LocalDate.now(), retryTime);

           

           if (nextExecTime.isAfter(LocalDateTime.now())) {

               return nextExecTime;

           } else {

               return nextExecTime.plusDays(1L);

           }

       }


       OnIntervalRetryRule onIntervalRetryRule = (OnIntervalRetryRule) retryRule;


       Long[] retryInterval = onIntervalRetryRule.getRetryInterval();

       

       return plusSeconds(retryInterval[retryCount]);

   }

如何使用


建表

接口重推依赖两张表重试表以及重试记录表

重试表sql

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime,

next_date DATETIME COMMENT '下次执行的时间') ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

重试记录表sql

CREATE TABLE `sys_retry_task_record`

(

   `id`            BIGINT(20)    NOT NULL AUTO_INCREMENT,

   `task_id`       BIGINT(20)    NULL     DEFAULT NULL COMMENT '重试任务id',

   `identity_name` VARCHAR(50)   NOT NULL COMMENT '任务的唯一标识' COLLATE 'utf8_general_ci',

   `params`        TEXT          NULL     DEFAULT NULL COMMENT '参数' COLLATE 'utf8_general_ci',

   `status`        TINYINT(4)    NOT NULL COMMENT '状态。1: 处理中,2: 成功,3: 失败',

   `retry_count`   INT(11)       NOT NULL DEFAULT '0' COMMENT '重试次数',

   `remark`        VARCHAR(1000) NULL     DEFAULT NULL COMMENT '备注' COLLATE 'utf8_general_ci',

   `next_date`     DATETIME      NULL     DEFAULT NULL COMMENT '下次执行的时间',

   `edit_date`     DATETIME      NULL     DEFAULT NULL,

   `create_date`   DATETIME      NOT NULL,

   PRIMARY KEY (`id`) USING BTREE,

   INDEX `idx_identityname_status` (`identity_name`, `status`) USING BTREE

)

   COMMENT ='系统重试记录表'

   COLLATE = 'utf8_general_ci'

   ENGINE = InnoDB


引入依赖

       <dependency>

           <groupId>com.aliyun.gts.bpaas</groupId>

           <artifactId>aliyun-gts-retry-starter</artifactId>

           <version>1.0.0-SNAPSHOT</version>

       </dependency>

application.properties配置

# 数据源配置,注意要和建表的数据源和数据库在同一个

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=utf8&autoReconnect=true&useSSL=false

spring.datasource.username=root

spring.datasource.password=123456


# 重试开关,默认为false

gts.retry.enable=true

# 重试的规则

gts.retry.retry-rule=onInterval-10,30,30,30

# 序列化方式,支持fastjson,jackson以及gson,默认为fastjson

gts.retry.serialize-type=fastjson

下边对重试规则配置gts.retry.retry-rule进行详细说明

重试规则,有两种方式,指定时间或者间隔
1. 时间,格式为:onTime-时间-最大重试次数,例如(onTime-23:59:59-3),onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开
2. 间隔,格式为:onInterval-间隔重试规则,例如 (onInterval-10,20,30),onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

编程界面

接口重推需要的编程界面有两处,第一处是在方法上打上@RetryFunction注解,第二处是在定时任务中调用处理定时任务的方法

在方法上打上@RetryFunction注解

   @RetryFunction(identity = "demo.simplest", beforeTask = true,

           retryListener = SimpleTestRetryListener.class,

           retryException = {RuntimeException.class}, ignoreException = true)

   public void simplestWithId(int id) {

       

       log.info("simplestWithId[{}]执行开始", id);

       // doSomething()

       log.info("simplestWithId[{}]执行完成", id);

   }

下面对注解中的每一个属性做详细的解释

属性

类型

备注

默认值

identity

string

唯一标识,系统内不能重复

长度要小50个字节

类的全名称+方法名称

beforeTask

boolean

是否在执行任务之前插入数据

配置false则表示,只有任务执行报错才插入数据库|,true表示在方法执行前就会插入数据库

false

retryListener

Class<? extends RetryListener>

任务监听器。可以在任务重试、任务完成、任务失败时进行回调

onRetry():每次重试时触发(执行后触发)

onComplete():任务完成时触发

onError():失败时触发(超过最大重试次数)

不进行任务监听

retryException

Class<? extends RuntimeException>[]

如果方法抛出指定异常则会创建重试任务或执行重试动作

{RuntimeException.class}

ignoreException

boolean

当重试任务有多个的时候,上一个重试报错,是否忽略错误继续执行下一个任务

true

定时任务中触发重试任务示例

@Slf4j

@Component

public class DistributeRetrySchedule extends AbstractGtsSchedulerTaskProcessor {


   

   @Autowired

   private RetryFunctionProcessor retryFunctionProcessor;

   

   @Override

   public String getTaskId(){

       return "retry-job";

   }


   @Override

   public GtsSchedulerTaskResult process(GtsSchedulerTaskParameter parameter) throws Exception{

       DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

       log.info("执行定时任务,当前时间{}",format.format(new Date()));

       retryFunctionProcessor.processTask();

       return GtsSchedulerTaskResult.successResult();

   }

}

重试任务持久化示例

重试表

重试记录表

相关文章
|
Cloud Native Linux 网络安全
如何利用Gitlab-CI持续部署到远程机器?
长话短说,今天聊一聊使用Gitlab-CI 自动部署到远程服务器。
如何利用Gitlab-CI持续部署到远程机器?
|
8月前
|
自然语言处理
Havenask 分词器开发
本次分享由阿里云智能引擎工程师江半介绍Havenask分词器开发,内容涵盖三部分:1. 分词器插件简介,Tokenizer是Analyzer内部组件,负责文本切词;2. 内置分词器介绍,包括simple、aliws、multilevel_aliws和singlews等类型;3. 分词器开发实战,通过特定分隔符切词的TestTokenizer插件,详细讲解编写分词器插件的流程。整个过程涉及初始化、配置读取及Token生成等步骤。
133 2
|
9月前
|
Java Maven
java项目中jar启动执行日志报错:no main manifest attribute, in /www/wwwroot/snow-server/z-server.jar-jar打包的大小明显小于正常大小如何解决
在Java项目中,启动jar包时遇到“no main manifest attribute”错误,且打包大小明显偏小。常见原因包括:1) Maven配置中跳过主程序打包;2) 缺少Manifest文件或Main-Class属性。解决方案如下:
2283 8
java项目中jar启动执行日志报错:no main manifest attribute, in /www/wwwroot/snow-server/z-server.jar-jar打包的大小明显小于正常大小如何解决
|
缓存 Java 测试技术
谷粒商城笔记+踩坑(11)——性能压测和调优,JMeter压力测试+jvisualvm监控性能+资源动静分离+修改堆内存
使用JMeter对项目各个接口进行压力测试,并对前端进行动静分离优化,优化三级分类查询接口的性能
447 10
谷粒商城笔记+踩坑(11)——性能压测和调优,JMeter压力测试+jvisualvm监控性能+资源动静分离+修改堆内存
|
运维 监控 Cloud Native
GitHub现新霸主!「Java并发编程实战攻略」竟是阿里内部顶级机密
随着经济环境下行,大厂降本增效、筛除了一部分冗余岗位,原本荒蛮的 IT 行业发展正在逐步进入正轨中。虽说依旧算是不温不火,但在今年的技术招聘市场上,Java 依旧是当仁不让的霸主! Java 有着极其成熟的生态,这个不用我多说; Java 在运维、可观测性、可监控性方面都有着非常优秀的表现; Java 也在积极应对容器化、云原生等大趋势,比如 Spring Boot 就已经全面拥抱云原生。 企业对 Java 的需求最大,对应着 Java 程序员的群体也最为庞大,有着 1200 万之多。换句话说,也是最“修罗场”的! 这最近就有粉丝跟我反馈,技术面倒在了并发编程上!
288 0
GitHub现新霸主!「Java并发编程实战攻略」竟是阿里内部顶级机密
|
SQL 关系型数据库 MySQL
使用 RDS for MySQL 配置到自建数据库的主从复制
场景 出于数据容灾、ETL、异地数据访问等目的,可能需要基于 RDS for MySQL 实例,搭建到自己线下MySQL实例的主从复制。这篇文章将给出简单的操作步骤,供大家参考。由于要使用GTID特性,因此要求MySQL版本>=5.6. 前提条件 操作步骤 配置主实例 登录 RDS 控制台
24023 0
|
Java 调度 数据库
|
JSON 数据库 数据格式
gorm 教程三 gen自动代码生成工具
gorm 教程三 gen自动代码生成工具
1103 0
notepad快捷键大全
Notepad++ 快捷键 大全Ctrl+C 复制Ctrl+X 剪切Ctrl+V 粘贴Ctrl+Z 撤消Ctrl+Y 恢复Ctrl+A 全选Ctrl+F 键查找对话框启动Ctrl+H 查找/替换对话框Ctrl+D 复制并粘贴当行Ctrl+L 删除当前行Ctrl+T 当行向上移动一行F3 查找下一个S...
2955 0