4. Controlling multi-threaded transaction commits with two CountDownLatch instances
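When several threads commit, each thread runs in its own separate transaction, so consistency across threads cannot be guaranteed. We therefore try adding transaction control to the threads, so that every thread commits its transaction only after all data has been written.
Here we use two CountDownLatch instances to coordinate transaction commits between the main thread and the worker threads, with a timeout of 30 seconds. The code was modified slightly: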
@Override
public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch,
                                 CountDownLatch mainLatch, StudentTaskError taskStatus) {
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    System.out.println("Worker thread: " + Thread.currentThread().getName());
    try {
        students.forEach(s -> {
            // Update the teacher field
            // String teacher = s.getTeacher();
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
    } catch (Throwable e) {
        taskStatus.setIsError();
    } finally {
        threadLatch.countDown(); // signal the main thread that this worker is done
    }
    try {
        mainLatch.await(); // wait for the main thread's decision
    } catch (Throwable e) {
        taskStatus.setIsError();
    }
    // Roll back if any error was recorded, otherwise commit
    if (taskStatus.getIsError()) {
        dataSourceTransactionManager.rollback(transactionStatus);
    } else {
        dataSourceTransactionManager.commit(transactionStatus);
    }
}

/**
 * Each thread runs in its own transaction, so the worker transactions need unified control.
 * Here two CountDownLatch instances coordinate the worker transactions.
 */
@Test
void updateStudentWithThreadsAndTrans() {
    // Load all rows
    List<Student> allStudents = studentMapper.getAll();
    // Number of threads
    final Integer threadCount = 4;
    // Number of rows handled by each thread
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
    // Create the thread pool that runs the tasks
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount); // counts workers that have finished their updates
    CountDownLatch mainLatch = new CountDownLatch(1);              // released once the main thread has decided
    StudentTaskError taskStatus = new StudentTaskError();          // records whether any worker task failed
    for (int i = 0; i < threadCount; i++) {
        // Rows handled by this thread
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength)
                .collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
        });
    }
    try {
        // Wait for the workers, with a 30 s timeout
        boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
        if (!await) {
            // Timed out: mark as error so the transactions roll back
            taskStatus.setIsError();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        taskStatus.setIsError();
    }
    mainLatch.countDown();        // let the workers commit or roll back
    studentThreadPool.shutdown(); // shut down the thread pool
    System.out.println("Main thread finished");
}
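To make the handshake between the two latches easier to follow, here is a minimal sketch in plain Java with no Spring or database involved. The class `TwoLatchDemo`, the `run` method, and the `failingWorker` parameter are hypothetical names introduced for illustration; a counter increment stands in for a real commit. It only demonstrates the ordering guarantee: no worker decides until every worker has finished (or the main thread has timed out).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class TwoLatchDemo {
    /**
     * Runs `workers` simulated tasks and returns how many of them "committed".
     * Pass failingWorker = -1 for a run without failures.
     */
    static int run(int workers, int failingWorker) throws InterruptedException {
        CountDownLatch workerLatch = new CountDownLatch(workers); // workers -> main
        CountDownLatch mainLatch = new CountDownLatch(1);         // main -> workers
        AtomicBoolean error = new AtomicBoolean(false);
        AtomicInteger commits = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    // Stand-in for the database updates of one worker
                    if (id == failingWorker) {
                        throw new IllegalStateException("simulated failure");
                    }
                } catch (RuntimeException e) {
                    error.set(true);
                } finally {
                    workerLatch.countDown(); // report "work done" to the main thread
                }
                try {
                    mainLatch.await(); // block until the main thread has seen every worker
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    error.set(true);
                }
                // Stand-in for commit; doing nothing stands in for rollback
                if (!error.get()) {
                    commits.incrementAndGet();
                }
            });
        }

        // A timeout counts as an error, as in the test method above
        if (!workerLatch.await(30, TimeUnit.SECONDS)) {
            error.set(true);
        }
        mainLatch.countDown(); // release all workers at once
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return commits.get();
    }
}
```

With four workers and no failure, all four commit; if any single worker fails, none of them commit, because the error flag is set before the main latch is released.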
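When I went on to test how different thread counts affect execution time, I found that the run fails as soon as the thread count exceeds 10. The error is: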
Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
    at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
    at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
    ... 7 more
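The error roughly means that no JDBC connection could be opened for the transaction: the request for a connection timed out after 30 s. The first ten threads have to wait for the main thread before they can commit, so they keep holding their connections without releasing them, and the threads started after them time out while waiting for a connection.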
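The starvation described here can be reproduced without a database. The following is a rough sketch in which a `Semaphore` stands in for the connection pool; `PoolExhaustionDemo`, `starved`, and the short timeout are assumptions made for illustration and are not HikariCP APIs. Threads that obtain a permit hold it until the main thread releases them, exactly as the workers hold their connections until the main latch opens.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolExhaustionDemo {
    /** Returns how many threads failed to obtain a "connection" within timeoutMs. */
    static int starved(int poolSize, int threads, long timeoutMs) throws InterruptedException {
        Semaphore connections = new Semaphore(poolSize);        // stand-in for the connection pool
        CountDownLatch attempted = new CountDownLatch(threads); // every thread has tried to connect
        CountDownLatch mainLatch = new CountDownLatch(1);       // main thread's go-ahead
        AtomicInteger starved = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < threads; i++) {
            executor.execute(() -> {
                boolean acquired = false;
                try {
                    // Stand-in for opening a transaction, which needs a pooled connection
                    acquired = connections.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
                    if (!acquired) {
                        starved.incrementAndGet(); // corresponds to the connection timeout error
                    }
                    attempted.countDown();
                    if (acquired) {
                        mainLatch.await(); // keep holding the connection until main decides
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (acquired) {
                        connections.release();
                    }
                }
            });
        }

        attempted.await();
        mainLatch.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return starved.get();
    }
}
```

With a pool of 10 and 12 threads, the two threads that arrive after the pool is drained time out; with 10 threads nobody starves. Any fix therefore has to either enlarge the pool or bound the number of blocked threads.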
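The error log shows that the error originates in HikariPool, so let's reconfigure the connection pool and raise the maximum number of connections to 100. The configuration is: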
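# Minimum number of idle connections maintained in the pool. Default: 10
spring.datasource.hikari.minimum-idle=10
# Maximum number of connections allowed in the pool. Default: 10
spring.datasource.hikari.maximum-pool-size=100
# Auto-commit
spring.datasource.hikari.auto-commit=true
# Maximum time (ms) a connection may sit idle before it is retired. Default: 10 minutes
spring.datasource.hikari.idle-timeout=30000
# Maximum lifetime (ms) of a connection; once exceeded, the connection is retired when it is not in use. Default: 30 minutes. Recommended: 30 seconds shorter than the database's own timeout
spring.datasource.hikari.max-lifetime=1800000
# Maximum time (ms) to wait for the pool to hand out a connection; an SQLException is thrown if none becomes available in time. Default: 30 seconds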
Running the test again produced no errors. I then raised the thread count to 20 and ran it once more, and it also succeeded.
5. Controlling multi-threaded transaction commits with a collection of TransactionStatus objects
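On a colleague's recommendation, we use a collection of transaction statuses to control the multi-threaded transactions. The main code is: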
@Service
public class StudentsTransactionThread {

    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private StudentService studentService;
    @Autowired
    private PlatformTransactionManager transactionManager;

    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());

    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void updateStudentWithThreadsAndTrans() throws InterruptedException {
        // Load all rows
        List<Student> allStudents = studentMapper.getAll();
        // Number of threads
        final Integer threadCount = 2;
        // Number of rows handled by each thread
        final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
        // Create the thread pool that runs the tasks
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        AtomicBoolean isError = new AtomicBoolean(false);
        try {
            for (int i = 0; i < threadCount; i++) {
                // Rows handled by this thread
                List<Student> threadDatas = allStudents.stream()
                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                studentThreadPool.execute(() -> {
                    try {
                        studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                    } catch (Throwable e) {
                        e.printStackTrace();
                        isError.set(true);
                    } finally {
                        threadLatchs.countDown();
                    }
                });
            }
            // Wait for the workers, with a 30 s timeout
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            // Treat a timeout as an error
            if (!await) {
                isError.set(true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            isError.set(true);
        }
        if (!transactionStatuses.isEmpty()) {
            if (isError.get()) {
                transactionStatuses.forEach(s -> transactionManager.rollback(s));
            } else {
                transactionStatuses.forEach(s -> transactionManager.commit(s));
            }
        }
        System.out.println("Main thread finished");
    }
}

// In the StudentService implementation:
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager,
                                      List<TransactionStatus> transactionStatuses, List<Student> students) {
    // Collect every worker's transaction status in the same shared list
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    // Propagation behavior: always start a new transaction, which is safer
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    TransactionStatus status = transactionManager.getTransaction(def); // obtain the transaction status
    transactionStatuses.add(status);
    students.forEach(s -> {
        // Update the teacher field
        // String teacher = s.getTeacher();
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
    System.out.println("Worker thread: " + Thread.currentThread().getName());
}
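Like the previous approach, this one commits only after all threads have finished, so every thread still holds a connection from the JDBC pool in the meantime, and connection timeouts occur once the thread count exceeds the pool's maximum size. The thread count therefore still has to be kept under control.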
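One way to keep the thread count under control is to derive it from the pool size instead of hard-coding it. The sketch below is only an illustration: `ThreadBudget`, `boundedThreadCount`, `partition`, and the `reserved` parameter (connections set aside for other users of the pool, such as the calling thread's own transaction) are hypothetical names, not part of Spring or HikariCP.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadBudget {
    /** Caps the requested thread count so blocked workers cannot drain the pool. */
    static int boundedThreadCount(int requested, int maxPoolSize, int reserved) {
        int available = Math.max(1, maxPoolSize - reserved);
        return Math.max(1, Math.min(requested, available));
    }

    /** Splits items into at most `parts` consecutive chunks of near-equal size. */
    static <T> List<List<T>> partition(List<T> items, int parts) {
        List<List<T>> chunks = new ArrayList<>();
        if (items.isEmpty()) {
            return chunks;
        }
        // Same ceiling division as dataPartionLength in the examples above
        int size = (items.size() + parts - 1) / parts;
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }
}
```

Note that `partition` can return fewer chunks than requested (for example, 9 items split four ways gives 3 chunks of 3), so a latch that counts workers would be sized with `chunks.size()` rather than the requested thread count.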
6. Batch update by joining multiple selects with union
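In some setups batch update is not supported, but inserting multiple rows is. In that case you can express the rows to be updated as a series of select statements, combine them with union, and then join the update against that derived table. For example: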
update student, (
    (select 1 as id, 'teacher_A' as teacher)
    union (select 2 as id, 'teacher_A' as teacher)
    union (select 3 as id, 'teacher_A' as teacher)
    union (select 4 as id, 'teacher_A' as teacher)
    /* ....more data ... */
) as new_teacher
set student.teacher = new_teacher.teacher
where student.id = new_teacher.id
Because this is a single statement, it also achieves a batch update on a MySQL database that is not configured with allowMultiQueries=true.
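In application code the union statement would usually be generated rather than written by hand. The following is a minimal sketch, assuming the `student` table with `id` and `teacher` columns from the example above; `UnionUpdateBuilder`, `buildSql`, and `bind` are hypothetical helper names. It uses `?` placeholders with a `PreparedStatement` instead of concatenating values into the SQL text.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

class UnionUpdateBuilder {
    /** Builds the update statement with one "(select ? as id, ? as teacher)" per row. */
    static String buildSql(int rows) {
        if (rows < 1) {
            throw new IllegalArgumentException("rows must be >= 1");
        }
        String selects = String.join(" union ",
                Collections.nCopies(rows, "(select ? as id, ? as teacher)"));
        return "update student, (" + selects + ") as new_teacher"
                + " set student.teacher = new_teacher.teacher"
                + " where student.id = new_teacher.id";
    }

    /** Binds id/teacher pairs in the order the placeholders appear (JDBC indexes start at 1). */
    static void bind(PreparedStatement statement, Map<Long, String> teacherById) throws SQLException {
        int index = 1;
        for (Map.Entry<Long, String> entry : teacherById.entrySet()) {
            statement.setLong(index++, entry.getKey());
            statement.setString(index++, entry.getValue());
        }
    }
}
```

A caller would prepare `buildSql(teacherById.size())` on a connection, call `bind`, and run `executeUpdate()` once. For very large inputs it is probably wise to split the rows into several statements, since a single statement has practical size limits.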
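Summary
- For large batches of database operations, committing transactions manually can greatly improve efficiency.
- When several threads operate on the database, more threads does not mean faster execution; in the examples above, roughly 2 to 5 threads gave the shortest run time.
- When threads block while waiting to commit, the thread count must not be too high, and in particular must not exceed the connection pool's maximum size.
- If there is a way to do a real batch update, that is the best option.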