前言:
最近在做业务功能的时候,拿到一个非常"简单"的需求,把一个 30万行的数据文件按照特定的格式进行入库,文件格式和字段的内容都有对应的规定。这种需求其实还算比较常见,通常这一类需求不管系统配置多么强悍,都不可能无脑的读取插入。趁着这个需求搜集了一下几种常见的做法。下面就来介绍一下解决这种大数据文件的常用套路
文章目的:
- 在JAVA中如何安全的将一份超大文件进行安全入库处理方式。
- 大文件读写可能产生的性能问题和瓶颈分析
- 关于分析大文件读写的常见套路
- 使用单线程还是多线程
- 多线程的相关问题讨论
文件内容分析
由于实际的情况复杂多变,在做具体的编码之前,需要先梳理有可能存在的情况,下面简单列举系统有可能的存在的问题,和一些常见的注意事项:
- 系统硬件水平,服务器是否会因为读写大量的数据文件占用大量资源
- 内存问题:加载大数据最容易出的问题那就是爆内存,建议至少使用缓冲流进行读写
- 硬盘问题:读写的限制另一种体现就是硬盘的好坏,固态优于机械的读写.
- 文件的读写方式,JAVA的IO比较复杂,这里简化为三种也就是常见的BIO、NIO、和AIO(具体代表含义请自行百度)。
- 异步IO虽然看起来很美,但是需要考虑顺序入库的问题。
- 多线程异步读写比较考验机器性能,请谨慎使用。
- 顺序读写永远是硬盘最快捷的一种方式
- 完成一次完整的操作时间估量,既然是大文件,就必然需要考虑整个操作的执行时间,一份几十万的数据跑一轮下来不管如何优化肯定需要不少的时间,所以操作的时间消耗需要考虑在可接受的范围
- 大数据文件读写的时间选择
- 通常比较重和累的活都放大半夜去干
- 估量整个任务的执行时间消耗
这些分析只是一些最基本的要求,不同的业务场景会有更多的细节考量,文章不可能面面俱到,这些分析更多的是帮助个人提高警惕性,只有考虑到所有可能想到的细节,这样的大文件读写才可能是安全可靠的,同时可以保证突发情况可以及时的反应。
最后,这类开销比较大的操作,对于日志打印和记录的计算需要额外小心,最好在一次较大操作中记录操作成功失败记录数,同时在整个记录完成之后通过日志持久化整个操作的结果。
大文件读写的常见套路
其实这些套路网上多看看资料基本都可以有自己的一套方案,下面给出的建议可能不是最好的方式,有些可能在实际业务场景下走不通。(完全有可能)但是借着这些套路希望可以给读者一些启发,下面我们直接进入主题。
分批入库
分批入库是最容易想到的方式,也是最保险最稳妥的方式,这里包含了一个隐式的条件,就是数据都是增量不改动数据,大致意思就是不会改动的固定数据库数据。
现在我们来看下分批入库是如何处理的,分批的意思就是说每N条进行一次操作,防止数据库突然收到一个巨量的Insert请求导致锁表并且影响业务(弱一点的服务器直接满载),下面根据一段案例代码来说明做法:
个人公司的电脑是一块SATA的固态硬盘,在开启批量操作之后,经常100%读写占用系统假死,所以如果要进行试验,建议先设置一个很小的值慢慢加量,否则你的电脑可能会卡的动不了。
- 首先需要编写一个批量插入的sql语句,网上对应案例的语句如下(如果是mybatis,需要使用<foreach>标签标记需要循环的对象内容):
INSERT INTO table ( "clo1", "col2", "col3", "col4", "col5" ) VALUES ( 1, 10, NULL, '2019-12-19 13:38:35', '新年活动16张卡券'), ( 2, 11, NULL, '2019-12-19 15:05:13', '圣诞活动11张卡券'), ( 3, 12, NULL, '2019-12-19 15:05:13', '圣诞活动12张卡券'), ( 4, 13, NULL, '2019-12-19 15:05:13', '圣诞活动13张卡券');
下面是分批操作的JAVA代码,大致逻辑是打开一个文件,然后将一行数据转为一个对象,同时塞入到一个集合当中,当集合的内容超过限制的时候,进行一次入库的操作。
private void insert2DbByBatchList(Config config, String line) throws IOException { List<VisaNewBinVo> insertList = new ArrayList<>(1000); Map configValue = readConfigValue(); while (StringUtils.isNotEmpty(line)) { Timestamp timestamp = new Timestamp(System.currentTimeMillis()); VisaNewBin visaNewBin = new VisaNewBin(); configValue.forEach((key, value) -> { Map<String, Object> visaBinField = (Map<String, Object>) value; Integer endInex = (Integer) visaBinField.get("endInex"); Integer startIndex = (Integer) visaBinField.get("startIndex"); if (startIndex < line.length() && endInex < line.length()) { String substring = line.substring(startIndex, endInex); FieldReflectionUtil.setFieldValueByFieldName(visaNewBin, key.toString(), substring); } }); VisaNewBinVo visaNewBinVo = new VisaNewBinVo(); BeanUtils.copyProperties(visaNewBin, visaNewBinVo); visaNewBinVo.setBinId(UUID.randomUUID().toString()); visaNewBinVo.setBatchNo(getVisaNewCardBinDecAfterFileName(config)); visaNewBinVo.setCreateTime(timestamp); insertList.add(visaNewBinVo); // 限制部分 if (rechLimitValue(insertList)) { int count = visaNewBinMapper.batchInsertNewBins(insertList); logger.info("当前批次数据为:{} 条,成功入库: {} 条数据", insertList.size(), count); insertList.clear(); } } } private boolean rechLimitValue(List insertList) { return insertList.size() % 500 == 0; }
小贴士:很多人可能会认为可以用Thread.sleep(1000)
类似的线程休眠的方式让计算机“冷静”一下,给数据库一些缓冲时间,但是其实从大文件读写的角度来看,没有太大的意义,因为我们的文件读写要么需要开一条“河流”,要么就像新的方式直接开一条“矿道”(底层IO)。我们一旦打开流或者开通矿道就是在占用系统资源。用这种休眠的方式无非就是拉长了整个工作的时间,其实并没有太大的实际意义。
当然这种形式并不是完全没有任何作用,有些情况下比如之前个人曾经做过关于一个百度的分析接口存在QPS个位数限制的情况下,这种时候最简单的方法就是使用线程休眠来限制调用。
当然这种形式在编码里面比价丑陋,可以使用JDK的工具类TimeUtil
来更加优雅的细粒度控制线程休眠时间控制。
这里有个八股文的面试题Thread.sleep(0)
的含义。
分批入库存在的问题
分批入库虽然是最无脑的一种方式,但是这里其实是存在限制的,一般会存在下面这些问题:
- 数据库对于preSql的占位符限制:比如postgreSql 的限制为Short类型的最大值,即32747,超过这个值就会抛出如下的异常:
Tried to send an out-of-range integer as a 2-byte value
github上面有人提过这个issue,里面还有一些老外的吐槽,挺有意思的,文章连接:
github.com/pgjdbc/pgjd…如何解决"尝试将超范围整数发送为 2 个按次值"的错误#1311
stackoverflow.com/questions/2…PostgreSQL ERROR: INSERT has more target columns than expressions, when it doesn't
如果想要绕开这个问题,可以自己手写一个实现类进行替换。还有一种办法就是减少占位符,增加批次然后减少每次批次的插入数据量。
- 硬件水平的限制:这里主要说的是硬盘上的限制,一块差点的硬盘即使是分批操作也会卡死,需要注意分批之后不是高枕无忧了
硬件问题不能完全作为无法解决问题的借口。
- 程序中断的影响:分批的方式比较常见的一个问题是处理入库过程中 程序异常,断电,系统故障(蓝屏)。
一种推荐的解决方式是数据库设置唯一校验字段,每次入库之前检查是否存在标记,可以使用redis进行辅助。(布隆过滤器)
多线程读写
多线程的处理方式也比较容易理解,既然一个人读写吃力,那就把文件“劈”成很多份,比如文件的第1条到1万条为线程1,第10001条到20000条为线程2, 依次类推,这种方式需要提前计算数据行的总量,然后开启线程将数据行分配给多个线程,由于个人处理的时候,被禁止使用多线程的处理方式,这里的代码为一些案例作用。
(建议PC端查看)
总结:
通过这次的小需求整理了一下大数据问题的处理经验,也算是对个人的一点提升。比较关键的是掌握多线程写入文件,需要考虑的内容还不少。不过网上的资料并不是特别多,还需要花更多的时间去研究。