DataX - 全量数据同步工具(1)https://developer.aliyun.com/article/1532373
4.1.2、MySQLReader & QuerySQLMode
1)配置文件
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "querySql": [ "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3" ] } ], "password": "123456", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "name", "type": "string" }, { "name": "region_id", "type": "string" }, { "name": "area_code", "type": "string" }, { "name": "iso_code", "type": "string" }, { "name": "iso_3166_2", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "\t", "fileName": "base_province", "fileType": "text", "path": "/base_province", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
可以看到,TableMode 的 mysqlreader 中是通过在 connection 参数设置 table 参数的值来指定我们的表,而这里 QuerySQLMode 模式是通过 querySql 参数来指定 SQL ,从SQL中可以得到表名。此外,QuerySQLMode 模式没有 columns 和 where 参数,因为这些都可以在 SQL 中指定。
那 TableMode 和 QuerySQLMode 有什么区别呢?其实 QuerySQLMode 正因为它可以指定 SQL ,所以就更加灵活,我们可以使用复杂的 join 和聚合函数,这一点是 TableMode 所实现不了的。反过来,我们上面知道 TableMode 的配置文件中可以在 mysqlreader 中指定一个参数 splitPk 来开启多个 Task 去读取一张表,这一点同样是 QuerySQLMode 所不具备的,QuerySQLMode 只支持单个 Task。
QuerySQLMode 这种模式用的还是比较少的,毕竟我们的 Hive 也可以完成数据的聚合和联结。
此外,关于 hdfswriter 还有一些注意事项:
注意事项:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决该问题的方案有两个:
一是修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑(也就是如果读取到 null 就把它替换为 "\\N",双斜杠是因为 DataX 是 Java 写的),可参考这里。
二是在Hive中建表时指定null值存储格式为空字符串(''")
2)配置文件说明
mysqlreader:
hdfswriter 和上面的是一样的。
3)提交任务
结果和上面是一样的,这里不再演示。
4.1.3、DataX 传参
通常情况下,离线数据同步任务需要每日定时重复执行(就像我们之前 flume 上传到 HDFS 也指定过),故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中 HDFS Writer 的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
1)修改配置文件
DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,我们只需要修改 hdfswriter 下 parameter 参数下的 path 为:
"path": "/base_province/${dt}"
2)创建 hdfs 路径
hadoop fs -mkdir /base_province/2020-06-14
3)提交任务
python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
4.2、HDFS -> MySQL
案例要求:同步HDFS上的/base_province目录下的数据到 MySQL gmall 数据库下的 test_province表。
需求分析:要实现该功能,需选用 HDFSReader 和 MySQLWriter。
1)编写配置文件
vim test_province.json
{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "defaultFS": "hdfs://hadoop102:8020", "path": "/base_province", "column": [ "*" ], "fileType": "text", "compress": "gzip", "encoding": "UTF-8", "nullFormat": "\\N", "fieldDelimiter": "\t", } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "123456", "connection": [ { "table": [ "test_province" ], "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8" } ], "column": [ "id", "name", "region_id", "area_code", "iso_code", "iso_3166_2" ], "writeMode": "replace" } } } ], "setting": { "speed": { "channel": 1 } } } }
2)配置说明
hdfsreader:
mysqlwriter:
其中 writeMode 的三种不同取值代表三种不同的 SQL 语句,其中 replace 和 on duplicate key update 都要求我们的 MySQL 表是有主键的。我们经常使用的 insert 语句是不需要主键的,所以当有主键重复的时候会直接报错。而 replace 语句如果遇到表中已经存在该主键的数据会直接替换掉, on duplicate key update 语句的话如果遇到表中已经存在该主键的数据会更新不同值的字段。
3)提交任务
创建 HDFS 输出端 MySQL 的表:
DROP TABLE IF EXISTS `test_province`; CREATE TABLE `test_province` ( `id` bigint(20) NOT NULL, `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
提交任务:
bin/datax.py job/test_province.json
4)查看结果
5、DataX 优化
上面 DataX 的任务配置文件中,job 下有两个参数 content 和 setting,content 是配置 reader 和 writer 的,而 setting 其实就是来给 DataX 优化用的(通过控制流量、并发)。
5.1、速度控制
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
关键优化参数如下:
参数 |
说明 |
job.setting.speed.channel |
并发数 |
job.setting.speed.record |
总 record 限速(tps:条数/s) |
job.setting.speed.byte |
总 byte 限速(bps:字节数/s) |
core.transport.channel.speed.record |
单个 channel 的record限速,默认值为10000(10000条/s) |
core.transport.channel.speed.byte |
单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
1. 若配置了总 record 限速,则必须配置单个 channel 的 record 限速
2. 若配置了总 byte 限速,则必须配置单个 channe 的 byte 限速
3. 若配置了总 record 限速和总 byte 限速,channel 并发数参数就会失效。因为配置了总record限速和总 byte 限速之后,实际 channel 并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个 channel 的record限速)
配置示例:
{ "core": { "transport": { "channel": { "speed": { "byte": 1048576 //单个channel byte限速1M/s } } } }, "job": { "setting": { "speed": { "byte" : 5242880 //总byte限速5M/s } }, ... } }
5.2、内存调整
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
总结
DataX 这个工具的学习就结束了,比我想象的要简单多,但是也需要好好熟悉练习一下。目前只学习了 DataX 在 HDFS 和 MySQL 之间的相互数据传递,以后用到其它框架的时候还需要精进一下。