多线程事务如何保证效率和原子性

简介: 多线程事务如何保证效率和原子性

多线程事务

在Spring开发时,遇到一个从Excel表导入数据到数据库的需求,当然最简单的方法就是先使用EasyExcel把数据读出到集合中,然后依次插入到数据库中。

但如何保证效率,原子性呢?我们一步步优化方案。这里不会引入不必要的组件,而是自己模拟类似的思想。

方法1:依次顺序插入

void test() {
        List<User> users = getAllUsers();
        users.forEach(user -> userService.save(user));
    }

方法2:使用批处理,一次操作中执行多条SQL

void test() {
    List<User> users = getAllUsers();
    userService.saveBatch(users);
}

方法3:使用多线程+批处理,每个线程插入多条数据

需要注意的一点,Spring容器不允许线程注入,也就是没办法在多线程直接使用Bean操作,例如:

void testThread() {
// 下面两种方式是无效的,不会执行任何东西
Runnable runnable = () -> {
         userService.save(new User());
     };
// 方法1
new Thread(runnable).start();
// 方法2
Executors.newFixedThreadPool(1).submit(runnable);
}

我们需要下面的方式进行执行

void testThread() {
     Runnable runnable = () -> {
         userService.save(new User());
     };
     ExecutorService executorService = Executors.newFixedThreadPool(1);
     CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executorService);
     future.join();
 }

void testThread() {
    int threadSize = 5;
    ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
    List<List<User>> list = new ArrayList<>();
    for (int i = 0; i < threadSize; i++) {
        // 我们假设数据拆分为五分
        list.add(getAllUsers());
    }
    for (List<User> users : list) {
        CompletableFuture.runAsync(()->{
            userService.saveBatch(users);
        },executorService).join();
    }
    System.out.println("插入成功");
}

方法4:这时候速度已经很快了,但是如果其中一个线程插入数据时发生错误进行回滚,其他线程是无法得知的,因为事务是针对线程的,所以这里我们需要用一些方式保证每个线程之间的状态是被共享的。

// UserService#saveUserSyn()
@Override
public boolean saveUserSyn(List<User> users, CountDownLatch threadLatch, CountDownLatch mainLatch, UserError hasError) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        System.out.println("子线程:" + Thread.currentThread().getName());
        try {
            users.forEach(this::save);
        } catch (Throwable e) {
            hasError.setHasError(true);
        } finally {
            threadLatch.countDown(); // 切换到主线程执行
        }
        try {
            mainLatch.await();  //等待主线程执行
        } catch (Throwable e) {
            hasError.setHasError(true);
        }
        // 判断是否有错误,如有错误 就回滚事务
        if (hasError.isHasError()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }
        return true;
    }
// 测试方法
@Test
    void userSaveSyn() {
        List<User> userList = getAllUsers();
        // 添加一个错误数据
        User user = new User();
        user.setUserAccount(null);
        user.setUserPassword("123456");
        userList.add(user);
        // 线程数量
        final Integer threadCount = 4;
        //每个线程处理的数据量
        final Integer dataPartionLength = (userList.size() + threadCount - 1) / threadCount;
        // 创建多线程处理任务
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
        CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
        for (int i = 0; i < threadCount; i++) {
            // 每个线程处理的数据
            List<User> threadDatas = userList.stream()
                    .skip(i * dataPartionLength).limit(dataPartionLength)
                    .collect(Collectors.toList());
            studentThreadPool.execute(() -> {
                userService.saveUserSyn(threadDatas, threadLatchs, mainLatch, hasError);
            });
        }
        try {
            // 倒计时锁设置超时时间 30s
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            if (!await) { // 等待超时,事务回滚
                hasError.setHasError(true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            hasError.setHasError(true);
        }
        mainLatch.countDown(); // 切换到子线程执行
        studentThreadPool.shutdown(); //关闭线程池
        System.out.println("主线程完成");
    }

这里我们使用CountDownLatchVolatile来解决这个问题。

CountDownLatch的语法与原理讲解

Volatile保证线程间数据的可见性

2PC(两阶段提交),这个属于分布式事务的一个理论,这里模拟了这样的业务场景,大致流程为:

  • 每个线程开启事务,插入数据,但不提交,向主线程通知说,我这里已经好了
  • 主线程等待一段时间,看是否所有的子线程都没问题了。如果超时也算是异常
  • 如果没有异常,主线程向所有子线程通知,可以提交事务
  • 如果有异常,主线程向所有子线程通知,进行回滚操作
  • 而中间使用Volatile修饰的hasError对象进行传达,是否出现异常。需要注意如果只是传递普通的boolean对象,可能会发生不一致的情况,我测试时没法通过。
  • CountDownLatch则保证子线程在主线程没有通知前,是不能提交事务的。

这里细心些就会发现,即便是主线程通知子线程可以提交了,子线程依然有可能出现提交失败的可能,那其他线程提交事务是无法得知这边的失败的消息的。这就是我们其实无法在一个Java进程中保证多线程的原子性。


目录
相关文章
|
SQL Java 数据库连接
联表查询 && 索引 && 事务 && JDBC使用 &&CPU工作原理 && 线程概念 && Thread类的用法
联表查询 && 索引 && 事务 && JDBC使用 &&CPU工作原理 && 线程概念 && Thread类的用法
160 0
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
2月前
|
SQL 存储 监控
SQLServer事务复制延迟优化之并行(多线程)复制
【9月更文挑战第12天】在SQL Server中,事务复制延迟会影响数据同步性。并行复制可通过多线程处理优化这一问题,提高复制效率。主要优化方法包括:配置分发代理参数、优化网络带宽、调整系统资源、优化数据库设计及定期监控维护。合理实施这些措施可提升数据同步的及时性和可靠性。
|
4月前
|
设计模式 安全 Java
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
78 1
|
4月前
|
缓存 Java 编译器
多线程内存模型问题之保证Java中的原子性,如何解决
多线程内存模型问题之保证Java中的原子性,如何解决
|
6月前
|
数据库连接 数据库
多线程事务失效的原因
【5月更文挑战第16天】多线程事务失效的原因
387 0
|
6月前
|
缓存 安全 Java
多线程的三大特性:原子性、可见性和有序性
多线程的三大特性:原子性、可见性和有序性
142 0
|
消息中间件 JavaScript 小程序
SpringBoot 使用线程池如何控制主线程和子线程的事务
SpringBoot 使用线程池如何控制主线程和子线程的事务
|
6月前
|
缓存 安全 Java
3.线程安全之可见性、有序性、原子性是什么?
3.线程安全之可见性、有序性、原子性是什么?
72 0
3.线程安全之可见性、有序性、原子性是什么?
|
6月前
|
Java
Java线程面试题:什么是原子性问题?如何解决?
Java线程面试题:什么是原子性问题?如何解决?
74 0