SpringBoot 中的多线程事务处理太繁琐?一个自定义注解直接搞定!

简介: SpringBoot 中的多线程事务处理太繁琐?一个自定义注解直接搞定!

前言

我们开发的时候常常会遇到多线程事务的问题。以为添加了@Transactional注解就行了,其实你加了注解之后会发现事务失效。

原因:数据库连接spring是放在threadLocal里面,多线程场景下,拿到的数据库连接是不一样的,即是属于不同事务。

本文是基于springboot的@Async注解开启多线程,,并通过自定义注解和AOP实现的多线程事务,避免繁琐的手动提交/回滚事务  (CV即用、参数齐全、无需配置)

一、springboot多线程(声明式)的使用方法?

1、springboot提供了注解@Async来使用线程池,具体使用方法如下:

(1) 在启动类(配置类)添加@EnableAsync来开启线程池

(2) 在需要开启子线程的方法上添加注解@Async

注意: 框架默认 ----->   来一个请求开启一个线程,在高并发下容易内存溢出

所以使用时需要配置自定义线程池,如下:

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    @Bean("threadPoolTaskExecutor")//自定义线程池名称
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        executor.setCorePoolSize(16);
        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        //executor.setAllowCoreThreadTimeOut(true);
        //阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(124);
        //最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        //任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);
        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);
        //spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
        //jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
        executor.setThreadNamePrefix("自定义线程池-");
        // rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行, (个人推荐)
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
  executor.setWaitForTasksToCompleteOnShutdown(true);
  //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
  executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

开启子线程方法:  在需要开启线程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的参数为自定义线程池的名称。

二、自定义注解实现多线程事务控制

1.自定义注解

本文是使用了两个注解共同作用实现的,主线程当做协调者,各子线程作为参与者

package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * 多线程事务注解: 主事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
 int value();//子线程数量
}
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * 多线程事务注解: 子事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
    String value() default "";
}

解释:

两个注解都是用在方法上的,须配合@Transactional(rollbackFor = Exception.class)一起使用

@MainTransaction注解 用在调用方,其参数为必填,参数值为本方法中调用的方法开启的线程数,如:在这个方法中调用的方法中有2个方法用@Async注解开启了子线程,则参数为@MainTransaction(2),另外如果未使用@MainTransaction注解,则直接就无多线程事务执行(不影响方法的单线程事务)

@SonTransaction注解 用在被调用方(开启线程的方法),无需传入参数

2.AOP内容

代码如下:

package com.example.aop;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * 多线程事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Aspect
@Component
public class TransactionAop {
    //用来存储各线程计数器数据(每次执行后会从map中删除)
 private static final Map<String, Object> map = new HashMap<>();
 @Resource
 private PlatformTransactionManager transactionManager;
 @Around("@annotation(mainTransaction)")
 public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
  //当前线程名称
  Thread thread = Thread.currentThread();
  String threadName = thread.getName();
  //初始化计数器
  CountDownLatch mainDownLatch = new CountDownLatch(1);
  CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的参数, 为子线程的数量
  // 用来记录子线程的运行状态,只要有一个失败就变为true
  AtomicBoolean rollBackFlag = new AtomicBoolean(false);
  // 用来存每个子线程的异常,把每个线程的自定义异常向vector的首位置插入,其余异常向末位置插入,避免线程不安全,所以使用vector代替list
  Vector<Throwable> exceptionVector = new Vector<>();
  map.put(threadName + "mainDownLatch", mainDownLatch);
  map.put(threadName + "sonDownLatch", sonDownLatch);
  map.put(threadName + "rollBackFlag", rollBackFlag);
  map.put(threadName + "exceptionVector", exceptionVector);
  try {
   joinPoint.proceed();//执行方法
  } catch (Throwable e) {
   exceptionVector.add(0, e);
   rollBackFlag.set(true);//子线程回滚
   mainDownLatch.countDown();//放行所有子线程
  }
  if (!rollBackFlag.get()) {
   try {
    // sonDownLatch等待,直到所有子线程执行完插入操作,但此时还没有提交事务
    sonDownLatch.await();
    mainDownLatch.countDown();// 根据rollBackFlag状态放行子线程的await处,告知是回滚还是提交
   } catch (Exception e) {
    rollBackFlag.set(true);
    exceptionVector.add(0, e);
   }
  }
  if (CollectionUtils.isNotEmpty(exceptionVector)) {
   map.remove(threadName + "mainDownLatch");
   map.remove(threadName + "sonDownLatch");
   map.remove(threadName + "rollBackFlag");
   map.remove(threadName + "exceptionVector");
   throw exceptionVector.get(0);
  }
 }
 @Around("@annotation(com.huigu.common.anno.SonTransaction)")
 public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
  Object[] args = joinPoint.getArgs();
  Thread thread = (Thread) args[args.length - 1];
  String threadName = thread.getName();
  CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
  if (mainDownLatch == null) {
   //主事务未加注解时, 直接执行子事务
   joinPoint.proceed();//这里最好的方式是:交由上面的thread来调用此方法,但我没有找寻到对应api,只能直接放弃事务, 欢迎大神来优化, 留言分享
   return;
  }
  CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
  AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
  Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
  //如果这时有一个子线程已经出错,那当前线程不需要执行
  if (rollBackFlag.get()) {
   sonDownLatch.countDown();
   return;
  }
  DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 开启事务
  def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 设置事务隔离级别
  TransactionStatus status = transactionManager.getTransaction(def);
  try {
   joinPoint.proceed();//执行方法
   sonDownLatch.countDown();// 对sonDownLatch-1
   mainDownLatch.await();// 如果mainDownLatch不是0,线程会在此阻塞,直到mainDownLatch变为0
   // 如果能执行到这一步说明所有子线程都已经执行完毕判断如果atomicBoolean是true就回滚false就提交
   if (rollBackFlag.get()) {
    transactionManager.rollback(status);
   } else {
    transactionManager.commit(status);
   }
  } catch (Throwable e) {
   exceptionVector.add(0, e);
   // 回滚
   transactionManager.rollback(status);
   // 并把状态设置为true
   rollBackFlag.set(true);
   mainDownLatch.countDown();
   sonDownLatch.countDown();
  }
 }
}

扩展说明: CountDownLatch是什么?

一个同步辅助类

  • 创建对象时: 用给定的数字初始化 CountDownLatch
  • countDown() 方法: 使计数减1
  • await() 方法: 阻塞当前线程, 直至当前计数到达零。

本文中:

用 计数 1 初始化的 mainDownLatch 当作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。

用 子线程数量 初始化的 sonDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

3、注解使用Demo

任务方法:

package com.example.demo.service;
import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * @author zlj
 * @since 2022/11/14
 */
@Service
public class SonService {
    /**
     * 参数说明:  以下4个方法参数和此相同
     *
     * @param args   业务中需要传递的参数
     * @param thread 调用者的线程, 用于aop获取参数, 不建议以方法重写的方式简略此参数,
     *               在调用者方法中可以以此参数为标识计算子线程的个数作为注解参数,避免线程参数计算错误导致锁表
     *               传参时参数固定为: Thread.currentThread()
     */
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod2(String args1, String args2, Thread thread) {
        System.out.println(args1 + "和" + args2 + "开启了线程");
    }
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod3(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }
    //sonMethod4方法没有使用线程池
    @Transactional(rollbackFor = Exception.class)
    public void sonMethod4(String args) {
        System.out.println(args + "没有开启线程");
    }
}

调用方:

package com.example.demo.service;
import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zlj
 * @since 2022/11/14
 */
@Service
public class MainService {
    @Resource
    private SonService sonService;
    @MainTransaction(3)//调用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async开启了线程, 所以参数为: 3
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }
    /*
     * 有的业务中存在if的多种可能, 每一种走向调用的方法(开启线程的方法)数量如果不同, 这时可以选择放弃使用@MainTransaction注解避免锁表
     * 这时候如果发生异常会导致多线程不能同时回滚, 可根据业务自己权衡是否使用
     */
    @Transactional(rollbackFor = Exception.class)
    public void test2() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }
}

来源:blog.csdn.net/weixin_69672118/

article/details/127675808

相关文章
|
10天前
|
缓存 监控 Java
SpringBoot @Scheduled 注解详解
使用`@Scheduled`注解实现方法周期性执行,支持固定间隔、延迟或Cron表达式触发,基于Spring Task,适用于日志清理、数据同步等定时任务场景。需启用`@EnableScheduling`,注意线程阻塞与分布式重复问题,推荐结合`@Async`异步处理,提升任务调度效率。
257 127
|
25天前
|
XML 安全 Java
使用 Spring 的 @Aspect 和 @Pointcut 注解简化面向方面的编程 (AOP)
面向方面编程(AOP)通过分离横切关注点,如日志、安全和事务,提升代码模块化与可维护性。Spring 提供了对 AOP 的强大支持,核心注解 `@Aspect` 和 `@Pointcut` 使得定义切面与切入点变得简洁直观。`@Aspect` 标记切面类,集中处理通用逻辑;`@Pointcut` 则通过表达式定义通知的应用位置,提高代码可读性与复用性。二者结合,使开发者能清晰划分业务逻辑与辅助功能,简化维护并提升系统灵活性。Spring AOP 借助代理机制实现运行时织入,与 Spring 容器无缝集成,支持依赖注入与声明式配置,是构建清晰、高内聚应用的理想选择。
259 0
|
1月前
|
Java 测试技术 API
将 Spring 的 @Embedded 和 @Embeddable 注解与 JPA 结合使用的指南
Spring的@Embedded和@Embeddable注解简化了JPA中复杂对象的管理,允许将对象直接嵌入实体,减少冗余表与连接操作,提升数据库设计效率。本文详解其用法、优势及适用场景。
206 126
|
2月前
|
XML JSON Java
Spring框架中常见注解的使用规则与最佳实践
本文介绍了Spring框架中常见注解的使用规则与最佳实践,重点对比了URL参数与表单参数的区别,并详细说明了@RequestParam、@PathVariable、@RequestBody等注解的应用场景。同时通过表格和案例分析,帮助开发者正确选择参数绑定方式,避免常见误区,提升代码的可读性与安全性。
|
12天前
|
XML Java 数据格式
常用SpringBoot注解汇总与用法说明
这些注解的使用和组合是Spring Boot快速开发和微服务实现的基础,通过它们,可以有效地指导Spring容器进行类发现、自动装配、配置、代理和管理等核心功能。开发者应当根据项目实际需求,运用这些注解来优化代码结构和服务逻辑。
108 12
|
25天前
|
Java 测试技术 数据库
使用Spring的@Retryable注解进行自动重试
在现代软件开发中,容错性和弹性至关重要。Spring框架提供的`@Retryable`注解为处理瞬时故障提供了一种声明式、可配置的重试机制,使开发者能够以简洁的方式增强应用的自我恢复能力。本文深入解析了`@Retryable`的使用方法及其参数配置,并结合`@Recover`实现失败回退策略,帮助构建更健壮、可靠的应用程序。
104 1
使用Spring的@Retryable注解进行自动重试
|
25天前
|
传感器 Java 数据库
探索Spring Boot的@Conditional注解的上下文配置
Spring Boot 的 `@Conditional` 注解可根据不同条件动态控制 Bean 的加载,提升应用的灵活性与可配置性。本文深入解析其用法与优势,并结合实例展示如何通过自定义条件类实现环境适配的智能配置。
探索Spring Boot的@Conditional注解的上下文配置
|
25天前
|
智能设计 Java 测试技术
Spring中最大化@Lazy注解,实现资源高效利用
本文深入探讨了 Spring 框架中的 `@Lazy` 注解,介绍了其在资源管理和性能优化中的作用。通过延迟初始化 Bean,`@Lazy` 可显著提升应用启动速度,合理利用系统资源,并增强对 Bean 生命周期的控制。文章还分析了 `@Lazy` 的工作机制、使用场景、最佳实践以及常见陷阱与解决方案,帮助开发者更高效地构建可扩展、高性能的 Spring 应用程序。
Spring中最大化@Lazy注解,实现资源高效利用
|
25天前
|
安全 IDE Java
Spring 的@FieldDefaults和@Data:Lombok 注解以实现更简洁的代码
本文介绍了如何在 Spring 应用程序中使用 Project Lombok 的 `@Data` 和 `@FieldDefaults` 注解来减少样板代码,提升代码可读性和可维护性,并探讨了其适用场景与限制。
Spring 的@FieldDefaults和@Data:Lombok 注解以实现更简洁的代码