多线程事务
在Spring开发时,遇到一个从Excel表导入数据到数据库的需求,当然最简单的方法就是先使用EasyExcel把数据读出到集合中,然后依次插入到数据库中。
但如何保证效率,原子性呢?我们一步步优化方案。这里不会引入不必要的组件,而是自己模拟类似的思想。
方法1:依次顺序插入
void test() { List<User> users = getAllUsers(); users.forEach(user -> userService.save(user)); }
方法2:使用批处理,一次操作中执行多条SQL
void test() { List<User> users = getAllUsers(); userService.saveBatch(users); }
方法3:使用多线程+批处理,每个线程插入多条数据
需要注意的一点,Spring容器不允许线程注入,也就是没办法在多线程直接使用Bean操作,例如:
void testThread() { // 下面两种方式是无效的,不会执行任何东西 Runnable runnable = () -> { userService.save(new User()); }; // 方法1 new Thread(runnable).start(); // 方法2 Executors.newFixedThreadPool(1).submit(runnable); }
我们需要下面的方式进行执行
void testThread() { Runnable runnable = () -> { userService.save(new User()); }; ExecutorService executorService = Executors.newFixedThreadPool(1); CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executorService); future.join(); }
void testThread() { int threadSize = 5; ExecutorService executorService = Executors.newFixedThreadPool(threadSize); List<List<User>> list = new ArrayList<>(); for (int i = 0; i < threadSize; i++) { // 我们假设数据拆分为五分 list.add(getAllUsers()); } for (List<User> users : list) { CompletableFuture.runAsync(()->{ userService.saveBatch(users); },executorService).join(); } System.out.println("插入成功"); }
方法4:这时候速度已经很快了,但是如果其中一个线程插入数据时发生错误进行回滚,其他线程是无法得知的,因为事务是针对线程的,所以这里我们需要用一些方式保证每个线程之间的状态是被共享的。
// UserService#saveUserSyn() @Override public boolean saveUserSyn(List<User> users, CountDownLatch threadLatch, CountDownLatch mainLatch, UserError hasError) { TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); System.out.println("子线程:" + Thread.currentThread().getName()); try { users.forEach(this::save); } catch (Throwable e) { hasError.setHasError(true); } finally { threadLatch.countDown(); // 切换到主线程执行 } try { mainLatch.await(); //等待主线程执行 } catch (Throwable e) { hasError.setHasError(true); } // 判断是否有错误,如有错误 就回滚事务 if (hasError.isHasError()) { dataSourceTransactionManager.rollback(transactionStatus); } else { dataSourceTransactionManager.commit(transactionStatus); } return true; }
// 测试方法 @Test void userSaveSyn() { List<User> userList = getAllUsers(); // 添加一个错误数据 User user = new User(); user.setUserAccount(null); user.setUserPassword("123456"); userList.add(user); // 线程数量 final Integer threadCount = 4; //每个线程处理的数据量 final Integer dataPartionLength = (userList.size() + threadCount - 1) / threadCount; // 创建多线程处理任务 ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount); CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量 CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交 for (int i = 0; i < threadCount; i++) { // 每个线程处理的数据 List<User> threadDatas = userList.stream() .skip(i * dataPartionLength).limit(dataPartionLength) .collect(Collectors.toList()); studentThreadPool.execute(() -> { userService.saveUserSyn(threadDatas, threadLatchs, mainLatch, hasError); }); } try { // 倒计时锁设置超时时间 30s boolean await = threadLatchs.await(30, TimeUnit.SECONDS); if (!await) { // 等待超时,事务回滚 hasError.setHasError(true); } } catch (Throwable e) { e.printStackTrace(); hasError.setHasError(true); } mainLatch.countDown(); // 切换到子线程执行 studentThreadPool.shutdown(); //关闭线程池 System.out.println("主线程完成"); }
这里我们使用CountDownLatch
和 Volatile
来解决这个问题。
Volatile
保证线程间数据的可见性
2PC(两阶段提交),这个属于分布式事务的一个理论,这里模拟了这样的业务场景,大致流程为:
- 每个线程开启事务,插入数据,但不提交,向主线程通知说,我这里已经好了
- 主线程等待一段时间,看是否所有的子线程都没问题了。如果超时也算是异常
- 如果没有异常,主线程向所有子线程通知,可以提交事务
- 如果有异常,主线程向所有子线程通知,进行回滚操作
- 而中间使用Volatile修饰的hasError对象进行传达,是否出现异常。需要注意如果只是传递普通的boolean对象,可能会发生不一致的情况,我测试时没法通过。
- CountDownLatch则保证子线程在主线程没有通知前,是不能提交事务的。
这里细心些就会发现,即便是主线程通知子线程可以提交了,子线程依然有可能出现提交失败的可能,那其他线程提交事务是无法得知这边的失败的消息的。这就是我们其实无法在一个Java进程中保证多线程的原子性。