图解 DataX 核心设计原理

简介: 前段时间我在 K8s 相关文章中有提到过数据同步的项目,该项目就是基于 DataX 内核构建的,由于公司数据同步的需求,还需要在 DataX 原有的基础上支持增量同步功能,同时支持分布式调度,在「使用 K8s 进行作业调度实战分享」这篇文章中已经详细描述其中的实现。

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。


前段时间我在 K8s 相关文章中有提到过数据同步的项目,该项目就是基于 DataX 内核构建的,由于公司数据同步的需求,还需要在 DataX 原有的基础上支持增量同步功能,同时支持分布式调度,在「使用 K8s 进行作业调度实战分享」这篇文章中已经详细描述其中的实现。


基于我在项目中对 DataX 的实践过程,给大家分享我所理解的 DataX 核心设计原理。


设计理念



异构数据源离线同步是将源端数据同步到目的端,但是端与端的数据源类型种类繁多,在没有 DataX 之前,端与端的链路将组成一个复杂的网状结构,非常零散无法将同步核心逻辑抽象出来,DataX 的理念就是作为一个同步核心载体连接连接各类数据源,当我们需要数据同步时,只需要以插件的形式接入到 DataX 即可,将复杂的网状结构链路变成了一个星型结构,如下图所示:

640.png


架构设计



用过 IDEA 的小伙都知道,IDEA 有很多非常棒的插件,用户可根据自身编程需求,下载相关的插件,DataX 也是使用这种可插拔的设计,采用了 Framework + Plugin 的架构设计,如下图所示:

640.png

有了插件,DataX 可支持任意数据源到数据源,只要实现了 Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如 MySQL、Oracle、SQLServer 等,当然我们也可以开发一个 DataX 插件。


核心概念



DataX 核心主要由 Job、Task Group、Task、Channel 等概念组成:


1、Job

在 DataX 中用来描述一个源端到一个目的端的同步作业,是 DataX 数据同步面向用户的最小业务单元。一个Job 对应 一个 JobContainer, JobContainer 负责 Job 的全局切分、调度、前置语句和后置语句等工作。

2、Task Group

一组 Task 的集合,根据 DataX 的公平分配策略,公平地分配 Task 到对应的 TaskGroup 中。一个 TaskGroup 对应一个 TaskGroupContainer,负责执行一组 Task。

3、Task

Job 的最小执行单元,一个 Job 可根据 Reader 端切分策略,且分成若干个 Task,以便于并发执行。


Job、Task Group、Task 三者之间的关系可以用如下图表示:


640.png

根据切分策略将一个 Job 切分成多个 Task,根据分配策略将多个 Task 组成一个 TaskGroup。


4、Channel


DataX 会单独启动一条线程运行运行一个 Task,而 Task 会持有一个 Channel,用作 Reader 与 Writer 的数据传输媒介,DataX 的数据流向都是按照 Reader—>Channel—>Writer 的方向流转,用如下图表示:


640.png

Channel 作为传输通道,即能充当缓冲层,同时还能对数据传输进行限流操作。


5、Transformer


DataX 的 transformer 模式同时还提供了强大的数据转换功能,DataX 默认提供了丰富的数据转换实现类,用户还可以根据项目自身需求,扩展数据转换。


640.png


调度流程



DataX 将用户的 job.json 同步作业配置解析成一个 Job,DataX 通过 JobContainer 完成全局切分、调度、前置语句和后置语句等工作,整体调度流程用如下图表示:

640.png


1、切分策略


1)计算并发量(即 needChannelNumber 大小)

DataX有流控模式,其中,可以设置 bps 限速,tps 限速:

  • bps 限速:needChannelNumber = 总 byteLimit / 单个 Channel byteLimit
  • tps 限速:needChannelNumber = 总 recordLimit / 单个 Channel recordLimit

如果以上都没有设置,则会根据用户在 job.setting.speed.channel 配置的并发数量设置 needChannelNumber。


2)根据 needChannelNumber 将 Job 切分成多个 Task

这个步骤的具体切分逻辑交由相关插件去完成,例如 Rdb 对数据的拆分主要分成两类:

  • 如果用户配置了具体的 Table 数量,那么就按照 Table 为最小单元进行拆分(即一个 Table 对应一个 Task),并生成对应的 querySql;
  • 如果用户还配置了 splitPk,则会根据 splitPk 进行切分,具体逻辑是根据 splitPk 区间对 Table 进行拆分,并生成对应的 querySql。


2、公平分配策略


DataX 在执行调度之前,会调用 JobAssignUtil#assignFairly方法对切分好的 Task 公平分配给每个 TaskGroup。


在分配之前,会计算 TaskGroup 的数量,具体公式:

int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);

channelNumber 即为在切分策略中根据用户配置计算得到的 needChannelNumber 并发数量大小,channelsPerTaskGroup 为每个 TaskGroup 需要的并发数量,默认为 5。

求出 TaskGroup 的数量之后,就会执行公平分配策略,将 Task 平均分配个每个 TaskGroup,最后执行调度,完成整个同步作业。举个公平分配策略的例子:


假设 A 库有表 0、1、2,B 库上有表 3、4,C 库上有表 5、6、7,如果此时有 4 个

TaskGroup,则 assign 后的结果为:

taskGroup-0: 0,  4,
taskGroup-1: 3,  6,
taskGroup-2: 5,  2,
taskGroup-3: 1,  7


举个例子来描述 Job、Task、Task Group 之间的关系:


用户构建了一个数据同步作业,该作业的目的是将 MySql 的 100 张表同步到 Oracle 库中,假设此时用户设置了 20 个并发(即 channelNumber=20):


  1. DataX 根据表的数量切分成 100 个 Task;
  2. DataX 默认给每个 TaskGroup 分配 5 个 Channel,因此 taskGroupNumber = channelNumber / channelsPerTaskGroup = 20 / 5 = 4;
  3. 根据 DataX 的公平分配策略,会将 100 个 Task 平均分配给每个 TaskGroup,因此每个 TaskGroup 处理 taskNumber / taskGroupNumber = 100 / 4 = 25 个 Task。


以上的例子用如下图表示:


640.png

由于一个 Channel 对应一个线程执行,因此 DataX 的线程模型可以用如下图表示:640.png

相关文章
|
6月前
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
220 0
|
9月前
|
JSON 缓存 关系型数据库
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(二)
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(二)
|
9月前
|
存储 SQL JSON
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(一)
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(一)
|
2月前
|
Java 数据处理 调度
Dataphin常见问题之离线管道同步数据datax就报连接超时如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
7月前
|
存储 DataWorks Unix
Dataworks数据集成之“文本数据”
Dataworks不是支持文本数据导入么?为什么Excel数据不能导入?CSV文件不就是Excel文件么?关于这些问题,我整理了一篇文章进行解释。
782 2
|
10月前
|
数据采集 SQL 分布式计算
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
1045 0
|
13天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
24 0
|
14天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
28 1
|
14天前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之在DataWorks中,将数据集成功能将AnalyticDB for MySQL中的数据实时同步到MaxCompute中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
24 0
|
14天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
24 0