SpringBoot 中的多线程事务处理太繁琐?一个自定义注解直接搞定!

简介: SpringBoot 中的多线程事务处理太繁琐?一个自定义注解直接搞定!

前言

我们开发的时候常常会遇到多线程事务的问题。以为添加了@Transactional注解就行了,其实你加了注解之后会发现事务失效。

原因:数据库连接spring是放在threadLocal里面,多线程场景下,拿到的数据库连接是不一样的,即是属于不同事务。

本文是基于springboot的@Async注解开启多线程,,并通过自定义注解和AOP实现的多线程事务,避免繁琐的手动提交/回滚事务  (CV即用、参数齐全、无需配置)

一、springboot多线程(声明式)的使用方法?

1、springboot提供了注解@Async来使用线程池,具体使用方法如下:

(1) 在启动类(配置类)添加@EnableAsync来开启线程池

(2) 在需要开启子线程的方法上添加注解@Async

注意: 框架默认 ----->   来一个请求开启一个线程,在高并发下容易内存溢出

所以使用时需要配置自定义线程池,如下:

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    @Bean("threadPoolTaskExecutor")//自定义线程池名称
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        executor.setCorePoolSize(16);
        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        //executor.setAllowCoreThreadTimeOut(true);
        //阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(124);
        //最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        //任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);
        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);
        //spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
        //jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
        executor.setThreadNamePrefix("自定义线程池-");
        // rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行, (个人推荐)
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
  executor.setWaitForTasksToCompleteOnShutdown(true);
  //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
  executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

开启子线程方法:  在需要开启线程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的参数为自定义线程池的名称。

二、自定义注解实现多线程事务控制

1.自定义注解

本文是使用了两个注解共同作用实现的,主线程当做协调者,各子线程作为参与者

package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * 多线程事务注解: 主事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
 int value();//子线程数量
}
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * 多线程事务注解: 子事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
    String value() default "";
}

解释:

两个注解都是用在方法上的,须配合@Transactional(rollbackFor = Exception.class)一起使用

@MainTransaction注解 用在调用方,其参数为必填,参数值为本方法中调用的方法开启的线程数,如:在这个方法中调用的方法中有2个方法用@Async注解开启了子线程,则参数为@MainTransaction(2),另外如果未使用@MainTransaction注解,则直接就无多线程事务执行(不影响方法的单线程事务)

@SonTransaction注解 用在被调用方(开启线程的方法),无需传入参数

2.AOP内容

代码如下:

package com.example.aop;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * 多线程事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Aspect
@Component
public class TransactionAop {
    //用来存储各线程计数器数据(每次执行后会从map中删除)
 private static final Map<String, Object> map = new HashMap<>();
 @Resource
 private PlatformTransactionManager transactionManager;
 @Around("@annotation(mainTransaction)")
 public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
  //当前线程名称
  Thread thread = Thread.currentThread();
  String threadName = thread.getName();
  //初始化计数器
  CountDownLatch mainDownLatch = new CountDownLatch(1);
  CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的参数, 为子线程的数量
  // 用来记录子线程的运行状态,只要有一个失败就变为true
  AtomicBoolean rollBackFlag = new AtomicBoolean(false);
  // 用来存每个子线程的异常,把每个线程的自定义异常向vector的首位置插入,其余异常向末位置插入,避免线程不安全,所以使用vector代替list
  Vector<Throwable> exceptionVector = new Vector<>();
  map.put(threadName + "mainDownLatch", mainDownLatch);
  map.put(threadName + "sonDownLatch", sonDownLatch);
  map.put(threadName + "rollBackFlag", rollBackFlag);
  map.put(threadName + "exceptionVector", exceptionVector);
  try {
   joinPoint.proceed();//执行方法
  } catch (Throwable e) {
   exceptionVector.add(0, e);
   rollBackFlag.set(true);//子线程回滚
   mainDownLatch.countDown();//放行所有子线程
  }
  if (!rollBackFlag.get()) {
   try {
    // sonDownLatch等待,直到所有子线程执行完插入操作,但此时还没有提交事务
    sonDownLatch.await();
    mainDownLatch.countDown();// 根据rollBackFlag状态放行子线程的await处,告知是回滚还是提交
   } catch (Exception e) {
    rollBackFlag.set(true);
    exceptionVector.add(0, e);
   }
  }
  if (CollectionUtils.isNotEmpty(exceptionVector)) {
   map.remove(threadName + "mainDownLatch");
   map.remove(threadName + "sonDownLatch");
   map.remove(threadName + "rollBackFlag");
   map.remove(threadName + "exceptionVector");
   throw exceptionVector.get(0);
  }
 }
 @Around("@annotation(com.huigu.common.anno.SonTransaction)")
 public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
  Object[] args = joinPoint.getArgs();
  Thread thread = (Thread) args[args.length - 1];
  String threadName = thread.getName();
  CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
  if (mainDownLatch == null) {
   //主事务未加注解时, 直接执行子事务
   joinPoint.proceed();//这里最好的方式是:交由上面的thread来调用此方法,但我没有找寻到对应api,只能直接放弃事务, 欢迎大神来优化, 留言分享
   return;
  }
  CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
  AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
  Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
  //如果这时有一个子线程已经出错,那当前线程不需要执行
  if (rollBackFlag.get()) {
   sonDownLatch.countDown();
   return;
  }
  DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 开启事务
  def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 设置事务隔离级别
  TransactionStatus status = transactionManager.getTransaction(def);
  try {
   joinPoint.proceed();//执行方法
   sonDownLatch.countDown();// 对sonDownLatch-1
   mainDownLatch.await();// 如果mainDownLatch不是0,线程会在此阻塞,直到mainDownLatch变为0
   // 如果能执行到这一步说明所有子线程都已经执行完毕判断如果atomicBoolean是true就回滚false就提交
   if (rollBackFlag.get()) {
    transactionManager.rollback(status);
   } else {
    transactionManager.commit(status);
   }
  } catch (Throwable e) {
   exceptionVector.add(0, e);
   // 回滚
   transactionManager.rollback(status);
   // 并把状态设置为true
   rollBackFlag.set(true);
   mainDownLatch.countDown();
   sonDownLatch.countDown();
  }
 }
}

扩展说明: CountDownLatch是什么?

一个同步辅助类

  • 创建对象时: 用给定的数字初始化 CountDownLatch
  • countDown() 方法: 使计数减1
  • await() 方法: 阻塞当前线程, 直至当前计数到达零。

本文中:

用 计数 1 初始化的 mainDownLatch 当作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。

用 子线程数量 初始化的 sonDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

3、注解使用Demo

任务方法:

package com.example.demo.service;
import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * @author zlj
 * @since 2022/11/14
 */
@Service
public class SonService {
    /**
     * 参数说明:  以下4个方法参数和此相同
     *
     * @param args   业务中需要传递的参数
     * @param thread 调用者的线程, 用于aop获取参数, 不建议以方法重写的方式简略此参数,
     *               在调用者方法中可以以此参数为标识计算子线程的个数作为注解参数,避免线程参数计算错误导致锁表
     *               传参时参数固定为: Thread.currentThread()
     */
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod2(String args1, String args2, Thread thread) {
        System.out.println(args1 + "和" + args2 + "开启了线程");
    }
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod3(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }
    //sonMethod4方法没有使用线程池
    @Transactional(rollbackFor = Exception.class)
    public void sonMethod4(String args) {
        System.out.println(args + "没有开启线程");
    }
}

调用方:

package com.example.demo.service;
import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zlj
 * @since 2022/11/14
 */
@Service
public class MainService {
    @Resource
    private SonService sonService;
    @MainTransaction(3)//调用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async开启了线程, 所以参数为: 3
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }
    /*
     * 有的业务中存在if的多种可能, 每一种走向调用的方法(开启线程的方法)数量如果不同, 这时可以选择放弃使用@MainTransaction注解避免锁表
     * 这时候如果发生异常会导致多线程不能同时回滚, 可根据业务自己权衡是否使用
     */
    @Transactional(rollbackFor = Exception.class)
    public void test2() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }
}

来源:blog.csdn.net/weixin_69672118/

article/details/127675808

相关文章
|
1月前
|
XML Java 数据格式
SpringBoot入门(8) - 开发中还有哪些常用注解
SpringBoot入门(8) - 开发中还有哪些常用注解
49 0
|
2月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
208 0
|
2月前
|
Java Spring
在使用Spring的`@Value`注解注入属性值时,有一些特殊字符需要注意
【10月更文挑战第9天】在使用Spring的`@Value`注解注入属性值时,需注意一些特殊字符的正确处理方法,包括空格、引号、反斜杠、新行、制表符、逗号、大括号、$、百分号及其他特殊字符。通过适当包裹或转义,确保这些字符能被正确解析和注入。
118 3
|
17天前
|
前端开发 Java Spring
Spring MVC核心:深入理解@RequestMapping注解
在Spring MVC框架中,`@RequestMapping`注解是实现请求映射的核心,它将HTTP请求映射到控制器的处理方法上。本文将深入探讨`@RequestMapping`注解的各个方面,包括其注解的使用方法、如何与Spring MVC的其他组件协同工作,以及在实际开发中的应用案例。
33 4
|
1月前
|
XML JSON Java
SpringBoot必须掌握的常用注解!
SpringBoot必须掌握的常用注解!
60 4
SpringBoot必须掌握的常用注解!
|
17天前
|
前端开发 Java 开发者
Spring MVC中的请求映射:@RequestMapping注解深度解析
在Spring MVC框架中,`@RequestMapping`注解是实现请求映射的关键,它将HTTP请求映射到相应的处理器方法上。本文将深入探讨`@RequestMapping`注解的工作原理、使用方法以及最佳实践,为开发者提供一份详尽的技术干货。
49 2
|
17天前
|
前端开发 Java Spring
探索Spring MVC:@Controller注解的全面解析
在Spring MVC框架中,`@Controller`注解是构建Web应用程序的基石之一。它不仅简化了控制器的定义,还提供了一种优雅的方式来处理HTTP请求。本文将全面解析`@Controller`注解,包括其定义、用法、以及在Spring MVC中的作用。
38 2
|
20天前
|
消息中间件 Java 数据库
解密Spring Boot:深入理解条件装配与条件注解
Spring Boot中的条件装配与条件注解提供了强大的工具,使得应用程序可以根据不同的条件动态装配Bean,从而实现灵活的配置和管理。通过合理使用这些条件注解,开发者可以根据实际需求动态调整应用的行为,提升代码的可维护性和可扩展性。希望本文能够帮助你深入理解Spring Boot中的条件装配与条件注解,在实际开发中更好地应用这些功能。
26 2
|
21天前
|
JSON Java 数据格式
springboot常用注解
@RestController :修饰类,该控制器会返回Json数据 @RequestMapping(“/path”) :修饰类,该控制器的请求路径 @Autowired : 修饰属性,按照类型进行依赖注入 @PathVariable : 修饰参数,将路径值映射到参数上 @ResponseBody :修饰方法,该方法会返回Json数据 @RequestBody(需要使用Post提交方式) :修饰参数,将Json数据封装到对应参数中 @Controller@Service@Compont: 将类注册到ioc容器
|
21天前
|
XML Java 数据格式
SpringBoot入门(8) - 开发中还有哪些常用注解
SpringBoot入门(8) - 开发中还有哪些常用注解
36 2