老司机使用CompletableFuture实现集成任务失败后自动重跑

简介: 0 老司机集成任务在Aone实验室中遇到的问题  老司机平台是一个集合了用例管理,用例执行,测试沉淀等功能的一站式集成测试平台。老司机中提供了一种名为集成任务的测试件,它一般包含一组核心的可执行用例,主要在变更发布前回归指定的接口或应用时由平台或手工触发运行。也可以在每日定时执行指定的集成任务,以实现用例与测试任务的持续集成。  集成任务用作发布前的卡点测试件时,一般是在流水线中加入Aone实验室

0 老司机集成任务在Aone实验室中遇到的问题

  老司机平台是一个集合了用例管理,用例执行,测试沉淀等功能的一站式集成测试平台。老司机中提供了一种名为集成任务的测试件,它一般包含一组核心的可执行用例,主要在变更发布前回归指定的接口或应用时由平台或手工触发运行。也可以在每日定时执行指定的集成任务,以实现用例与测试任务的持续集成。

  集成任务用作发布前的卡点测试件时,一般是在流水线中加入Aone实验室卡点,再由实验室触发集成任务运行。创建实验室时,需要在配置脚本中输入参数模板的各项信息,如待运行的集成任务id、执行环境、执行应用、分支等。老司机则只需要提供一组接口供AONE实验室调用,接口能力主要包括:触发任务运行,查询运行结果。

  使用Aone实验室运行老司机集成任务,存在一个问题,就是AONE每触发一次,老司机只能执行一次。由于日常环境抖动等各类原因,一般并不能保证用例仅单次执行就全部通过,而这将会产生更大范围的影响:

  • 应用发布时,测试件卡点不通过,这就需要测试同学人工介入,进行手动排查+手动跳过,耗时耗力;
  • 集成任务通过率被动降低,这不仅无法正确体现自动化的作用与价值,甚至影响对测试有效性的判断。

1 解决方案

1.1 整体技术方案

  考虑到大部分问题都可以通过重跑集成任务或指定用例解决,而且大部分同学在进行人工介入处理时,也选择了重跑,我们决定在老司机内部将重跑封装起来,如果第一次调用没能全部通过,则老司机直接触发自动重跑。

  此时AONE实验室完全不感知老司机的重跑,调用查询结果接口时,仍然返回执行中。仅当老司机重跑次数超过2次,或某次重跑结束后全部用例都通过了,我们才返回执行完毕,并展示测试件本次运行最终是否通过。

1.2 CompletableFuture简介

  重跑能力需要通过多线程的方式实现,而讲到Java的多线程异步处理,必须从Runnable说起。实现一个类,并实现Runnable接口的run()方法,再将实例放入Thread类的示例中发起调用,即可实现最简单最基础的多线程。   然而这样的方式,不太容易获取到异步执行的结果,这时就需要带返回值的Callable接口。Callable一般与Future接口配合使用,在主线程中获取子线程的执行结果。

  但实际获取子线程的执行结果时,使用的往往还是轮询的方式,这对主线程来说是阻塞的。而我们使用多线程的目的就是在不阻塞主线程的情况下,通过子线程执行特定的任务并返回结果。这个矛盾要怎么解决呢?

  Java8给出了一个比较完善的解决方案,即提供了CompletableFuture类。使用这个类创建的子线程,可以通过thenXXX()方法,实现异步编排。通过回调的方式,在不阻塞主线程的情况下,优雅的解决获取子线程执行结果的问题。

  下面给出实际工程中,常用到的几个实现异步编排的CompletableFuture类方法

创建一个CompletableFuture实例 static方法

接续处理,thenXXX()方法 member方法

仅异步,供主线程调用发起

同步处理

异步处理

无入参

无返回值

runAsync

thenRun

thenRunAsync

有返回值

supplyAsync

-

-

有入参

无返回值

-

thenAccept

thenAcceptAsync

有返回值

-

thenApply

thenApplyAsync

  CompletableFuture在创建实例和后续的异步处理函数中,默认使用ForkJoinPool.commonPool()这个线程池。但是我们在创建实例时,会使用自定义的ThreadPoolExecutor,这有以下几个好处:

  • ThreadPoolExecutor是单一Job队列+多线程模式,模型简单,易于问题分析;
  • 老司机业务层面主要是执行各种IO,线程执行时间短,IO阻塞时间长,线程&任务队列来回抢占意义不大,反而可能由于切换的导致开销增大;
  • 默认先进先出,每个任务独占线程,相同的集成任务所消耗的运行时间基本是稳定的,不会出现太大变化,上层应用发布都会有测试卡点,有稳定的执行时间预期对业务应用发布非常重要;
  • ForkJoinPool.commonPool()在虚拟机内全局共用,包括parallelStream()等默认也会使用,若使用此默认线程池,可能会受到系统内其他线程或任务的影响。

1.3 异步任务链模型

  按照前文的介绍,我们在老司机内部,将一次调用改为多次重跑。我们主要借助CompletableFuture创建新线程来完成用例执行。改造后,从AONE实验室开始触发集成任务执行的流程图如下,其中步骤8、14和18即为CompletableFuture运行完成时的回调:

  

1.4 异步任务链实现

  接下来我们来关注具体的实现。

  之前在没有引入内部重跑时,默认使用runAsync()来创建首次执行即可。因为执行线程在执行完成后,不需要做额外处理,因此既不需要入参,也不需要返回值。AONE轮询时,查指定任务的结果即可。所以直接传入一个Runnable实例作为异步任务即可。

// 创建业务唯一ID
String oriBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室首次调起集成任务,批次号:batchId = {}", oriBatchId);

// 持久化任务执行记录
InvokeTaskDO invokeTaskDO = convertExecuteParamVo2InvokeTaskDO(executeParamVo);
invokeTaskDO.setTaskId(taskId);
invokeTaskDO.setBatchId(oriBatchId);
invokeTaskMapper.insertInvokeTask(invokeTaskDO);

// 使用runAsync()创建异步任务,不必关注线程执行的返回值
CompletableFuture<Void> oriFuture = CompletableFuture.runAsync(() -> {
    try {
        asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
                invokeComposites, executeParamVo.getInvoker(), oriBatchId, executeParamVo.getEnv(),
                executeParamVo.getCrId(), new ArrayList<>());
    } catch (Exception e) {
        LOGGER.error("异步执行调用异常 = {}", e.getMessage());
        e.printStackTrace();
    }
}, executor); // 线程池已单独创建

  但是想要在首次执行的基础上来做重跑,仅接收Runnable的runAsync就不能满足了。因为再次执行时,我们至少需要知道上一次执行时的信息,没有入参的话,就会比较复杂。这时就要考虑使用supplyAsync()创建首次执行。

  接下来再考虑首次执行的后的处理。首先排除thenRun(),它无法接收上一个线程有返回值,因此无法与supplyAsync()创建的线程配合使用。其次,如果使用thenAccept()来接收首次执行的结果,它又无法返回本次执行的参数,这样要么只能重跑一次,要么只能对首次执行反复重跑。最后,考虑使用thenApply(),它既可以接收上次执行的参数,也可以将本次执行的参数返回,可以真正的实现传递参数的任务链模型。

  按照一般的写法,每次将下一个线程注册为上一个线程的thenApply(),我们得到了以下的代码。

// 创建业务唯一ID
String oriBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室跑批,批次号:batchId = {}", oriBatchId);

// 持久化任务执行记录
InvokeTaskDO invokeTaskDO = convertExecuteParamVo2InvokeTaskDO(executeParamVo);
invokeTaskDO.setTaskId(taskId);
invokeTaskDO.setBatchId(oriBatchId);
invokeTaskMapper.insertInvokeTask(invokeTaskDO);

// 创建异步任务, 这里简化掉try-catch
CompletableFuture<String> oriFuture = CompletableFuture.supplyAsync(() -> {
    asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
            invokeComposites, executeParamVo.getInvoker(), oriBatchId, executeParamVo.getEnv(),
            executeParamVo.getCrId(), new ArrayList<>());
    return oriBatchId;
}, executor).thenApply((batchId) -> { // 第一次调用thenApply(), 第一次重跑
    // 通过传入的返回值batchId,查询上次的结果,成功就不重跑了
    InvokeTaskDO finishedTask = invokeTaskMapper.loadInvokeTaskByTaskIdAndBatchId(taskId, batchId).get(0);
    if (finishedTask.getResult().equalsIgnoreCase("success")) { return batchId; }

    String newBatchId = UUID.randomUUID().toString(); // 为重跑创建新的唯一ID
    LOGGER.info("AONE实验室重跑,原批次号 oriBatchId = {}, 新批次号 newBatchId = {}", oriBatchId, newBatchId);

    invokeTaskDO.setBatchId(newBatchId);
    invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "oriBatchId", oriBatchId));
    invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "reRun", "true"));
    invokeTaskMapper.insertInvokeTask(invokeTaskDO); // 为重跑任务打标后,持久化

    // 创建重跑的异步任务, 这里简化掉try-catch
    asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
            invokeComposites, executeParamVo.getInvoker(), newBatchId, executeParamVo.getEnv(),
            executeParamVo.getCrId(), new ArrayList<>());
    return newBatchId;
}).thenApply((batchId) -> { // 第二次调用thenApply(), 第二次重跑,一模一样再来一次
    // 通过传入的返回值batchId,查询上次的结果,成功就不重跑了
    InvokeTaskDO finishedTask = invokeTaskMapper.loadInvokeTaskByTaskIdAndBatchId(taskId, batchId).get(0);
    if (finishedTask.getResult().equalsIgnoreCase("success")) { return batchId; }

    String newBatchId = UUID.randomUUID().toString(); // 为重跑创建新的唯一ID
    LOGGER.info("AONE实验室重跑,原批次号 oriBatchId = {}, 新批次号 newBatchId = {}", oriBatchId, newBatchId);

    invokeTaskDO.setBatchId(newBatchId);
    invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "oriBatchId", oriBatchId));
    invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "reRun", "true"));
    invokeTaskMapper.insertInvokeTask(invokeTaskDO); // 为重跑任务打标后,持久化

    // 创建重跑的异步任务, 这里简化掉try-catch
    asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
            invokeComposites, executeParamVo.getInvoker(), newBatchId, executeParamVo.getEnv(),
            executeParamVo.getCrId(), new ArrayList<>());
    return newBatchId;
}); // 想重跑第三次还可以继续往下接...

  但这样的做法有几个问题:

  • 代码太丑陋,每次重跑都是一样的代码,却没法重用;
  • 控制不了重跑次数,默认两次重跑,就要固定写两遍;
  • 任务链上一步与下一步实际仍然是同步的,因为仍然在使用同一个线程中;
  • 最关键的问题是,这个执行线程一直被占用,不能释放线程,处理多个不同的执行任务时,无法满足FIFO的原则,后提交的任务可能由于之前提交的任务在重跑,而进入长时间的等待。而我们期望的是,重跑的任务应当像再次提交新任务一样,重新排队。

1.5 将异步任务链变为循环

  看到这里,其实大家应该都能想到需要配合循环了,但执行线程不能释放的问题还没有解决。这时候就要讲一下最终的解决方案了,使用thenApplyAsync()。首先它可以接收上次执行的参数信息,也能返回本次执行的结果参数;其次再创建时可以传入指定的线程池,重跑时新任务进入Job队列的队尾重新排队,将任务链上一步与下一步完全异步化;最后使用循环结合前后步骤,简化代码。

// 创建业务唯一ID
String oriBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室跑批,批次号:batchId = {}", oriBatchId);

// 持久化任务执行记录
InvokeTaskDO invokeTaskDO = convertExecuteParamVo2InvokeTaskDO(executeParamVo);
invokeTaskDO.setTaskId(taskId);
invokeTaskDO.setBatchId(oriBatchId);
invokeTaskMapper.insertInvokeTask(invokeTaskDO);

// 创建异步任务, 这里简化掉try-catch
CompletableFuture<String> oriFuture = CompletableFuture.supplyAsync(() -> {
    asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
            invokeComposites, executeParamVo.getInvoker(), oriBatchId, executeParamVo.getEnv(),
            executeParamVo.getCrId(), new ArrayList<>());
    return oriBatchId;
}, executor);

CompletableFuture<String> future = oriFuture; // 初始化全局变量
for (int rerunTimes = 1; rerunTimes < requiredTimes; rerunTimes ++) { // requiredTimes为外部传入的重跑次数
    future = future.thenApplyAsync(batchId -> {
        InvokeTaskDO finishedTask = invokeTaskMapper.loadInvokeTaskByTaskIdAndBatchId(taskId, batchId).get(0);
        if (finishedTask.getResult().equalsIgnoreCase("success")) { return batchId;}

        // 不通过,准备重跑用例范围(比如可以仅重跑失败用例),并创建新的执行记录。
        List<InvokeCompositeVo> newInvokeComposites = getRerunComposites(batchId);
        String newBatchId = UUID.randomUUID().toString();
        LOGGER.info("AONE实验室自动重跑,原批次号 oriBatchId = {}, 新批次号 newBatchId = {}", oriBatchId, newBatchId);

        invokeTaskDO.setBatchId(newBatchId);
        invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "oriBatchId", oriBatchId));
        invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "reRun", "true"));
        invokeTaskMapper.insertInvokeTask(invokeTaskDO);

        // 重跑, 这里简化掉try-catch
        asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
                newInvokeComposites, executeParamVo.getInvoker(), newBatchId, executeParamVo.getEnv(),
                executeParamVo.getCrId(), new ArrayList<>());
        return newBatchId;
    }, executor); // 使用指定的线程池
}

  这里总结一下最终的写法:

  • 先创建异步任务链,再在执行时判断是否需要进行完整的重跑
    • 如果不需要重跑,则直接返回传入的UUID,释放线程,下一个线程也会如此快速返回&释放
    • 如果需要重跑,就进行完整的重跑操作:创建UUID,持久化任务,开始执行,返回新的UUID
  • 重跑次数靠循环控制,循环变量有几次,创建任务链时就有几个节点
  • 每执行一次,提交一次新的job,从队尾重新排队,释放上一次执行的线程,等待新线程拉起执行
  • 此外,我们还可以将上一次执行通过的用例,从下一次执行的用例列表中排除掉,仅重跑那些失败的用例,提升整体的执行效率

  最后,在实际运行中,使用CompletableFuture的任务重跑,整体的执行顺序示意图如下。其中fx是创建好的CompletableFuture实例,Completion则是压入栈中的回调队列。如果还想了解更多关于CompletableFuture的细节与原理信息,请参考这里

2 总结

2.1 本次改动的成效

  通过本次改动,由AONE实验室发起的自动化集成任务测试通过率有了显著的提高(72% -> 86%,本BU在此改动前后一周的数据对比),让业务项目的变更可以更顺畅地发布。

  此外,针对多次重跑的功能,老司机还提供了在首次执行的执行结果页中查询重跑情况的能力,将多次重跑的结果合并展示。如下图所示,用户可以一目了然的看到哪些用例进行了重跑,哪些重跑后通过了,哪些重跑2次后也仍未通过等细节信息。

2.2 下一步改进方向

  下一步,老司机还将建设重跑功能的可配置化,如重跑次数可配置、重跑范围可定制、重跑前等待时间可设置等,便于某些业务场景可以结合特定的业务实际情况,定制化的按需使用重跑功能。

  此外,老司机还考虑在AONE实验室侧升级能力,将更多的变更信息从实验室带入老司机中,让集成任务的执行更加精细化。如将实验室执行时的代码commitId传入老司机,如果查询到相同commitId已经存在集成任务执行记录,则直接返回上次的执行结果,实现AONE实验室的快速重跑,在某些特定场景的发布流程中进一步提效。

相关文章
|
5天前
|
存储 Java 调度
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
14 2
|
1月前
|
jenkins Shell 持续交付
Jenkins持续集成GitLab项目 GitLab提交分支后触发Jenkis任务 持续集成 CI/CD 超级详细 超多图(二)
Jenkins持续集成GitLab项目 GitLab提交分支后触发Jenkis任务 持续集成 CI/CD 超级详细 超多图(二)
63 0
|
23天前
|
机器学习/深度学习 算法 前端开发
集成学习任务七和八、投票法与bagging学习
集成学习任务七和八、投票法与bagging学习
11 0
|
2月前
|
人工智能 自然语言处理 机器人
谷歌将大模型集成在实体机器人中,能看、听、说执行57种任务
【9月更文挑战第17天】近年来,人工智能在多模态大模型领域取得显著进展。谷歌最新研发的Mobility VLA系统,将大模型与实体机器人结合,实现了视觉、语言和行动的融合,使机器人能理解并执行复杂多模态指令,如“我应该把这个放回哪里?”系统在真实环境测试中表现出色,但在计算资源、数据需求及伦理问题上仍面临挑战。相关论文发布于https://arxiv.org/abs/2407.07775。
55 9
|
1月前
|
jenkins Shell 持续交付
Jenkins持续集成GitLab项目 GitLab提交分支后触发Jenkis任务 持续集成 CI/CD 超级详细 超多图(一)
Jenkins持续集成GitLab项目 GitLab提交分支后触发Jenkis任务 持续集成 CI/CD 超级详细 超多图(一)
89 0
|
3月前
|
移动开发 小程序 测试技术
项目管理和持续集成系统搭建问题之帮助以诺行管理任务和资源如何解决
项目管理和持续集成系统搭建问题之帮助以诺行管理任务和资源如何解决
33 2
|
3月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成任务日志中显示wait,是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
运维 DataWorks 监控
DataWorks产品使用合集之集成任务发布后,修改了任务调度的配置但没有生效,是什么导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 运维 DataWorks
DataWorks产品使用合集之如何把运维中心数据集成里面各个任务的执行时间拉取出来
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
SQL 存储 JSON
DataWorks产品使用合集之没有dev环境的project,如何创建数据集成任务时完成网络与资源配置
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。