0 老司机集成任务在Aone实验室中遇到的问题
老司机平台是一个集合了用例管理,用例执行,测试沉淀等功能的一站式集成测试平台。老司机中提供了一种名为集成任务的测试件,它一般包含一组核心的可执行用例,主要在变更发布前回归指定的接口或应用时由平台或手工触发运行。也可以在每日定时执行指定的集成任务,以实现用例与测试任务的持续集成。
集成任务用作发布前的卡点测试件时,一般是在流水线中加入Aone实验室卡点,再由实验室触发集成任务运行。创建实验室时,需要在配置脚本中输入参数模板的各项信息,如待运行的集成任务id、执行环境、执行应用、分支等。老司机则只需要提供一组接口供AONE实验室调用,接口能力主要包括:触发任务运行,查询运行结果。
使用Aone实验室运行老司机集成任务,存在一个问题,就是AONE每触发一次,老司机只能执行一次。由于日常环境抖动等各类原因,一般并不能保证用例仅单次执行就全部通过,而这将会产生更大范围的影响:
-
应用发布时,测试件卡点不通过,这就需要测试同学人工介入,进行手动排查+手动跳过,耗时耗力;
-
集成任务通过率被动降低,这不仅无法正确体现自动化的作用与价值,甚至影响对测试有效性的判断。
1 解决方案
1.1 整体技术方案
考虑到大部分问题都可以通过重跑集成任务或指定用例解决,而且大部分同学在进行人工介入处理时,也选择了重跑,我们决定在老司机内部将重跑封装起来,如果第一次调用没能全部通过,则老司机直接触发自动重跑。
此时AONE实验室完全不感知老司机的重跑,调用查询结果接口时,仍然返回执行中。仅当老司机重跑次数超过2次,或某次重跑结束后全部用例都通过了,我们才返回执行完毕,并展示测试件本次运行最终是否通过。
1.2 CompletableFuture简介
重跑能力需要通过多线程的方式实现,而讲到Java的多线程异步处理,必须从Runnable说起。实现一个类,并实现Runnable接口的run()方法,再将实例放入Thread类的示例中发起调用,即可实现最简单最基础的多线程。 然而这样的方式,不太容易获取到异步执行的结果,这时就需要带返回值的Callable接口。Callable一般与Future接口配合使用,在主线程中获取子线程的执行结果。
但实际获取子线程的执行结果时,使用的往往还是轮询的方式,这对主线程来说是阻塞的。而我们使用多线程的目的就是在不阻塞主线程的情况下,通过子线程执行特定的任务并返回结果。这个矛盾要怎么解决呢?
Java8给出了一个比较完善的解决方案,即提供了CompletableFuture类。使用这个类创建的子线程,可以通过thenXXX()方法,实现异步编排。通过回调的方式,在不阻塞主线程的情况下,优雅的解决获取子线程执行结果的问题。
下面给出实际工程中,常用到的几个实现异步编排的CompletableFuture类方法
CompletableFuture在创建实例和后续的异步处理函数中,默认使用ForkJoinPool.commonPool()这个线程池。但是我们在创建实例时,会使用自定义的ThreadPoolExecutor,这有以下几个好处:
-
ThreadPoolExecutor是单一Job队列+多线程模式,模型简单,易于问题分析;
-
老司机业务层面主要是执行各种IO,线程执行时间短,IO阻塞时间长,线程&任务队列来回抢占意义不大,反而可能由于切换的导致开销增大;
-
默认先进先出,每个任务独占线程,相同的集成任务所消耗的运行时间基本是稳定的,不会出现太大变化,上层应用发布都会有测试卡点,有稳定的执行时间预期对业务应用发布非常重要;
-
ForkJoinPool.commonPool()在虚拟机内全局共用,包括parallelStream()等默认也会使用,若使用此默认线程池,可能会受到系统内其他线程或任务的影响。
1.3 异步任务链模型
按照前文的介绍,我们在老司机内部,将一次调用改为多次重跑。我们主要借助CompletableFuture创建新线程来完成用例执行。改造后,从AONE实验室开始触发集成任务执行的流程图如下,其中步骤8、14和18即为CompletableFuture运行完成时的回调:
1.4 异步任务链实现
接下来我们来关注具体的实现。
之前在没有引入内部重跑时,默认使用runAsync()来创建首次执行即可。因为执行线程在执行完成后,不需要做额外处理,因此既不需要入参,也不需要返回值。AONE轮询时,查指定任务的结果即可。所以直接传入一个Runnable实例作为异步任务即可。
// 创建业务唯一ID
String oriBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室首次调起集成任务,批次号:batchId = {}", oriBatchId);
// 持久化任务执行记录
InvokeTaskDO invokeTaskDO = convertExecuteParamVo2InvokeTaskDO(executeParamVo);
invokeTaskDO.setTaskId(taskId);
invokeTaskDO.setBatchId(oriBatchId);
invokeTaskMapper.insertInvokeTask(invokeTaskDO);
// 使用runAsync()创建异步任务,不必关注线程执行的返回值
CompletableFuture<Void> oriFuture = CompletableFuture.runAsync(() -> {
try {
asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
invokeComposites, executeParamVo.getInvoker(), oriBatchId, executeParamVo.getEnv(),
executeParamVo.getCrId(), new ArrayList<>());
} catch (Exception e) {
LOGGER.error("异步执行调用异常 = {}", e.getMessage());
e.printStackTrace();
}
}, executor); // 线程池已单独创建
但是想要在首次执行的基础上来做重跑,仅接收Runnable的runAsync就不能满足了。因为再次执行时,我们至少需要知道上一次执行时的信息,没有入参的话,就会比较复杂。这时就要考虑使用supplyAsync()创建首次执行。
接下来再考虑首次执行的后的处理。首先排除thenRun(),它无法接收上一个线程有返回值,因此无法与supplyAsync()创建的线程配合使用。其次,如果使用thenAccept()来接收首次执行的结果,它又无法返回本次执行的参数,这样要么只能重跑一次,要么只能对首次执行反复重跑。最后,考虑使用thenApply(),它既可以接收上次执行的参数,也可以将本次执行的参数返回,可以真正的实现传递参数的任务链模型。
按照一般的写法,每次将下一个线程注册为上一个线程的thenApply(),我们得到了以下的代码。
// 创建业务唯一ID
String oriBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室跑批,批次号:batchId = {}", oriBatchId);
// 持久化任务执行记录
InvokeTaskDO invokeTaskDO = convertExecuteParamVo2InvokeTaskDO(executeParamVo);
invokeTaskDO.setTaskId(taskId);
invokeTaskDO.setBatchId(oriBatchId);
invokeTaskMapper.insertInvokeTask(invokeTaskDO);
// 创建异步任务, 这里简化掉try-catch
CompletableFuture<String> oriFuture = CompletableFuture.supplyAsync(() -> {
asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
invokeComposites, executeParamVo.getInvoker(), oriBatchId, executeParamVo.getEnv(),
executeParamVo.getCrId(), new ArrayList<>());
return oriBatchId;
}, executor).thenApply((batchId) -> { // 第一次调用thenApply(), 第一次重跑
// 通过传入的返回值batchId,查询上次的结果,成功就不重跑了
InvokeTaskDO finishedTask = invokeTaskMapper.loadInvokeTaskByTaskIdAndBatchId(taskId, batchId).get(0);
if (finishedTask.getResult().equalsIgnoreCase("success")) { return batchId; }
String newBatchId = UUID.randomUUID().toString(); // 为重跑创建新的唯一ID
LOGGER.info("AONE实验室重跑,原批次号 oriBatchId = {}, 新批次号 newBatchId = {}", oriBatchId, newBatchId);
invokeTaskDO.setBatchId(newBatchId);
invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "oriBatchId", oriBatchId));
invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "reRun", "true"));
invokeTaskMapper.insertInvokeTask(invokeTaskDO); // 为重跑任务打标后,持久化
// 创建重跑的异步任务, 这里简化掉try-catch
asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
invokeComposites, executeParamVo.getInvoker(), newBatchId, executeParamVo.getEnv(),
executeParamVo.getCrId(), new ArrayList<>());
return newBatchId;
}).thenApply((batchId) -> { // 第二次调用thenApply(), 第二次重跑,一模一样再来一次
// 通过传入的返回值batchId,查询上次的结果,成功就不重跑了
InvokeTaskDO finishedTask = invokeTaskMapper.loadInvokeTaskByTaskIdAndBatchId(taskId, batchId).get(0);
if (finishedTask.getResult().equalsIgnoreCase("success")) { return batchId; }
String newBatchId = UUID.randomUUID().toString(); // 为重跑创建新的唯一ID
LOGGER.info("AONE实验室重跑,原批次号 oriBatchId = {}, 新批次号 newBatchId = {}", oriBatchId, newBatchId);
invokeTaskDO.setBatchId(newBatchId);
invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "oriBatchId", oriBatchId));
invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "reRun", "true"));
invokeTaskMapper.insertInvokeTask(invokeTaskDO); // 为重跑任务打标后,持久化
// 创建重跑的异步任务, 这里简化掉try-catch
asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
invokeComposites, executeParamVo.getInvoker(), newBatchId, executeParamVo.getEnv(),
executeParamVo.getCrId(), new ArrayList<>());
return newBatchId;
}); // 想重跑第三次还可以继续往下接...
但这样的做法有几个问题:
-
代码太丑陋,每次重跑都是一样的代码,却没法重用;
-
控制不了重跑次数,默认两次重跑,就要固定写两遍;
-
任务链上一步与下一步实际仍然是同步的,因为仍然在使用同一个线程中;
-
最关键的问题是,这个执行线程一直被占用,不能释放线程,处理多个不同的执行任务时,无法满足FIFO的原则,后提交的任务可能由于之前提交的任务在重跑,而进入长时间的等待。而我们期望的是,重跑的任务应当像再次提交新任务一样,重新排队。
1.5 将异步任务链变为循环
看到这里,其实大家应该都能想到需要配合循环了,但执行线程不能释放的问题还没有解决。这时候就要讲一下最终的解决方案了,使用thenApplyAsync()。首先它可以接收上次执行的参数信息,也能返回本次执行的结果参数;其次再创建时可以传入指定的线程池,重跑时新任务进入Job队列的队尾重新排队,将任务链上一步与下一步完全异步化;最后使用循环结合前后步骤,简化代码。
// 创建业务唯一ID
String oriBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室跑批,批次号:batchId = {}", oriBatchId);
// 持久化任务执行记录
InvokeTaskDO invokeTaskDO = convertExecuteParamVo2InvokeTaskDO(executeParamVo);
invokeTaskDO.setTaskId(taskId);
invokeTaskDO.setBatchId(oriBatchId);
invokeTaskMapper.insertInvokeTask(invokeTaskDO);
// 创建异步任务, 这里简化掉try-catch
CompletableFuture<String> oriFuture = CompletableFuture.supplyAsync(() -> {
asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
invokeComposites, executeParamVo.getInvoker(), oriBatchId, executeParamVo.getEnv(),
executeParamVo.getCrId(), new ArrayList<>());
return oriBatchId;
}, executor);
CompletableFuture<String> future = oriFuture; // 初始化全局变量
for (int rerunTimes = 1; rerunTimes < requiredTimes; rerunTimes ++) { // requiredTimes为外部传入的重跑次数
future = future.thenApplyAsync(batchId -> {
InvokeTaskDO finishedTask = invokeTaskMapper.loadInvokeTaskByTaskIdAndBatchId(taskId, batchId).get(0);
if (finishedTask.getResult().equalsIgnoreCase("success")) { return batchId;}
// 不通过,准备重跑用例范围(比如可以仅重跑失败用例),并创建新的执行记录。
List<InvokeCompositeVo> newInvokeComposites = getRerunComposites(batchId);
String newBatchId = UUID.randomUUID().toString();
LOGGER.info("AONE实验室自动重跑,原批次号 oriBatchId = {}, 新批次号 newBatchId = {}", oriBatchId, newBatchId);
invokeTaskDO.setBatchId(newBatchId);
invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "oriBatchId", oriBatchId));
invokeTaskDO.setExtra(JsonUtils.appendAttr(invokeTaskDO.getExtra(), "reRun", "true"));
invokeTaskMapper.insertInvokeTask(invokeTaskDO);
// 重跑, 这里简化掉try-catch
asyncInvokeBatchComposite(Constants.INTEGRATION, taskId, executeParamVo.getAppName(),
newInvokeComposites, executeParamVo.getInvoker(), newBatchId, executeParamVo.getEnv(),
executeParamVo.getCrId(), new ArrayList<>());
return newBatchId;
}, executor); // 使用指定的线程池
}
这里总结一下最终的写法:
-
先创建异步任务链,再在执行时判断是否需要进行完整的重跑
-
如果不需要重跑,则直接返回传入的UUID,释放线程,下一个线程也会如此快速返回&释放
-
如果需要重跑,就进行完整的重跑操作:创建UUID,持久化任务,开始执行,返回新的UUID
-
-
重跑次数靠循环控制,循环变量有几次,创建任务链时就有几个节点
-
每执行一次,提交一次新的job,从队尾重新排队,释放上一次执行的线程,等待新线程拉起执行
-
此外,我们还可以将上一次执行通过的用例,从下一次执行的用例列表中排除掉,仅重跑那些失败的用例,提升整体的执行效率
最后,在实际运行中,使用CompletableFuture的任务重跑,整体的执行顺序示意图如下。其中fx是创建好的CompletableFuture实例,Completion则是压入栈中的回调队列。如果还想了解更多关于CompletableFuture的细节与原理信息,请参考这里。
2 总结
2.1 本次改动的成效
通过本次改动,由AONE实验室发起的自动化集成任务测试通过率有了显著的提高(72% -> 86%,本BU在此改动前后一周的数据对比),让业务项目的变更可以更顺畅地发布。
此外,针对多次重跑的功能,老司机还提供了在首次执行的执行结果页中查询重跑情况的能力,将多次重跑的结果合并展示。如下图所示,用户可以一目了然的看到哪些用例进行了重跑,哪些重跑后通过了,哪些重跑2次后也仍未通过等细节信息。
2.2 下一步改进方向
下一步,老司机还将建设重跑功能的可配置化,如重跑次数可配置、重跑范围可定制、重跑前等待时间可设置等,便于某些业务场景可以结合特定的业务实际情况,定制化的按需使用重跑功能。
此外,老司机还考虑在AONE实验室侧升级能力,将更多的变更信息从实验室带入老司机中,让集成任务的执行更加精细化。如将实验室执行时的代码commitId传入老司机,如果查询到相同commitId已经存在集成任务执行记录,则直接返回上次的执行结果,实现AONE实验室的快速重跑,在某些特定场景的发布流程中进一步提效。