可以看到是 1048576,即 1024*1024,1M 大小。
而我们需要传输的包大小是 42777840 字节,大概是 41M 的样子。
所以我们需要修改配置大小。
这个地方也给大家提了个醒:如果你的 sql 语句非常大,里面有大字段,记得调整一下 mysql 的这个参数。
可以通过修改配置文件或者直接执行 sql 语句的方式进行修改。
我这里就使用 sql 语句修改为 64M:
set global max_allowed_packet = 1024*1024*64;
然后再次执行,可以看到插入成功了:
50w 的数据,74s 的样子。
数据要么全部提交,要么一条也没有,需求也实现了。
时间上呢,是有点长,但是好像也想不到什么好的提升方案。
那么我们怎么还能再缩短点时间呢?
骚想法出现了
我能想到的,只能是祭出多线程了。
50w 数据。我们开五个线程,一个线程处理 10w 数据,没有异常就保存入库,出现问题就回滚。
这个需求很好实现。分分钟就能写出来。
但是再加上一个需求:这 5 个线程的数据,如果有一个线程出现问题了,需要全部回滚。
顺着思路慢慢撸,我们发现这个时候就是所谓的多线程事务了。
我之前说完全不可能实现是因为提到事务我就想到了 @Transactional 注解去实现了。
我们只需要正确使用它,然后关系业务逻辑即可,不需要也根本插手不了事务的开启和提交或者回滚。
这种代码的写法我们叫做声明式事务。
和声明式事务对应的就是编程式事务了。
通过编程式事务,我们就能完全掌控事务的开启和提交或者回滚操作。
能想到编程式事务,这事基本上就成了一半了。
你想,首先我们有一个全局变量为 Boolean 类型,默认为可以提交。
在子线程里面,我们可以先通过编程式事务开启事务,然后插入 10w 条数据后,但是不提交。同时告诉主线程,我这边准备好了,进入等待。
如果子线程里面出现了异常,那么我就告诉主线程,我这边出问题了,然后自己进行回滚。
最后主线程收集到了 5 个子线程的状态。
如果有一个线程出现了问题,那么设置全局变量为不可提交。
然后唤醒所有等待的子线程,进行回滚。
根据上面的流程,写出模拟代码就是这样的,大家可以直接复制出来运行:
public class MainTest { //是否可以提交 public static volatile boolean IS_OK = true; public static void main(String[] args) { //子线程等待主线程通知 CountDownLatch mainMonitor = new CountDownLatch(1); int threadCount = 5; CountDownLatch childMonitor = new CountDownLatch(threadCount); //子线程运行结果 List<Boolean> childResponse = new ArrayList<Boolean>(); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { int finalI = i; executor.execute(() -> { try { System.out.println(Thread.currentThread().getName() + ":开始执行"); // if (finalI == 4) { // throw new Exception("出现异常"); // } TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000)); childResponse.add(Boolean.TRUE); childMonitor.countDown(); System.out.println(Thread.currentThread().getName() + ":准备就绪,等待其他线程结果,判断是否事务提交"); mainMonitor.await(); if (IS_OK) { System.out.println(Thread.currentThread().getName() + ":事务提交"); } else { System.out.println(Thread.currentThread().getName() + ":事务回滚"); } } catch (Exception e) { childResponse.add(Boolean.FALSE); childMonitor.countDown(); System.out.println(Thread.currentThread().getName() + ":出现异常,开始事务回滚"); } }); } //主线程等待所有子线程执行response try { childMonitor.await(); for (Boolean resp : childResponse) { if (!resp) { //如果有一个子线程执行失败了,则改变mainResult,让所有子线程回滚 System.out.println(Thread.currentThread().getName()+":有线程执行失败,标志位设置为false"); IS_OK = false; break; } } //主线程获取结果成功,让子线程开始根据主线程的结果执行(提交或回滚) mainMonitor.countDown(); //为了让主线程阻塞,让子线程执行。 Thread.currentThread().join(); } catch (Exception e) { e.printStackTrace(); } } }
在所有子线程都正常的情况下,输出结果是这样的:
从结果看,是符合我们的预期的。
假设有子线程出现了异常,那么运行结果是这样的:
一个线程出现异常,全部线程都进行回滚,这样看来也是符合预期的。
如果你根据前面的需求写出了这样的代码,那么恭喜你,一不留神实现了一个类似于两阶段提交(2PC)的一致性协议。
我前面说的能想到编程式事务,这事基本上就成了一半了。
而另外一半,就是两阶段提交(2PC)。
依瓢画葫芦
有了前面的瓢,你照着画个葫芦不是很简单的事情吗?
就不大段上代码了,示例代码可以点击这里获取到,所以我这里截个图吧:
上面的代码应该是非常好理解的,开启五个线程,每个线程插入 10w 条数据。
这个不用说,用脚趾头想也能知道,肯定是比一次性批量插入 50w 条数据快的。
至于快多少,不废话了,直接看执行效果吧。
由于我们的 controller 是这样的:
所以调用链接:
http://127.0.0.1:8081/batchHandle
输出结果如下: