在前面二十四章做了一个springboot使用EasyExcel和线程池实现多线程导入Excel数据的demo,在写时忘了做事务处理,评论区有个大佬提出来了,这章就对二十四章的代码做一个改造,完善多线程的事务处理。
对于springboot的事务处理,前面在二十三章也做过springboot整合spring事务详解以及实战的学习,但是在多线程时,这个东西并不适用,本章就通过手写事务处理(编程式事务处理)。
由于本章是针对二十四章的批量导入功能的扩展,所有不会再写事务处理不相关的(二十四章的内容)介绍了。
一、阐述目的与实现方式
前面章节实现的多线程处理excel导入功能,如果一个子线程出现错误,结果会是那个子线程的数据处理不了,而其他子线程的数据仍然正常处理保存,并不会存在事务处理的情况,本章改造代码实现事务处理,所有线程正常执行才会保存数据,否则就回滚。大致如下:
二、手动让子线程报错
为了后面测试事务回滚,手动让某个子线程报错,比如名为线程3的子线程,如下:
三、改造主线程
根据上面图的思路,首先改造主线程的代码,整体代码如下:
package com.swagger.demo.service; import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.event.AnalysisEventListener; import com.swagger.demo.config.SpringJobBeanFactory; import com.swagger.demo.mapper.DeadManMapper; import com.swagger.demo.model.entity.DeadManExcelData; import com.swagger.demo.thread.DeadManThread; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** * @author zrc * @version 1.0 * @description: TODO 最新入狱名单导入监听器 * * @date 2022/5/30 15:56 */ @Service @Slf4j @Component public class DeadManExcelListener extends AnalysisEventListener<DeadManExcelData> { /** * 多线程保存集合,使用线程安全集合 */ private List<DeadManExcelData> list = Collections.synchronizedList(new ArrayList<>()); /** * 创建线程池必要参数 */ private static final int CORE_POOL_SIZE = 10; // 核心线程数 private static final int MAX_POOL_SIZE = 100; // 最大线程数 private static final int QUEUE_CAPACITY = 100; // 队列大小 private static final Long KEEP_ALIVE_TIME = 1L; // 存活时间 public List<DeadManExcelData> getData(){ return list; } public DeadManExcelListener(){ } public void setData(List<DeadManExcelData> deadManExcelDataList){ this.list = deadManExcelDataList; } @Override public void invoke(DeadManExcelData deadManExcelData, AnalysisContext analysisContext) { if(deadManExcelData!=null){ list.add(deadManExcelData); } } /** * 多线程方式保存 * @param analysisContext */ @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { log.info("解析结束,开始插入数据"); // 创建线程池 ExecutorService executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); // 指定每个线程需要处理的导入数量,假设每个线程处理15000条,注意配合上面线程池的大小 int singleThreadDealCount = 15000; // 根据假设每个线程需要处理的数量以及总数,计算需要提交到线程池的线程数量 int threadSize=(list.size()/singleThreadDealCount)+1; // 计算需要导入的数据总数,用于拆分时线程需要处理数据时使用 int rowSize = list.size() + 1; // 测试开始时间 long startTime = System.currentTimeMillis(); // 申明该线程需要处理数据的开始位置 int startPosition = 0; // 申明该线程需要处理数据的结束位置 int endPosition = 0; // 为了让每个线程执行完后回到当前线程,使用CountDownLatch,值为线程数,每次线程执行完就会执行countDown方法减1,为0后回到主线程,也就是当前线程执行后续的代码 CountDownLatch count = new CountDownLatch(threadSize); // 用来控制主线程回到子线程 CountDownLatch mainCount = new CountDownLatch(1); // 用来控制最终回到主线程 CountDownLatch endCount = new CountDownLatch(threadSize); // 用来存放子线程的处理结果,若出错就保存一个false CopyOnWriteArrayList<Boolean> sonResult = new CopyOnWriteArrayList<Boolean>(); // 使用线程安全的对象存储,保存主线程最后总的判断结果,是提交还是回滚 AtomicBoolean ifSubmit = new AtomicBoolean(true); // 计算每个线程要处理的数据 for(int i=0;i<threadSize;i++){ // 如果是最后一个线程,为保证程序不发生空指针异常,特殊判断结束位置 if((i+1)==threadSize){ // 计算开始位置 startPosition = (i * singleThreadDealCount); // 当前线程为划分的最后一个线程,则取总数据的最后为此线程的结束位置 endPosition = rowSize-1; }else{ // 计算开始位置 startPosition = (i * singleThreadDealCount); // 计算结束位置 endPosition = (i + 1) * singleThreadDealCount; } DeadManMapper deadManMapper = SpringJobBeanFactory.getBean(DeadManMapper.class); DeadManThread thread = new DeadManThread(count,deadManMapper,list,startPosition,endPosition ,sonResult,mainCount,ifSubmit,endCount); executor.execute(thread); } try { count.await(); for (Boolean resp : sonResult) { if (!resp) { // 只要有一个子线程出异常,就设置最终结果为回滚 log.info("主线程:有线程执行失败,所有线程需要回滚"); ifSubmit.set(false); break; } } } catch (InterruptedException e) { e.printStackTrace(); }finally { // 回到子线程处理回滚或者提交事务 mainCount.countDown(); } try { endCount.await(); // 逻辑处理完,关闭线程池 executor.shutdown(); long endTime = System.currentTimeMillis(); log.info("总耗时:"+(endTime-startTime)); } catch (InterruptedException e) { e.printStackTrace(); } } }
新增如下4个参数(count是前面章节的已有的)。
PS:CountDownLatch类前面有讲过,通过await和countDown方法能够方便的实现多个线程之间的来回切换。
CopyOnWriteArrayList和AtomicBoolean是为了能够线程安全的保存多个线程共同使用的数据。
接着,重写DeadManThread线程类的构造方法,将上面新增的四个参数通过构造方法传给子线程。然后调用记录子线程第一次的cout的await方法,等待子线程第一次执行完毕,回到主线程继续执行。回到主线程后,主线程判断子线程第一次执行完后保存的返回集,判断是否存在false(子线程若报错,保存false,否则保存true)。若存在false就将idsubmit设置为false,意思是需要回滚数据,然后调用记录主线程执行的mainCount的countDown方法,让主线程执行完毕,回到子线程调用mainCount.countDown的位置继续子线程执行。
当子线程根据ifSubmit判断完进行回滚还是提交事务操作后,回到主线程,主线程关闭线程池。
四、改造子线程
接着改造子线程,整体代码如下:
package com.swagger.demo.thread; import com.swagger.demo.config.SpringJobBeanFactory; import com.swagger.demo.mapper.DeadManMapper; import com.swagger.demo.model.entity.DeadMan; import com.swagger.demo.model.entity.DeadManExcelData; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; /** * @author zrc * @version 1.0 * @description TODO * @date 2022/7/22 15:40 */ @Component @Slf4j public class DeadManThread implements Runnable{ /** * 当前线程需要处理的总数据中的开始位置 */ private int startPosition; /** * 当前线程需要处理的总数据中的结束位置 */ private int endPosition; /** * 需要处理的未拆分之前的全部数据 */ private List<DeadManExcelData> list = Collections.synchronizedList(new ArrayList<>()); /** * 记录子线程第一次执行是否完成 */ private CountDownLatch count; private DeadManMapper deadManMapper; /** * 保存每个线程的执行结果 */ private CopyOnWriteArrayList<Boolean> sonResult; /** * 记录主线程是否执行过判断每个线程的执行结果这个操作 */ private CountDownLatch mainCount; /** * 记录主线程对每个线程的执行结果的判断 */ private AtomicBoolean ifSubmit; /** * 声明该子线程的事务管理器 */ private DataSourceTransactionManager dataSourceTransactionManager; /** * 声明该线程事务的状态 */ private TransactionStatus status; /** * 记录子线程第二次执行是否完成 */ private CountDownLatch endCount; public DeadManThread() { } public DeadManThread(CountDownLatch count, DeadManMapper deadManMapper, List<DeadManExcelData> list , int startPosition, int endPosition, CopyOnWriteArrayList<Boolean> sonResult,CountDownLatch mainCount ,AtomicBoolean ifSubmit,CountDownLatch endCount) { this.startPosition = startPosition; this.endPosition = endPosition; this.deadManMapper = deadManMapper; this.list = list; this.count = count; this.sonResult = sonResult; this.mainCount = mainCount; this.ifSubmit = ifSubmit; this.endCount = endCount; } @Override public void run() { try{ dataSourceTransactionManager = SpringJobBeanFactory.getBean(DataSourceTransactionManager.class); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); status = dataSourceTransactionManager.getTransaction(def); if(Thread.currentThread().getName().contains("3")){ throw new RuntimeException("线程3出问题了"); } List<DeadMan> deadManList = new ArrayList<>(); List<DeadManExcelData> newList = list.subList(startPosition, endPosition); // 将EasyExcel对象和实体类对象进行一个转换 for (DeadManExcelData deadManExcelData : newList) { DeadMan deadMan = new DeadMan(); BeanUtils.copyProperties(deadManExcelData, deadMan); deadManList.add(deadMan); } // 批量新增 deadManMapper.insertBatchSomeColumn(deadManList); sonResult.add(true); } catch (Exception e) { e.printStackTrace(); sonResult.add(false); } finally { // 当一个线程执行完了计数要减一不然这个线程会被一直挂起 count.countDown(); try { log.info(Thread.currentThread().getName() + ":准备就绪,等待其他线程结果,判断是否事务提交"); mainCount.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (ifSubmit.get()) { dataSourceTransactionManager.commit(status); log.info(Thread.currentThread().getName() + ":事务提交"); } else { dataSourceTransactionManager.rollback(status); log.info(Thread.currentThread().getName() + ":事务回滚"); } // 执行完所有逻辑,等待主线程执行 endCount.countDown(); } } }
先改造构造方法,用于接受主线程传过来的参数,并设置到自己的内部私有。
然后改造run方法。
先开启自己的事务,并保存事务状态,用于后面执行提交或者回滚操作。数据处理完后,若正常结束将线程安全的返回值集合变量保存一个true,否则保存false,并执行记录第一线程执行的count的countDown方法,等待所有子线程执行完后,返回主线程执行刚才上面讲的判断sonResult的代码,等主线程执行完判断并设置ifSubmit的值后,回到子线程执行main.await之后的代码。
如果ifSubmit是false就回滚,否则就提交,执行完后执行记录子线程第二次执行的endCount的countDown方法,等待子线程全部执行完后,回到主线程,主线程执行关闭线程池的逻辑,结束。
五、测试
代码写完,测试一下。测试之前数据:
现在是线程3会报错,调用接口测试。
事务成功回滚,数据没有提交。若删除手动抛异常的代码,让程序正常执行,如下:
数据提交成功,事务正常处理。
补充: 关于可能出现的死锁问题(子线程数大于10时发生),可能是springboot默认的datasource:
hikari:
maximum-pool-size这个属性导致的,maximum-pool-size:最大连接数,小于等于0会被重置为默认值10。
hikari是springboot默认使用的数据源连接池。
如果子线程是大于10个,但是最大连接数只有10,就会导致后面的子线程连不上数据库,10个连接上数据库的线程也无法释放,导致出现死锁。