前言
今天是2024-2-21,农历正月十二,相信今天开始是新的阶段,尽管它不是新的周一、某月一日、某年第一天,尽管我是一个很讲究仪式感的人。新年刚过去 12 天,再过 3 天就开学咯,开学之后我的大学时光就进入了冲刺阶段,之前没完成的目标和习惯务必严格要求自己执行,我也慢慢悟出了解决各种 "病症" 的办法了~
这里推荐我喜欢的几本书:《黄金时代》、《一直特立独行的猪》、《沉默的大多数》,都是王小波的,对我收益颇深。尽管这博客是写给我自己看的 hahaha
言归正传,今天学习 DataX,这也是一个大数据工具,和 Maxwell 差不多,它是用来做全量数据同步的,前者主要是做增量数据同步的。
1、概述
1.1、什么是 DataX
DataX 是阿里巴巴开源的一个异构数据源离线同步工具(区别于 Maxwell、Cannal,这俩是主要是做增量同步的),致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。
源码地址:https://github.com/alibaba/DataX
1.2、DataX 的设计
为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路, DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。
1.3、支持的数据源
类型 |
数据源 |
Reader(读) |
Writer(写) |
RDBMS 关系型数据库 |
MySQL |
√ |
√ |
Oracle |
√ |
√ |
|
OceanBase |
√ |
√ |
|
SQLServer |
√ |
√ |
|
PostgreSQL |
√ |
√ |
|
DRDS |
√ |
√ |
|
通用RDBMS |
√ |
√ |
|
阿里云数仓数据存储 |
ODPS |
√ |
√ |
ADS |
√ |
||
OSS |
√ |
√ |
|
OCS |
√ |
√ |
|
NoSQL数据存储 |
OTS |
√ |
√ |
Hbase0.94 |
√ |
√ |
|
Hbase1.1 |
√ |
√ |
|
Phoenix4.x |
√ |
√ |
|
Phoenix5.x |
√ |
√ |
|
MongoDB |
√ |
√ |
|
Hive |
√ |
√ |
|
Cassandra |
√ |
√ |
|
无结构化数据存储 |
TxtFile |
√ |
√ |
FTP |
√ |
√ |
|
HDFS |
√ |
√ |
|
Elasticsearch |
√ |
||
时间序列数据库 |
OpenTSDB |
√ |
|
TSDB |
√ |
√ |
1.4、框架设计
- Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲, 流控,并发,数据转换等核心技术问题。
1.5、运行原理
- Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。一个 Job 启动一个进程。
- Task:根据不同数据源的切分策略,一个 Job 会被切分为多个 Task(由 Split 模块完成),Task 是 DataX 作业的最小单元,每个 Task 负责一部分数据的同步工作。
- TaskGroup:Scheduler 调度模块会对 Task 进行分组,每个 TaskGroup 负责启动 Task,单个 TaskGroup 的并发数量为 5(最多同时执行 5 个Task,一个 Task 执行完就会释放掉,再进来一个 Task 继续执行)。
- Reader -> Channel -> Writer :每个 Task 启动后,都会固定启动 Reader -> Channel -> Writer 来完成同步工作。
举例来说,用户提交了一个 DataX 作业,并且配置了 20 个并发,目的是将一个 100 张分表的 mysql 数据同步到 odps 里面。 DataX 的调度决策思路是:
- DataXJob 根据分库分表切分成了 100 个 Task。
- 根据 20 个并发,DataX 计算共需要分配 4 个 TaskGroup。
- 4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以 5 个并发共计运行 25 个 Task。
1.6、与 Sqoop 对比
2、DataX3.0 部署
傻瓜式安装解压,然后执行下面的脚本
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
运行结果:
当出现上面的结果说明安装成功,这里我们用的是 DataX 自带的一个测试作业,它是一个 json 格式的文件,之后我们的 DataX 作业也是通过自己 编写 json 文件来实现。
3、DataX 的使用
3.1、DataX 任务提交命令
DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可,就像我们安装时测试执行 DataX 任务的操作一样:
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
3.2、DataX 配置文件格式
可以通过下面这个命令来查看 DataX 配置文件模板:
# -r 代表 reader -w 代表 writer python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。
Reader和Writer的具体参数可参考官方文档,地址:
https://github.com/alibaba/DataX/blob/master/README.md
所以,如果我们需要自定义 DataX 任务的时候,就需要打开官网的 reader 和 writer 文档,查看需要配置哪些参数,接下来我们就来练习一下:
4、使用案例
4.1、MySQL -> HDFS
从 MySQL 写入到 HDFS ,我们就需要去官网查看 MySQLReader 和 HDFSWriter 的内容:
简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。
对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。
案例要求:同步 gmall 数据库中 base_province 表数据到 HDFS 的 /base_province 目录
需求分析:要实现该功能,需选用 MySQLReader 和 HDFSWriter,MySQLReader 具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。
下面分别使用两种模式进行演示:
4.1.1、MySQLReader & TableMode
1)编写配置文件
vim /opt/module/datax/job/base_province.json
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "id", "name", "region_id", "area_code", "iso_code", "iso_3166_2" ], "where": "id>=3", "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "table": [ "base_province" ] } ], "password": "123456", "splitPk": "", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "name", "type": "string" }, { "name": "region_id", "type": "string" }, { "name": "area_code", "type": "string" }, { "name": "iso_code", "type": "string" }, { "name": "iso_3166_2", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "\t", "fileName": "base_province", "fileType": "text", "path": "/base_province", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
2)配置说明
1. Reader 参数说明
注意:这里的 splitPk 参数是数据分片字段,一般是主键,仅支持整型 ,而且只有 TableMode 模式下才有效。
2. Writer 参数说明
我们的 hdfswriter 中有一个 column 参数,但是我们知道 HDFS 是没有列的这个概念的。其实,这里代表的是我们Hive表中数据的字段类型,这个配置参数是给 Hive 看的。之后我们在使用 hdfsreader 的时候 依然要配置这个参数,这个参数的意义仍然是 hive 的数据字段。
注意:这里的 fileName 参数指的是 HDFS 前缀名而并不是完整文件名!
3)提交任务
使用DataX向HDFS同步数据时,必须确保目标路径已存在!
hadoop fs -mkdir /base_province
python bin/datax.py job/base_province.json
执行结果:
可以看到,我们的文件名是由我们 hdfswriter 中指定的前缀 fileName + uuid 组成的。
查看HDFS 中的文件内容(因为我们的文件是经过 gzip 压缩的,所以网页端查看不了):
hadoop fs -cat /base_province/* | zcat
可以看到,MySQL 中 34 条数据一共写入了 32 条,这是因为我们设置了 where 参数的值为 id>=3
DataX - 全量数据同步工具(2)https://developer.aliyun.com/article/1532375