如何快速安全的插入千万条数据?

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 最近有个需求解析一个订单文件,并且说明文件可达到千万条数据,每条数据大概在20个字段左右,每个字段使用逗号分隔,需要尽量在半小时内入库。

最近有个需求解析一个订单文件,并且说明文件可达到千万条数据,每条数据大概在20个字段左右,每个字段使用逗号分隔,需要尽量在半小时内入库。


思路

1.估算文件大小

因为告诉文件有千万条,同时每条记录大概在20个字段左右,所以可以大致估算一下整个订单文件的大小,方法也很简单使用FileWriter往文件中插入一千万条数据,查看文件大小,经测试大概在1.5G左右;


2.如何批量插入

由上可知文件比较大,一次性读取内存肯定不行,方法是每次从当前订单文件中截取一部分数据,然后进行批量插入,如何批次插入可以使用insert(...)values(...),(...)的方式,经测试这种方式效率还是挺高的;怎么快速插入 100 条数据,用时最短,这篇看下。


3.数据的完整性

截取数据的时候需要注意,需要保证数据的完整性,每条记录最后都是一个换行符,需要根据这个标识保证每次截取都是整条数,不要出现半条数据这种情况;


4.数据库是否支持批次数据

因为需要进行批次数据的插入,数据库是否支持大量数据写入,比如这边使用的mysql,可以通过设置max_allowed_packet来保证批次提交的数据量;


5.中途出错的情况

因为是大文件解析,如果中途出现错误,比如数据刚好插入到900w的时候,数据库连接失败,这种情况不可能重新来插一遍,所有需要记录每次插入数据的位置,并且需要保证和批次插入的数据在同一个事务中,这样恢复之后可以从记录的位置开始继续插入。


实现

1.准备数据表

这里需要准备两张表分别是:订单状态位置信息表,订单表;

CREATE TABLE `file_analysis` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `file_type` varchar(255) NOT NULL COMMENT '文件类型 01:类型1,02:类型2',
  `file_name` varchar(255) NOT NULL COMMENT '文件名称',
  `file_path` varchar(255) NOT NULL COMMENT '文件路径',
  `status` varchar(255) NOT NULL COMMENT '文件状态 0初始化;1成功;2失败:3处理中',
  `position` bigint(20) NOT NULL COMMENT '上一次处理完成的位置',
  `crt_time` datetime NOT NULL COMMENT '创建时间',
  `upd_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8
CREATE TABLE `file_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `file_id` bigint(20) DEFAULT NULL,
  `field1` varchar(255) DEFAULT NULL,
  `field2` varchar(255) DEFAULT NULL,
  `field3` varchar(255) DEFAULT NULL,
  `field4` varchar(255) DEFAULT NULL,
  `field5` varchar(255) DEFAULT NULL,
  `field6` varchar(255) DEFAULT NULL,
  `field7` varchar(255) DEFAULT NULL,
  `field8` varchar(255) DEFAULT NULL,
  `field9` varchar(255) DEFAULT NULL,
  `field10` varchar(255) DEFAULT NULL,
  `field11` varchar(255) DEFAULT NULL,
  `field12` varchar(255) DEFAULT NULL,
  `field13` varchar(255) DEFAULT NULL,
  `field14` varchar(255) DEFAULT NULL,
  `field15` varchar(255) DEFAULT NULL,
  `field16` varchar(255) DEFAULT NULL,
  `field17` varchar(255) DEFAULT NULL,
  `field18` varchar(255) DEFAULT NULL,
  `crt_time` datetime NOT NULL COMMENT '创建时间',
  `upd_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10000024 DEFAULT CHARSET=utf8

2.配置数据库包大小

mysql> show VARIABLES like '%max_allowed_packet%';
+--------------------------+------------+
| Variable_name | Value |
+--------------------------+------------+
| max_allowed_packet | 1048576 |
| slave_max_allowed_packet | 1073741824 |
+--------------------------+------------+
2 rows in set
mysql> set global max_allowed_packet = 1024*1024*10;
Query OK, 0 rows affected

通过设置max_allowed_packet,保证数据库能够接收批次插入的数据包大小;不然会出现如下错误:

Caused by: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (4980577 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.
    at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3915)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2598)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2778)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2834)

3.准备测试数据

public static void main(String[] args) throws IOException {
  FileWriter out = new FileWriter(new File("D://xxxxxxx//orders.txt"));
  for (int i = 0; i < 10000000; i++) {
    out.write(
        "vaule1,vaule2,vaule3,vaule4,vaule5,vaule6,vaule7,vaule8,vaule9,vaule10,vaule11,vaule12,vaule13,vaule14,vaule15,vaule16,vaule17,vaule18");
    out.write(System.getProperty("line.separator"));
  }
  out.close();
}

使用FileWriter遍历往一个文件里插入1000w条数据即可,这个速度还是很快的,不要忘了在每条数据的后面添加换行符(\n\r);


4.截取数据的完整性

除了需要设置每次读取文件的大小,同时还需要设置一个参数,用来每次获取一小部分数据,从这小部分数据中获取换行符(\n\r),如果获取不到一直累加直接获取为止,这个值设置大小大致同每条数据的大小差不多合适,部分实现如下:

ByteBuffer byteBuffer = ByteBuffer.allocate(buffSize); // 申请一个缓存区
long endPosition = batchFileSize + startPosition - buffSize;// 子文件结束位置
long startTime, endTime;
for (int i = 0; i < count; i++) {
    startTime = System.currentTimeMillis();
    if (i + 1 != count) {
        int read = inputChannel.read(byteBuffer, endPosition);// 读取数据
        readW: while (read != -1) {
            byteBuffer.flip();// 切换读模式
            byte[] array = byteBuffer.array();
            for (int j = 0; j < array.length; j++) {
                byte b = array[j];
                if (b == 10 || b == 13) { // 判断\n\r
                    endPosition += j;
                    break readW;
                }
            }
            endPosition += buffSize;
            byteBuffer.clear(); // 重置缓存块指针
            read = inputChannel.read(byteBuffer, endPosition);
        }
    } else {
        endPosition = fileSize; // 最后一个文件直接指向文件末尾
    }
    ...省略,更多可以查看Github完整代码...
}

如上代码所示开辟了一个缓冲区,根据每行数据大小来定大概在200字节左右,然后通过遍历查找换行符(\n\r),找到以后将当前的位置加到之前的结束位置上,保证了数据的完整性;


5.批次插入数据

通过insert(...)values(...),(...)的方式批次插入数据,部分代码如下:

// 保存订单和解析位置保证在一个事务中
SqlSession session = sqlSessionFactory.openSession();
try {
  long startTime = System.currentTimeMillis();
  FielAnalysisMapper fielAnalysisMapper = session.getMapper(FielAnalysisMapper.class);
  FileOrderMapper fileOrderMapper = session.getMapper(FileOrderMapper.class);
  fileOrderMapper.batchInsert(orderList);
  // 更新上次解析到的位置,同时指定更新时间
  fileAnalysis.setPosition(endPosition + 1);
  fileAnalysis.setStatus("3");
  fileAnalysis.setUpdTime(new Date());
  fielAnalysisMapper.updateFileAnalysis(fileAnalysis);
  session.commit();
  long endTime = System.currentTimeMillis();
  System.out.println("===插入数据花费:" + (endTime - startTime) + "ms===");
} catch (Exception e) {
  session.rollback();
} finally {
  session.close();
}
...省略,更多可以查看Github完整代码...

如上代码在一个事务中同时保存批次订单数据和文件解析位置信息,batchInsert通过使用mybatis的标签来遍历订单列表,生成values数据;


总结

以上展示了部分代码,完整的代码可以查看Github地址中的batchInsert模块,本地设置每次截取的文件大小为2M。


经测试1000w条数据(大小1.5G左右)插入mysql数据库中,大概花费时间在20分钟左右,当然可以通过设置截取的文件大小,花费的时间也会相应的改变。


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
10月前
|
前端开发 Java 数据库
数据重复插入问题及解决方案
数据重复插入问题及解决方案
540 0
|
数据库 数据安全/隐私保护
批量插入【对新数据进行插入操作,已存在的记录进行更新操作】
批量插入【对新数据进行插入操作,已存在的记录进行更新操作】
127 0
批量插入【对新数据进行插入操作,已存在的记录进行更新操作】
|
Java C语言
练习6—数据插入
练习6—数据插入
|
关系型数据库 MySQL 数据库
查看或者插入表数据|学习笔记
快速学习查看或者插入表数据
|
SQL 关系型数据库 测试技术
关系数据库如何快速查询表的记录数
在数据库中,很多人员习惯使用SELECT COUNT(*)、SELECT COUNT(1)、SELECT COUNT(COL)来查询一个表有多少记录,对于小表,这种SQL的开销倒不是很大,但是对于大表,这种查询表记录数的做法就是一个非常消耗资源了,而且效率很差。
2582 0
|
Java 数据库 索引
解决方案:如何防止数据重复插入?
为啥要解决数据重复插入? 解决方案实战 可落地小总结 一、为啥要解决数据重复插入? 问题起源,微信小程序抽风 wx.request() 重复请求服务器提交数据。
2404 0
|
SQL
SQL查询数据并插入新表
SQL查询数据并插入新表     --如果接受数据导入的表已经存在 insert into 表 select * from tablename --如果导入数据并生成表 select * into 表 from tablename  网名:浩秦; 邮箱:root#landv.pw; 只要我能控制一個國家的貨幣發行,我不在乎誰制定法律。
1725 0
|
SQL Java 关系型数据库
绝对干货,教你4分钟插入1000万条数据到mysql数据库表,快快进来
原文:绝对干货,教你4分钟插入1000万条数据到mysql数据库表,快快进来 我用到的数据库为,mysql数据库5.7版本的 1.首先自己准备好数据库表 其实我在插入1000万条数据的时候遇到了一些问题,现在先来解决他们,一开始我插入100万条数据时候报错,控制台的信息如下: com.
1975 0