[toc]
简介
DataX是一个数据同步工具,可以将数据从一个地方读取出来并以极快的速度写入另外一个地方。常见的如将mysql中的数据同步到另外一个mysql中,或者另外一个mongodb中。
工作流程
- read:设置一个源,DataX从源读取数据
- write:设置一个目的地,DataX将读取到的数据写入目的地
- setting:同步设置,如设置并发通道、控制作业速度等
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题
- 多线程:充分利用多线程来处理同步任务
核心架构
核心模块介绍
1:DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
2:DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
3:切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5
4:每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作
5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
DaXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
支持的数据
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
OceanBase | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
达梦 | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | √ | 读 、写 | |
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 |
实践
作为极简教程,本文将从mysql中读取一张表的数据,然后同步到clickhouse中。
下载
打开该项目的Github 首页进行下载:https://github.com/alibaba/DataX
下载链接:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
下载下来是一个tar.gz的包,windows下解压命令:
tar -zxvf xxx.tar.gz
程序目录:
- bin:使用里面的 datax.py 来启动程序
- job:里面放了一个job.json,用来检查运行环境,一般的建议下载完毕之后执行一次。
- log:存放执行日志
- plugin:插件集,插件分为read和write,分别对应datax可支持的数据库
- 其他目录:......
环境
DataX是基于python和java的,需要机器拥有python和java 的运行环境。
在下载完毕后,通过执行自检脚本,可确认环境是否正确
python {
YOUR_DATAX_HOME}/bin/datax.py {
YOUR_DATAX_HOME}/job/job.json
执行流程
编写同步任务配置文件,在job目录中创建 mysql-to-clickhouse.json 文件,并填入如下内容
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxx",
"password": "xxx",
"column": [
"id",
"name"
],
"splitPk": "id",
"connection": [
{
"table": [
"table_name"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.xxx:xxx/db_name"
]
}
]
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"username": "xxx",
"password": "xxx",
"column": [
"id",
"ame"
],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://192.168.1.xxx:xxx/table_name",
"table": [
"table_name"
]
}
],
"preSql": [],
"postSql": [],
"batchSize": 65536,
"batchByteSize": 134217728,
"dryRun": false,
"writeMode": "insert"
}
}
}
]
}
}
- job:一个job包含两个部分,setting中设置任务的执行速度,错误限制等,content中是任务具体的描述。
- reader:任务的数据输入源
- writer:任务的数据输出源
根据任务配置文件启动datax,先cd到datax的根目录
python bin/datax.py job/mysql-to-clickhouse.json
运行上述命令后,任务就开启了。本例从mysql数据库中的一张表中读取了两个字段(id,name),然后同步到clickhouse中,clickhouse中需要先创建同样的库,表和列。
任务执行非常快,140W数据仅用了 18s 就完成了同步。
2024-05-16 16:24:57.312 [job-0] INFO JobContainer -
任务启动时刻 : 2024-05-16 16:24:38
任务结束时刻 : 2024-05-16 16:24:57
任务总计耗时 : 18s
任务平均流量 : 2.21MB/s
记录写入速度 : 142425rec/s
读出记录总数 : 1424252
读写失败总数 : 0