DataX教程(03)- 源码解读(超详细版)

简介: DataX教程(03)- 源码解读(超详细版)

01 引言

通过前面的两篇博文,我们可以在IDEA下运行DataX源码项目了:

本文需要讲解的是DataX的源码。

02 DataX框架讲解

2.1 DataX设计思想

DataX采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader做为数据采集模块,负责采集数据源的数据,将数据发送给Framework
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题

2.2 DataX运行流程

2.2.1 DataX运行流程解析

结合上图,描述一下DataX整个运行流程(从左往右看):

  • step1DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程;
  • step2: 会根据不同的源端切分策略,将Job切分成多个小的Task(子任务);
  • step3: 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量将step2拆分成的Task重新组合,组装成TaskGroup(任务组),每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5
  • step4:TaskGroup里面的Task都有TaskGroup来启动,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作;
  • step5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

2.2.2 DataX运行流程简单举例

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。

DataX的调度决策思路是

  • ① DataXJob根据分库分表切分成了100个Task。
  • ② 根据20个并发,DataX计算共需要分配4个TaskGroup(默认单个任务组的并发数量为5)。
  • ③ 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

03 DataX源码分析

在进行源码分析前,首先贴上我自己整理的DataX源码分析流程图:

3.1 源码流程描述

其实DataX的运行流程也没有我们想的这么复杂,根据上面的流程图,简单描述如下:

  • 【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等);
  • 【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用;
  • 【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer
  • 【Step4】:在JobContainer里面,会根据配置使用类加载器加载readerwriter的实例对象,然后出发加载对象的生命周期方法,如init()prepare()post()等;
  • 【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;
  • 【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer)。
  • 【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。

3.2 流程对应代码

3.2.1 step1:入口

【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等)

对应的代码(com.alibaba.datax.core.Engine#main)及详细描述如下:

3.2.2 step2:封装配置

【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用。

对应的代码(com.alibaba.datax.core.Engine#entry)及详细描述如下:

3.2.3 step3:初始化并启动容器

【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer

对应的代码(com.alibaba.datax.core.Engine#start)及详细描述如下:

3.2.4 step4:JobContainer运行内容

【Step4】:在JobContainer里面,会根据配置使用类加载器加载readerwriter的实例对象,然后出发加载对象的生命周期方法,如init()prepare()post()等;

对应代码(com.alibaba.datax.core.job.JobContainer#start)及详细描述如下:

3.2.5 step5:调度JobContainer

【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;

对应代码(com.alibaba.datax.core.job.JobContainer#start),内容如step4的图片,具体对应第119行代码:

this.schedule();

3.2.6 step6:JobContainer详解

【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer

对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer#start)及详细描述如下:

再来看看TaskExecutor是怎么初始化的,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor)具体详情如下:

ok,到这里可以看看doStart()运行的方法,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#doStart),详情如下:

3.2.7 step7:业务实现

【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。

那么,经过前几个步骤的操作,最终哪里实现业务呢?DataX的例子是使用了StreamReaderStreamWriter的,我们Stream插件里的代码:

① E:对应StreamReader,详情如下:

② L:对应StreamWriter,详情如下:

这个时候,可能很多小伙伴都会提出,还有个T(即Transformer)呢?这里拿个简单的来看看,即ReplaceTransformercom.alibaba.datax.core.transport.transformer.ReplaceTransformer),代码内容如下:

好奇宝贝肯定会问,这里的evaluate()方法什么时候调用?我们一直逆向查询调用,会发现调用的流程如下:

Transformer.evaluate() 
<= 
ComplexTransformer.evaluate()
<=
TransformerExchanger.doTransformer() 
<=
RecordSender.sendToWriter() 
<=
Reader.Task.startRead()

翻译上面的流程,用人话说就是(这里改为了正向流程了):

Reader读内容(`Reader.Task.startRead()`)
=>
Reader读内容的方法最后一步调用发送给Writer(`RecordSender.sendToWriter() `)
=>
发送之前使用Exchanger交换机来进行transform操作(`TransformerExchanger.doTransformer() `)
=>
transform会触发evaluate去处理业务(`ComplexTransformer.evaluate()`)
=>
最后处理发送到writer前的业务操作(`Transformer.evaluate() `)

04 几个关键的类

到了这一步,我们大概知道了DataX的整个运行流程机制了,那么有一些工具类在这里还是值得说一下的。

4.1 LoadUtil 插件加载工具

描述:这一个工具类是一个插件加载器,大体上分readertransformer(还未实现)和writer三中插件类型,readerwriter在执行时又可能出现JobTask两种运行时(加载的类不同)。

Ctrl+O看看有哪些方法,这里就不再描述里面的每一个方法了,知道怎么用就可以了:

4.2 ClassLoaderSwapper 类加载器管理者

描述:这个类主要配合LoadUtil使用,主要管理不同线程(即:Task任务)的类加载器。

LoadUtil配合使用的方式(这里说的是线程,但是官方文档说开了个进程,感觉不太合适):

// 初始化ClassLoaderSwapper
private ClassLoaderSwapper classLoaderSwapper 
= ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
// 注入插件到ClassLoaderSwapper
classLoaderSwapper.setCurrentThreadClassLoader(
  LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName)
);

Ctrl+O看看ClassLoaderSwapper有什么方法,这里看注释,不再描述了:

4.3 ErrorRecordChecker 错误记录检查者

描述: 检查任务是否到达错误记录限制。有检查条数(recordLimit)和百分比(percentageLimit)两种方式。

  1. errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据。
  2. errorPercentage表示出错比例,在任务结束时校验。
  3. errorRecord优先级高于errorPercentage。

Ctrl+O看看ClassLoaderSwapper有什么方法:

4.4 Communication 所有的状态及信息统计

描述:主要记录DataX所有的状态及统计信息交互,jobtaskGrouptask等的消息汇报都走该类。

Ctrl+O看看Communication有什么方法(这里方法比较多,其实就是一些DataX的一些运行状态及信息统计):

4.5 AbstractScheduler 调度器

描述:主要就是根据job配置来调度TaskGroupContainer

Ctrl+O看看AbstractScheduler有什么方法:

05 文末

本文是对DataX源码解读的一篇分析文章,如果文中有理解错误的,欢迎童鞋们指出,本文完!


原创不易,最后需要声明

需要转载的请务必联系我本人!

目录
相关文章
|
6月前
|
关系型数据库 MySQL 调度
DataX教程(05)- DataX Web项目实践
DataX教程(05)- DataX Web项目实践
718 0
|
6月前
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
210 0
|
6月前
|
监控 DataX
DataX教程(09)- DataX是如何做到限速的?
DataX教程(09)- DataX是如何做到限速的?
165 0
|
6月前
|
监控 调度 DataX
DataX教程(08)- 监控与汇报
DataX教程(08)- 监控与汇报
181 0
|
6月前
|
调度 DataX 容器
DataX教程(07)- 图解DataX任务分配及执行流程
DataX教程(07)- 图解DataX任务分配及执行流程
253 0
DataX教程(07)- 图解DataX任务分配及执行流程
|
6月前
|
JSON Java DataX
DataX教程(04)- 配置完整解读
DataX教程(04)- 配置完整解读
777 0
|
6月前
|
Java DataX Maven
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
295 0
|
6月前
|
存储 NoSQL 关系型数据库
DataX教程(01)- 入门
DataX教程(01)- 入门
386 0
|
JSON Oracle 关系型数据库
Datax将Oracle数据导入ElasticSearch7完成教程
Datax将Oracle数据导入ElasticSearch7完成教程
823 0
|
2月前
|
Java 数据处理 调度
Dataphin常见问题之离线管道同步数据datax就报连接超时如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。