DataX教程(01)- 入门

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: DataX教程(01)- 入门

01 引言

因为最近使用到了DataX,所以接下来需要来个系统的学习,并以博客的形式记录。

DataX的源码地址:https://github.com/alibaba/DataX

DataX官方介绍:https://github.com/alibaba/DataX/blob/master/introduction.md

02 DataX引入

很多时候,我们都需要把不同数据库的数据做迁移,典型的就如Oracle数据库的数据迁移到MySQL或者迁移到SQLServer,那么问题来了,我们把数据源迁移到另外一个新的数据库,都需要写一个程序,这是十分麻烦的(如下图):

那么有没有一个框架,能实现同步数据库之间的数据同步呢?其实是有的,就是本文要讲的DataX

03 DataX

3.1 DataX概念

首先我们要知道的是DataX为何物?官方是这样描述的:

  • DataX是阿里云DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台
  • DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

可以知道,DataX就是一个离线异构数据源同步工具,它的设计理念图如下,这也是网上举例最多的图了:

3.2 DataX原理

首先看看DataX的原理图:

从上图可以看到,DataX主要由3部分组成:

  • ReaderReader为数据采集模块,负责采集数据源的数据,将数据发送给Framework
  • WriterWriter为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端;
  • FrameworkFramework用于连接readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

DataX已经把主流的RDBMS数据库、NOSQL、大数据计算系统插件都已经接入了,如下:

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB
TSDB

3.3 DataX架构

看看DataX的架构图:

主要由Job模块、Task模块、TaskGroup模块组成,当DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。下面细讲每个模块:

3.3.1 Job作业

  • DataX完成单个数据同步的作业,我们称之为Job
  • DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。
  • DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)TaskGroup管理等功能。

3.3.2 Task子任务

  • DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。
  • Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3.3.3 TaskGroup

  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。
  • 每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

3.4 DataX代码执行流程

这里参考了:https://zhuanlan.zhihu.com/p/81817787,后面的文章也会继续编写(这里可以跳过)

流程:

  1. 解析配置,包括job.json、core.json、plugin.json三个配置
  2. 设置jobId到configuration当中
  3. 启动Engine,通过Engine.start()进入启动程序
  4. 设置RUNTIME_MODE到configuration当中
  5. 通过JobContainer的start()方法启动
  6. 依次执行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
  7. init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
  8. prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
  9. split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
  10. channel的计数主要是根据byte和record的限速来实现的(如果自己没有设置了channel的个数),在split()的函数中第一步就是计算channel的大小
  11. split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
  12. split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
  13. schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量14、schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。
  14. taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务。


04 文末

DataX的源码解读,可以参考其它博主的博客:https://waterwang.blog.csdn.net/article/details/114630690

目录
相关文章
|
7月前
|
存储 NoSQL 关系型数据库
阿里DataX极简教程
【5月更文挑战第1天】DataX是一个高效的数据同步工具,用于在各种数据源之间迁移数据,如MySQL到另一个MySQL或MongoDB。它的工作流程包括read、write和setting步骤,通过Framework协调多线程处理。其核心架构包括Job、Task和TaskGroup,支持并发执行。DataX支持多种数据源,如RDBMS、阿里云数仓、NoSQL和无结构化数据存储。例如,从MySQL读取数据并同步到ClickHouse的实践操作包括下载DataX、配置任务文件和执行同步任务。
754 1
阿里DataX极简教程
|
关系型数据库 MySQL 调度
DataX教程(05)- DataX Web项目实践
DataX教程(05)- DataX Web项目实践
1345 0
|
7月前
|
Java Linux DataX
DataX入门指南:快速部署和安装指南
DataX入门指南:快速部署和安装指南
1785 2
DataX入门指南:快速部署和安装指南
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
572 0
|
调度 DataX 容器
DataX教程(07)- 图解DataX任务分配及执行流程
DataX教程(07)- 图解DataX任务分配及执行流程
576 0
DataX教程(07)- 图解DataX任务分配及执行流程
|
数据采集 分布式计算 调度
DataX教程(03)- 源码解读(超详细版)
DataX教程(03)- 源码解读(超详细版)
845 0
|
监控 DataX
DataX教程(09)- DataX是如何做到限速的?
DataX教程(09)- DataX是如何做到限速的?
365 0
|
监控 调度 DataX
DataX教程(08)- 监控与汇报
DataX教程(08)- 监控与汇报
526 0
|
JSON Java DataX
DataX教程(04)- 配置完整解读
DataX教程(04)- 配置完整解读
2439 0
|
Java DataX Maven
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
658 0

热门文章

最新文章

相关实验场景

更多