基于 Spring 事务的可靠异步调用实践

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
网络型负载均衡 NLB,每月750个小时 15LCU
简介: pringTxAsync 组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。通过使用 SpringTxAsync 组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次 618 活动以及两次双 11 活动,该组件已经在我们的所有应用中稳定运行并成功应用于各种业务场景。该组件的主要功能是实现可靠的异步调用。在异步任务的执行过程中,我们能

SpringTxAsync 组件是仓储平台组(WMS6)自主研发的一个专门用于解决可靠异步调用问题的组件。

通过使用 SpringTxAsync 组件,我们成功地解决了在仓储平台(WMS6)中的异步调用需求。经过近二年多的实践并经历了两次 618 活动以及两次双 11 活动,该组件已经在我们的所有应用中稳定运行并成功应用于各种业务场景。

该组件的主要功能是实现可靠的异步调用。在异步任务的执行过程中,我们能够确保任务的可靠性,即使在出现异常情况或重要机器重启等不确定因素时,仍能保证任务的正常执行,并且能够满足我们的业务需求。

SpringTxAsync 组件的成功应用为我们的仓储平台(WMS6)提供了稳定可靠的异步调用支持。在接下来的内容中,我们将详细介绍该组件的设计原理和技术特点,以帮助读者更好地理解和应用该组件。

异步调用的场景
异步的本质就是一种 Fire-And-Forget 模式,它在编程中常用于两种场景的应用:

提升请求的响应速度,减少不必要的同步等待时间。通过将某些操作以异步方式执行,可以避免在请求处理过程中发生长时间的阻塞等待,从而提高系统的吞吐量和响应性能。
实现逻辑的解耦,将纯实时逻辑与非线性非实时逻辑进行解耦。在事件驱动的场景中,异步机制常被广泛应用。通过异步方式处理事件,可以将实时逻辑与非实时逻辑分离,提高系统的可维护性和扩展性。
异步编程模式的使用可以带来多种好处,例如提升系统性能、改善用户体验、增强系统的稳定性和可扩展性等。

简单举例说明一下:

用户注册完成后,需要发送注册成功的邮件通知。

// transaction begin
User user = UserService.create(UserCreateRequest reqeust)//本地事务
NotificationService.sendRegisterEmail(user)// 本地调用
// do other business operations
//transaction commit

//NotificationService.sendRegisterEmail(user) 实现
@TransactionAsync
public void sendRegisterEmail(User user){
EmailService.send(...);//远程调用
}
可选技术方案
消息中间件: 通过向消息中间件发送消息,消费者监听并消费消息来执行异步任务。然而,由于消息中间件的一致性保证较为困难(许多消息中间件没有可靠的发送端,有些中间件虽然有,但使用较为复杂)。
多线程: 可以通过 Fork 线程或直接使用 Spring 的 @Async 注解来实现异步。但需要注意,在使用 Spring 的 JDBC 事务时,一旦 fork 出新线程,就会脱离原事务范畴,丧失 JDBC 事务的可靠性和一致性。
进程内事件通知: Guava Event 和 Spring Event 本质上都是实现了发布 - 订阅模式,可以用于实现异步调用。Guava Event 提供了同步和异步的事件发布,其实现是基于队列和线程池。Spring Event 配置了 Spring Async 以支持异步操作,同时还提供了 @TransactionEventListener,在当前事务的不同阶段发布事件。然而,底层本质上仍然是 fork 了线程,因此会失去事务特性。
Outbox Pattern: Outbox Pattern 是一种常见的微服务场景下基于 Guaranteed Delivery 模式的技术范式,旨在实现可靠的消息传递。主要用于解决应用与外部应用之间交互的可靠性和一致性问题,可看作是分布式事务的一种简单场景。
类似如下场景:

用户下单成功后给用户发一个 email
用户注册成功后,发布一个事件(Message)到消息中件间
在 DDD 开发模式下中处理聚合根之间的领域事件通知
整体示意图如下:

我们的核心需求
异步任务的可靠性,与本地事务的一致性。

可靠性
可靠性是指在创建完成异步任务并成功提交本地事务后,确保异步任务一定会被执行,无论是否出现异常情况或机器重启等意外因素,始终能够保证达到 At-Least-Once 语义。

一致性
一致性指的是异步方法内部业务逻辑与调用异步方法时的事务上下文保持一致。当本地事务提交时,异步任务执行;而当本地事务回滚时,异步任务则不会执行。

基于以上两点需求,我们做了能满足上述场景的 OutboxPattern 实现。

SpringTxAsync 方案
核心逻辑
为了对使用 @TransactionAsync 标注为异步的方法进行拦截,我们可以采用 AOP 方式。

在拦截过程中,将 Invocation 封装成一个异步任务,并将其持久化到数据库的异步任务表中,该操作需要在当前事务域内完成。

对于已经保存在本地表中的异步任务的处理,我们可以考虑两种实现方案。

方案 1
首先,异步任务表被视为只读表,只会插入任务记录,不会进行修改操作。对于任务的处理,我们可以通过监听表的 binlog 来实现。当收到 insert 事件时,将其转换成消息队列(mq)消息,在应用中有一个监听器(listener)负责消费这些消息。

方案 2
为了有效追踪异步任务的生命周期,我们对异步任务进行状态化管理。

当有新任务生成时,我们将其插入到异步任务表中。在当前事务提交后,任务会立即提交到线程池中执行,而不是从数据库中获取任务。只有发生异常的异步任务,才会定时从数据库中获取任务来处理。 这种方案可以确保异步任务能够及时地被提交和执行,同时在异常情况下也能够保证任务的处理。

两种方案的取舍:我们选择第二种方案

减少依赖的中间件,降低组件的整体复杂度以及团队接入组件后运维成本
降低任务数据的调用链长度,从 binlog 到 mq 这中间增加了很多不确定性
异步任务状态机
异步任务的生命周期共有五个状态:READY(就绪)、RUNNING(运行中)、EXCEPTION(异常)、SUCCESS(成功)、FAIL(失败)

当新的异步任务生成时,我们将其插入到异步任务表中,并将状态设置为 READY(0)。
在当前事务提交后,任务将被立即提交到线程池中进行执行,而不是从数据库中获取任务。这种方式确保了任务能够及时执行,并与事务保持一致性。
在任务的执行过程中,如果发生异常情况,我们可以将任务的状态修改为 EXCEPTION(2),并单独处理这些异常任务。这样,我们可以定时从数据库中获取这些异常任务,并进行重新处理,以确保任务能够顺利完成。
同时,我们还需要确定任务的成功与失败状态。当任务执行成功时,我们将其状态设置为 SUCCESS(4),表示任务已成功完成。
而当任务多次执行异常时,达到某个预设的阈值,我们将其状态设置为 FAIL(3),以标记任务的失败状态,失败的任务不再重试。
为了处理可能的机器重启、掉电等场景,我们引入了额外的定时器处理对 READY 和 RUNNING 状态下的超时情况进行监控,如果在这两个状态发生超时,那么直接转入 EXCEPTION 或 FAIL 状态。

核心代码
异步方法注解,用来声明异步任务

/**

  • 标记在Spring 容器管理的bean的public方法上,以实现本地事务级别的可靠异步调用。
    *
  • 对于加注了该注解的方法:

  • 1. 如果当前处理事务中,则对异步方法的调用是在事务提交之后进行的,这样是为了保证异步调用与本地事务的一致性,

  • 假如事务回滚,则异步调用不会执行。
  • 2. 如果当前调用不在事务中,则可靠性退化为与{@link org.springframework.scheduling.annotation.Async}一致。
    *

  • 对于异步逻辑的调用,能保证At Least Once的语义。

  • 在极端场景下,比如down机,或线程池被打爆的情况下,是通过重度来保证调用的可靠性的,所以有可能对异步逻辑的执行大于一次,所以要求异步逻辑本身是幂等的。
    *

  • 注意:在同一个bean内部调用时不起作用,这是因为Spring AOP的Proxy代理机制导致。
    /
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface TransactionAsync {

    public static int UN_DEFINED = -1;

    /*

    • -1 as not set. default 5 min
    • @return timeout value, in seconds
      /
      long timeout() default UN_DEFINED;

      /**

    • 用来指定当前异步任务执行的线程池,这里指定线程池的Bean名称,运行时会从Spring容器中根据名称查询。
    • 如果未指定,则使用一个默认线程池,全局共享一个。
    • 不同的异步任务优先级高低不同,如果想要隔离,可以给不同任务指定不同的线程池。
    • 注意:如果要自定义线程池,要使用Spring的ThreadPoolTaskExecutor,这个线程池的实现,支持运行时调整线程数。
    • @return executor bean name
      */
      String executor() default "";

      /**

    • -1 as not set. default 3 times
      *
    • @return max retry attempt
      */
      int maxAttempt() default UN_DEFINED;
      }

拦截 @TransactionAsync,在 AOP 中做切面逻辑。伪代码如下:

TransactionAsync.AOP.invoke(MethodInvocation) {
AsyncInvocation asyncInvocation = 根据当前方法以及注解里的属性(支持SpringEL)确定落库的各个字段值
wms_async_task.insert(asyncInvocation, status=READY) // 当前事务里塞入一个insert,如果无事务auto_commit
ThreadPoolTaskExecutor executor = determineAsyncExecutor(method) // 根据方法获取异步执行的线程池
if isInTransaction() {
// 注册事务提交后的hook
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
void afterCommit() { // 仍然在当前业务线程里,但事务外
executor.submit(asyncInvocation) // 转为异步线程,原事务数据变化可见
}
})
} else {
executor.submit(asyncInvocation) // redo log writen, invoke task best effort
}
}
方案完备性
错误重试
为了处理异步任务发生异常的情况,我们引入了重试机制。默认情况下,异步任务将进行 3 次重试,并且可以在注解中自定义重试次数。重试的间隔采用指数级递增的方式,直到达到最大重试间隔 300 秒后,将以固定间隔进行重试。当任务达到最大重试次数后,任务将被标记为 DeadTask,同时触发报警并需要人工介入处理。

重试机制的实现依赖于 Spring 框架的本地调度器 @Schedule。我们通过 @Schedule 注解定义一个定时任务,该任务会从异步任务表中根据任务的状态获取任务,并调用相应的处理方法。需要注意的是,使用乐观锁来管理任务的状态,以避免多个本地 Spring Schedule 任务执行相同的失败任务。

通过以上的重试机制,我们能够及时处理异步任务发生异常的情况,并尽可能地进行重试,以提高任务的成功率。在达到最大重试次数后,及时报警并进行人工介入,以确保任务的及时处理和系统的稳定运行。

超时保护
有三种类型的超时:等待超时、执行超时和失败超时。

等待超时:当任务进入就绪状态(Ready)后,会等待一段时间(waitTimeout),等待被线程池调度执行。如果超过等待超时阈值,则任务被标记为失败任务,并进入重试队列。另外,如果任务提交后,发生了机器重启,任务将一直保持在就绪状态,直到等待超时。
执行超时:当任务处于运行中状态(Running),如果执行时间超过限定的超时时间(executeTimeout),任务将被标记为失败任务,并进入重试队列。同样,如果由于机器重启等原因导致任务一直处于运行状态,任务将一直卡在运行状态,直到执行超时。
失败超时:为了处理已经标记为失败的任务,我们还引入了失败超时机制。一旦任务被标记为失败,会设定一个失败超时时间(根据失败次数进行计算,采用指数递增策略)。在这个失败超时时间到期后,任务将被定时任务捞取出来,进行重试操作。 通过对等待超时、执行超时和失败超时进行明确的描述,我们能够更清晰地理解不同超时情况下的任务处理逻辑,并有效地进行超时相关的任务重试和管理。
队列管理
一旦异步任务被创建并持久化到数据库,当创建异步任务所在的事务提交后,异步任务会立即提交给线程池进行处理。由于仓储平台的大部分应用都是长时间运行的任务,所以我们默认配置了较大的执行线程池,并使用阻塞队列作为缓冲。 线程池默认采用 ThreadPoolTaskExecutor 来执行任务,队列使用 LinkedBlockingQueue 来实现有界队列。当任务提交到队列无法进入时,采用丢弃策略(DiscardPolicy)并触发报警。需要注意的是,被丢弃的任务并不会真正丢失,而是会被当做等待超时的处理方式。

隔离性
通过使用线程池来实现任务的隔离是一种有效的方法。在声明异步任务时,可以在注解中明确指定所使用的线程池,以实现任务的隔离。举例来说,对于不同的优先级任务,可以使用不同的线程池来处理;而对于不同特征的任务,则可以配置不同的线程池;另外,还可以将短时任务与长时任务分别隔离开,分别放入不同的线程池中,以保证任务能够得到有效地处理。

伸缩性
在某些情况下,一个节点可能会产生大量的异步任务,而这些任务只能在该节点上执行,无法分配给其他节点执行。这会导致该节点负载过重,而其他节点却处于闲置状态。

为了解决这个问题,我们需要合理设计异步任务的粒度。如果一个节点产生的异步任务过于庞大,可以考虑将其拆分为更小的任务,并分配给其他节点执行。这样做的好处是,能够有效地平衡各个节点的负载,提高系统的性能和可伸缩性。

此外,还需要考虑创建异步任务本身的请求是否能够进行负载均衡。如果一个节点的异步任务请求过于频繁,可以通过一些负载均衡的策略来分担请求压力,例如使用负载均衡器或者分布式任务调度器来进行任务的分发和调度。

因此,在设计异步任务时,需要综合考虑任务粒度和请求负载均衡两个方面,以实现合理的任务分配和节点负载均衡,从而保证整个系统的稳定性和可伸缩性。

可配置项
提供了大量的可选配置项以满足不同的业务场景

核心配置选项如下:

默认最大重试次数
默认的失败重试间隔
默认执行超时时间
默认异步任务线程池相关核心参数
扩展点
通过采用 SPI(Service Provider Interface)方式,系统提供了一些可扩展的接入点,以增强异步任务执行过程中的灵活性。这些接入点可以根据具体业务场景,自定义增加相应的逻辑。 以下是几个重要的扩展点和其作用:

重试异常回调(RetryExceptionCallback): 当异步任务执行过程中遇到需要重试的异常时,可以通过该回调接口进行相应的处理逻辑。
异步任务调用回调(InvocationCallback): 在异步任务完成后,可以通过调用该回调接口来处理任务执行成功或异常的情况。
特殊数据源提供者(DataSourceWrapper): 用于处理分库数据源,通过该接口可以自定义管理特殊的数据源。
异步任务主键生成器策略提供者(SnowflakeWorkerIdStrategyProvider): 用于自定义异步任务主键生成策略,以满足具体业务需求。
线程池自定义拒绝策略提供者(RejectedExecutionHandlerProvider): 用于定义线程池的拒绝策略,当任务无法被接受时,可以根据具体需求进行自定义处理。
定时调度异常处理器提供者(ScheduleErrorHandlerProvider): 用于处理定时调度任务执行过程中的异常情况,可以根据具体需要进行相应的处理策略。
通过以上提供的扩展点,系统提供了灵活性和扩展性,便于根据业务特定需求进行定制化开发。使用者可以根据自身业务场景和需求,选择相应的扩展点,并编写逻辑来满足具体的业务要求。这种 SPI 扩展机制使得系统更具可定制性和代码的可维护性。

管理 API
任务的查询
任务的重置
监控点
通过 UMP(京东自研监控平台) SDK 上报到 UMP 平台已经内置的监控如下所示,还可以通过预留的 SPI 增加自定义监控。

任务执行异常监控
线程池拒绝线程监控
任务积压监控
组件内部异常监控

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
16天前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
31 1
Spring高手之路24——事务类型及传播行为实战指南
|
9天前
|
XML Java 数据库连接
Spring中的事务是如何实现的
Spring中的事务管理机制通过一系列强大的功能和灵活的配置选项,为开发者提供了高效且可靠的事务处理手段。无论是通过注解还是AOP配置,Spring都能轻松实现复杂的事务管理需求。掌握这些工具和最佳实践,能
15 3
|
25天前
|
数据采集 Java 数据安全/隐私保护
Spring Boot 3.3中的优雅实践:全局数据绑定与预处理
【10月更文挑战第22天】 在Spring Boot应用中,`@ControllerAdvice`是一个强大的工具,它允许我们在单个位置处理多个控制器的跨切面关注点,如全局数据绑定和预处理。这种方式可以大大减少重复代码,提高开发效率。本文将探讨如何在Spring Boot 3.3中使用`@ControllerAdvice`来实现全局数据绑定与预处理。
58 2
|
28天前
|
SQL Java 数据库
Spring Boot与Flyway:数据库版本控制的自动化实践
【10月更文挑战第19天】 在软件开发中,数据库的版本控制是一个至关重要的环节,它确保了数据库结构的一致性和项目的顺利迭代。Spring Boot结合Flyway提供了一种自动化的数据库版本控制解决方案,极大地简化了数据库迁移管理。本文将详细介绍如何使用Spring Boot和Flyway实现数据库版本的自动化控制。
26 2
|
2月前
|
Java 数据库连接 数据库
spring复习05,spring整合mybatis,声明式事务
这篇文章详细介绍了如何在Spring框架中整合MyBatis以及如何配置声明式事务。主要内容包括:在Maven项目中添加依赖、创建实体类和Mapper接口、配置MyBatis核心配置文件和映射文件、配置数据源、创建sqlSessionFactory和sqlSessionTemplate、实现Mapper接口、配置声明式事务以及测试使用。此外,还解释了声明式事务的传播行为、隔离级别、只读提示和事务超时期间等概念。
spring复习05,spring整合mybatis,声明式事务
|
2月前
|
Java 测试技术 数据库
Spring事务传播机制(最全示例)
在使用Spring框架进行开发时,`service`层的方法通常带有事务。本文详细探讨了Spring事务在多个方法间的传播机制,主要包括7种传播类型:`REQUIRED`、`SUPPORTS`、`MANDATORY`、`REQUIRES_NEW`、`NOT_SUPPORTED`、`NEVER` 和 `NESTED`。通过示例代码和数据库插入测试,逐一展示了每种类型的运作方式。例如,`REQUIRED`表示如果当前存在事务则加入该事务,否则创建新事务;`SUPPORTS`表示如果当前存在事务则加入,否则以非事务方式执行;`MANDATORY`表示必须在现有事务中运行,否则抛出异常;
143 4
Spring事务传播机制(最全示例)
|
1月前
|
Java 关系型数据库 MySQL
Spring事务失效,我总结了这7个主要原因
本文详细探讨了Spring事务在日常开发中常见的七个失效原因,包括数据库不支持事务、类不受Spring管理、事务方法非public、异常被捕获、`rollbackFor`属性配置错误、方法内部调用事务方法及事务传播属性使用不当。通过具体示例和源码分析,帮助开发者更好地理解和应用Spring事务机制,避免线上事故。适合所有使用Spring进行业务开发的工程师参考。
32 2
|
1月前
|
Java 程序员 Spring
Spring事务的1道面试题
每次聊起Spring事务,好像很熟悉,又好像很陌生。本篇通过一道面试题和一些实践,来拆解几个Spring事务的常见坑点。
Spring事务的1道面试题
|
2月前
|
Java Spring
Spring 事务传播机制是什么?
Spring 事务传播机制是什么?
23 4
下一篇
无影云桌面