01 引言
因为最近使用到了DataX
,所以接下来需要来个系统的学习,并以博客的形式记录。
DataX
的源码地址:https://github.com/alibaba/DataX
DataX
官方介绍:https://github.com/alibaba/DataX/blob/master/introduction.md
02 DataX引入
很多时候,我们都需要把不同数据库的数据做迁移,典型的就如Oracle
数据库的数据迁移到MySQL
或者迁移到SQLServer
,那么问题来了,我们把数据源迁移到另外一个新的数据库,都需要写一个程序,这是十分麻烦的(如下图):
那么有没有一个框架,能实现同步数据库之间的数据同步呢?其实是有的,就是本文要讲的DataX
。
03 DataX
3.1 DataX概念
首先我们要知道的是DataX
为何物?官方是这样描述的:
DataX
是阿里云DataWorks
数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX
实现了包括MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS
等各种异构数据源之间高效的数据同步功能。
可以知道,DataX
就是一个离线异构数据源同步工具,它的设计理念图如下,这也是网上举例最多的图了:
3.2 DataX原理
首先看看DataX
的原理图:
从上图可以看到,DataX
主要由3部分组成:
- Reader:
Reader
为数据采集模块,负责采集数据源的数据,将数据发送给Framework
; - Writer:
Writer
为数据写入模块,负责不断向Framework
取数据,并将数据写入到目的端; - Framework:
Framework
用于连接reader
和writer
,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
DataX
采用Framework + plugin
架构构建。将数据源读取和写入抽象成为Reader/Writer
插件,纳入到整个同步框架中。
DataX
已经把主流的RDBMS
数据库、NOSQL
、大数据计算系统插件都已经接入了,如下:
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
OceanBase | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | 写 | ||
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
Phoenix4.x | √ | √ | 读 、写 | |
Phoenix5.x | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
Cassandra | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 | ||
时间序列数据库 | OpenTSDB | √ | 读 | |
TSDB | √ | √ | 读 、写 |
3.3 DataX架构
看看DataX
的架构图:
主要由Job模块、Task模块、TaskGroup模块组成,当DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。下面细讲每个模块:
3.3.1 Job作业
DataX
完成单个数据同步的作业,我们称之为Job
。DataX
接受到一个Job
之后,将启动一个进程来完成整个作业同步过程。DataX Job
模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)
、TaskGroup
管理等功能。
3.3.2 Task子任务
DataX Job
启动后,会根据不同的源端切分策略,将Job
切分成多个小的Task
(子任务),以便于并发执行。Task
便是DataX
作业的最小单元,每一个Task
都会负责一部分数据的同步工作。
3.3.3 TaskGroup
- 切分多个
Task
之后,DataX Job
会调用Scheduler
模块,根据配置的并发数据量,将拆分成的Task
重新组合,组装成TaskGroup(
任务组)。 - 每一个
TaskGroup
负责以一定的并发运行完毕分配好的所有Task
,默认单个任务组的并发数量为5。 - 每一个
Task
都由TaskGroup
负责启动,Task
启动后,会固定启动Reader—>Channel—>Writer
的线程来完成任务同步工作。
3.4 DataX代码执行流程
这里参考了:https://zhuanlan.zhihu.com/p/81817787,后面的文章也会继续编写(这里可以跳过)
流程:
- 解析配置,包括job.json、core.json、plugin.json三个配置
- 设置jobId到configuration当中
- 启动Engine,通过Engine.start()进入启动程序
- 设置RUNTIME_MODE到configuration当中
- 通过JobContainer的start()方法启动
- 依次执行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
- init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
- prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
- split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
- channel的计数主要是根据byte和record的限速来实现的(如果自己没有设置了channel的个数),在split()的函数中第一步就是计算channel的大小
- split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
- split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
- schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量14、schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。
- taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务。
04 文末
DataX的源码解读,可以参考其它博主的博客:https://waterwang.blog.csdn.net/article/details/114630690