2、配置文件说明
(1)Reader参数说明
3、提交任务
(1)清空历史数据
hadoop fs -rm -r -f /base_province/*
(2)进入DataX根目录
(3)执行如下命令
python bin/datax.py job/base_province.json
4、查看结果
(1)DataX打印日志
(2)查看HDFS文件
hadoop fs -cat /base_province/* | zcat
4.2.3 DataX传参
通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。
1、编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容如下
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "querySql": [ "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3" ] } ], "password": "000000", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "name", "type": "string" }, { "name": "region_id", "type": "string" }, { "name": "area_code", "type": "string" }, { "name": "iso_code", "type": "string" }, { "name": "iso_3166_2", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "\t", "fileName": "base_province", "fileType": "text", "path": "/base_province/${dt}", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
2、提交任务
(1)创建目标路径
hadoop fs -mkdir /base_province/2020-06-14
(2)进入DataX根目录
(3)执行如下命令
python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
3、查看结果
hadoop fs -ls /base_province
4.3 同步HDFS数据到MySQL案例
案例要求:同步HDFS上的/base_province目录下的数据到MySQL gmall 数据库下的test_province表。
需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。
1、编写配置文件
(1)创建配置文件test_province.json
(2)配置文件内容如下
{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "defaultFS": "hdfs://hadoop102:8020", "path": "/base_province", "column": [ "*" ], "fileType": "text", "compress": "gzip", "encoding": "UTF-8", "nullFormat": "\\N", "fieldDelimiter": "\t", } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "000000", "connection": [ { "table": [ "test_province" ], "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8" } ], "column": [ "id", "name", "region_id", "area_code", "iso_code", "iso_3166_2" ], "writeMode": "replace" } } } ], "setting": { "speed": { "channel": 1 } } } }
2、配置文件说明
(1)Reader参数说明
(2)Writer参数说明
3、提交任务
(1)在MySQL中创建gmall.test_province表
DROP TABLE IF EXISTS `test_province`; CREATE TABLE `test_province` ( `id` bigint(20) NOT NULL, `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
(2)进入DataX根目录
(3)执行如下命令
python bin/datax.py job/test_province.json
4、查看结果
(1)DataX打印日志
(2)查看MySQL目标表数据
5、DataX优化
5.1 速度控制
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)
5.2 内存调整
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm=“-Xms8G -Xmx8G” /path/to/your/job.json