DataX教程(03)- 源码解读(超详细版)

简介: DataX教程(03)- 源码解读(超详细版)

01 引言


通过前面的两篇博文,我们可以在IDEA下运行DataX源码项目了:


  • 《DataX教程(01)- 入门》
  • 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》


本文需要讲解的是DataX的源码。


02 DataX框架讲解


2.1 DataX设计思想

ebc0675c33324f6fa9a8e8abef6a5890.png


DataX采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。


  • Reader:Reader做为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。


2.2 DataX运行流程


2.2.1 DataX运行流程解析d199c5cd0528421db3ba0194f613fd18.png

结合上图,描述一下DataX整个运行流程(从左往右看):


  • step1: DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程;
  • step2: 会根据不同的源端切分策略,将Job切分成多个小的Task(子任务);
  • step3: 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将step2拆分成的Task重新组合,组装成TaskGroup(任务组),每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5;
  • step4:TaskGroup里面的Task都有TaskGroup来启动,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作;
  • step5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0


2.2.2 DataX运行流程简单举例


举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。


DataX的调度决策思路是:


  • ① DataXJob根据分库分表切分成了100个Task。
  • ② 根据20个并发,DataX计算共需要分配4个TaskGroup(默认单个任务组的并发数量为5)。
  • ③ 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

03 DataX源码分析


在进行源码分析前,首先贴上我自己整理的DataX源码分析流程图:

8b377dddeceb41a2b04e040df94b1340.png


3.1 源码流程描述


其实DataX的运行流程也没有我们想的这么复杂,根据上面的流程图,简单描述如下:


  • 【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等);
  • 【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用;
  • 【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer;
  • 【Step4】:在JobContainer里面,会根据配置使用类加载器加载reader和writer的实例对象,然后出发加载对象的生命周期方法,如init()、prepare()、post()等;
  • 【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;
  • 【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer)。
  • 【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。

3.2 流程对应代码


3.2.1 step1:入口


【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等)


对应的代码(com.alibaba.datax.core.Engine#main)及详细描述如下:

79fed68de9144960b4202873b22e073b.png


3.2.2 step2:封装配置


【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用。


对应的代码(com.alibaba.datax.core.Engine#entry)及详细描述如下:

b0ff642d39744ca0a304bfec8eedaec0.png


3.2.3 step3:初始化并启动容器


【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer;


对应的代码(com.alibaba.datax.core.Engine#start)及详细描述如下:

a8a4c06b86ef4ed5b854dd6a365d3d10.png


3.2.4 step4:JobContainer运行内容


【Step4】:在JobContainer里面,会根据配置使用类加载器加载reader和writer的实例对象,然后出发加载对象的生命周期方法,如init()、prepare()、post()等;


对应代码(com.alibaba.datax.core.job.JobContainer#start)及详细描述如下:

c94bd235705a44788dcf518d733b135f.png


3.2.5 step5:调度JobContainer


【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;


对应代码(com.alibaba.datax.core.job.JobContainer#start),内容如step4的图片,具体对应第119行代码:

this.schedule();


3.2.6 step6:JobContainer详解


【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer)


对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer#start)及详细描述如下:

image.png


再来看看TaskExecutor是怎么初始化的,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor)具体详情如下:

e752d6bd18e94e89b42bedc13283059d.png


ok,到这里可以看看doStart()运行的方法,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#doStart),详情如下:

9438f34a6ea94097bc22c6a21a40ce14.png


3.2.7 step7:业务实现


【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。


那么,经过前几个步骤的操作,最终哪里实现业务呢?DataX的例子是使用了StreamReader和StreamWriter的,我们Stream插件里的代码:


① E:对应StreamReader,详情如下:

ecb63ab8172a4bc198c6eb1a62631b87.png


② L:对应StreamWriter,详情如下

ca24c1aac45d431a85062c61322a3579.png


这个时候,可能很多小伙伴都会提出,还有个T(即Transformer)呢?这里拿个简单的来看看,即ReplaceTransformercom.alibaba.datax.core.transport.transformer.ReplaceTransformer),代码内容如下:

f913f60c233a40dc906470e5eb19b4a6.png


好奇宝贝肯定会问,这里的evaluate()方法什么时候调用?我们一直逆向查询调用,会发现调用的流程如下:

Transformer.evaluate() 
<= 
ComplexTransformer.evaluate()
<=
TransformerExchanger.doTransformer() 
<=
RecordSender.sendToWriter() 
<=
Reader.Task.startRead()


翻译上面的流程,用人话说就是(这里改为了正向流程了):

Reader读内容(`Reader.Task.startRead()`)
=>
Reader读内容的方法最后一步调用发送给Writer(`RecordSender.sendToWriter() `)
=>
发送之前使用Exchanger交换机来进行transform操作(`TransformerExchanger.doTransformer() `)
=>
transform会触发evaluate去处理业务(`ComplexTransformer.evaluate()`)
=>
最后处理发送到writer前的业务操作(`Transformer.evaluate() `)


04 几个关键的类


到了这一步,我们大概知道了DataX的整个运行流程机制了,那么有一些工具类在这里还是值得说一下的。


4.1 LoadUtil 插件加载工具


描述:这一个工具类是一个插件加载器,大体上分reader、transformer(还未实现)和writer三中插件类型,reader和writer在执行时又可能出现Job和Task两种运行时(加载的类不同)。


Ctrl+O看看有哪些方法,这里就不再描述里面的每一个方法了,知道怎么用就可以了:

374744ca97124d14a026213d555ae743.png

4.2 ClassLoaderSwapper 类加载器管理者


描述:这个类主要配合LoadUtil使用,主要管理不同线程(即:Task任务)的类加载器。

LoadUtil配合使用的方式(这里说的是线程,但是官方文档说开了个进程,感觉不太合适):


// 初始化ClassLoaderSwapper
private ClassLoaderSwapper classLoaderSwapper 
= ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
// 注入插件到ClassLoaderSwapper
classLoaderSwapper.setCurrentThreadClassLoader(
  LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName)
);


Ctrl+O看看ClassLoaderSwapper有什么方法,这里看注释,不再描述了:

c7f218cbcf924f38b3bc6a770ba1769e.png


4.3 ErrorRecordChecker 错误记录检查者


描述: 检查任务是否到达错误记录限制。有检查条数(recordLimit)和百分比(percentageLimit)两种方式。


  1. errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据。
  2. errorPercentage表示出错比例,在任务结束时校验。
  3. errorRecord优先级高于errorPercentage。


Ctrl+O看看ClassLoaderSwapper有什么方法:

c562365b362e49c5a83693b967fe0d0a.png


4.4 Communication 所有的状态及信息统计


描述:主要记录DataX所有的状态及统计信息交互,job、taskGroup、task等的消息汇报都走该类。


Ctrl+O看看Communication有什么方法(这里方法比较多,其实就是一些DataX的一些运行状态及信息统计):

db1ce33f0639426eb9c1fb91e4b2f2d3.png


4.5 AbstractScheduler 调度器


描述:主要就是根据job配置来调度TaskGroupContainer


Ctrl+O看看AbstractScheduler有什么方法:

e52239bf2c0b474aae0be9e4fd570ddf.png


05 文末


本文是对DataX源码解读的一篇分析文章,如果文中有理解错误的,欢迎童鞋们指出,本文完!

目录
相关文章
|
关系型数据库 MySQL 调度
DataX教程(05)- DataX Web项目实践
DataX教程(05)- DataX Web项目实践
1342 0
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
567 0
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
153 0
|
数据采集 分布式计算 调度
DataX教程(03)- 源码解读(超详细版)
DataX教程(03)- 源码解读(超详细版)
841 0
|
调度 DataX 容器
DataX教程(07)- 图解DataX任务分配及执行流程
DataX教程(07)- 图解DataX任务分配及执行流程
574 0
DataX教程(07)- 图解DataX任务分配及执行流程
|
存储 NoSQL 关系型数据库
DataX教程(01)- 入门
DataX教程(01)- 入门
808 0
|
JSON Java DataX
DataX教程(04)- 配置完整解读
DataX教程(04)- 配置完整解读
2418 0
|
Java DataX Maven
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
652 0
|
Java Apache 开发工具
Flink 源码阅读环境搭建
阅读优秀的源码是提升我们代码技能最重要的手段之一,工欲善其事必先利其器,所以,搭建好源码阅读环境是我们阅读的第一步。
|
分布式计算 资源调度 Java
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
下一篇
无影云桌面