01 引言
通过前面的两篇博文,我们可以在IDEA
下运行DataX
源码项目了:
本文需要讲解的是DataX
的源码。
02 DataX框架讲解
2.1 DataX设计思想
DataX采用Framework + plugin
架构构建,将数据源读取和写入抽象成为Reader/Writer
插件,纳入到整个同步框架中。
- Reader:Reader做为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
2.2 DataX运行流程
2.2.1 DataX运行流程解析
结合上图,描述一下DataX
整个运行流程(从左往右看):
- step1:
DataX
接受到一个Job
之后,将启动一个进程来完成整个作业同步过程;- step2: 会根据不同的源端切分策略,将
Job
切分成多个小的Task
(子任务);- step3: 切分多个
Task
之后,DataX Job
会调用Scheduler模块,根据配置的并发数据量,将step2拆分成的Task重新组合,组装成TaskGroup(任务组),每一个TaskGroup
负责以一定的并发运行完毕分配好的所有Task
,默认单个任务组的并发数量为5
;- step4:TaskGroup里面的Task都有TaskGroup来启动,会固定启动
Reader—>Channel—>Writer
的线程来完成任务同步工作;- step5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
2.2.2 DataX运行流程简单举例
举例来说,用户提交了一个DataX
作业,并且配置了20
个并发,目的是将一个100
张分表的mysql
数据同步到odps
里面。
DataX的调度决策思路是:
- ① DataXJob根据分库分表切分成了100个Task。
- ② 根据20个并发,DataX计算共需要分配4个TaskGroup(默认单个任务组的并发数量为5)。
- ③ 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
03 DataX源码分析
在进行源码分析前,首先贴上我自己整理的DataX源码分析流程图:
3.1 源码流程描述
其实DataX的运行流程也没有我们想的这么复杂,根据上面的流程图,简单描述如下:
- 【Step1】:首先入口在
Engine
类,入口参数我们有“mode
”、“jobId
”、“job.json
”(即运行的模式、任务id、任务配置等); - 【Step2】:有了入口参数,会先校验,然后统一封装好成
Configuration
,传给下一步使用; - 【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为
JobContainer
(作业任务容器)、TaskGroupContainer
(分组任务容器),一般都是创建一个作业,先进JobContainer
; - 【Step4】:在
JobContainer
里面,会根据配置使用类加载器加载reader
和writer
的实例对象,然后出发加载对象的生命周期方法,如init()
、prepare()
、post
()等; - 【Step5】:
JobContainer
执行流程里,有一个schedule
方法,主要是为了切割任务组,并初始化到TaskContainer
,调度使用; - 【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:
reader
->transfromer
->writer
)。 - 【Step7】:最后,具体的业务实现,就在
Reader
(即:E
)、Transformer
(即:T
)、Writer
(即:L
)里面实现。
3.2 流程对应代码
3.2.1 step1:入口
【Step1】:首先入口在
Engine
类,入口参数我们有“mode
”、“jobId
”、“job.json
”(即运行的模式、任务id、任务配置等)
对应的代码(com.alibaba.datax.core.Engine#main
)及详细描述如下:
3.2.2 step2:封装配置
【Step2】:有了入口参数,会先校验,然后统一封装好成
Configuration
,传给下一步使用。
对应的代码(com.alibaba.datax.core.Engine#entry
)及详细描述如下:
3.2.3 step3:初始化并启动容器
【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为
JobContainer
(作业任务容器)、TaskGroupContainer
(分组任务容器),一般都是创建一个作业,先进JobContainer
;
对应的代码(com.alibaba.datax.core.Engine#start
)及详细描述如下:
3.2.4 step4:JobContainer运行内容
【Step4】:在
JobContainer
里面,会根据配置使用类加载器加载reader
和writer
的实例对象,然后出发加载对象的生命周期方法,如init()
、prepare()
、post
()等;
对应代码(com.alibaba.datax.core.job.JobContainer#start
)及详细描述如下:
3.2.5 step5:调度JobContainer
【Step5】:
JobContainer
执行流程里,有一个schedule
方法,主要是为了切割任务组,并初始化到TaskContainer
,调度使用;
对应代码(com.alibaba.datax.core.job.JobContainer#start
),内容如step4的图片,具体对应第119行代码:
this.schedule();
3.2.6 step6:JobContainer详解
【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:
reader
->transfromer
->writer
)
对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer#start
)及详细描述如下:
再来看看TaskExecutor
是怎么初始化的,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor
)具体详情如下:
ok,到这里可以看看doStart()
运行的方法,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#doStart
),详情如下:
3.2.7 step7:业务实现
【Step7】:最后,具体的业务实现,就在
Reader
(即:E
)、Transformer
(即:T
)、Writer
(即:L
)里面实现。
那么,经过前几个步骤的操作,最终哪里实现业务呢?DataX的例子是使用了StreamReader
和StreamWriter
的,我们Stream
插件里的代码:
① E:对应StreamReader,详情如下:
② L:对应StreamWriter,详情如下:
这个时候,可能很多小伙伴都会提出,还有个T(即Transformer)呢?这里拿个简单的来看看,即ReplaceTransformer(com.alibaba.datax.core.transport.transformer.ReplaceTransformer
),代码内容如下:
好奇宝贝肯定会问,这里的evaluate()
方法什么时候调用?我们一直逆向查询调用,会发现调用的流程如下:
Transformer.evaluate() <= ComplexTransformer.evaluate() <= TransformerExchanger.doTransformer() <= RecordSender.sendToWriter() <= Reader.Task.startRead()
翻译上面的流程,用人话说就是(这里改为了正向流程了):
Reader读内容(`Reader.Task.startRead()`) => Reader读内容的方法最后一步调用发送给Writer(`RecordSender.sendToWriter() `) => 发送之前使用Exchanger交换机来进行transform操作(`TransformerExchanger.doTransformer() `) => transform会触发evaluate去处理业务(`ComplexTransformer.evaluate()`) => 最后处理发送到writer前的业务操作(`Transformer.evaluate() `)
04 几个关键的类
到了这一步,我们大概知道了DataX的整个运行流程机制了,那么有一些工具类在这里还是值得说一下的。
4.1 LoadUtil 插件加载工具
描述:这一个工具类是一个插件加载器,大体上分reader
、transformer
(还未实现)和writer
三中插件类型,reader
和writer
在执行时又可能出现Job
和Task
两种运行时(加载的类不同)。
Ctrl+O
看看有哪些方法,这里就不再描述里面的每一个方法了,知道怎么用就可以了:
4.2 ClassLoaderSwapper 类加载器管理者
描述:这个类主要配合LoadUtil使用,主要管理不同线程(即:Task任务)的类加载器。
与LoadUtil
配合使用的方式(这里说的是线程,但是官方文档说开了个进程,感觉不太合适):
// 初始化ClassLoaderSwapper private ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper(); // 注入插件到ClassLoaderSwapper classLoaderSwapper.setCurrentThreadClassLoader( LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName) );
Ctrl+O
看看ClassLoaderSwapper
有什么方法,这里看注释,不再描述了:
4.3 ErrorRecordChecker 错误记录检查者
描述: 检查任务是否到达错误记录限制。有检查条数(recordLimit
)和百分比(percentageLimit
)两种方式。
- errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据。
- errorPercentage表示出错比例,在任务结束时校验。
- errorRecord优先级高于errorPercentage。
Ctrl+O
看看ClassLoaderSwapper
有什么方法:
4.4 Communication 所有的状态及信息统计
描述:主要记录DataX所有的状态及统计信息交互,job
、taskGroup
、task
等的消息汇报都走该类。
Ctrl+O
看看Communication
有什么方法(这里方法比较多,其实就是一些DataX的一些运行状态及信息统计):
4.5 AbstractScheduler 调度器
描述:主要就是根据job
配置来调度TaskGroupContainer
。
Ctrl+O
看看AbstractScheduler
有什么方法:
05 文末
本文是对DataX源码解读的一篇分析文章,如果文中有理解错误的,欢迎童鞋们指出,本文完!
原创不易,最后需要声明:
- 本文原创作者:阿甘兄
- 作者博客地址:https://yanglinwei.blog.csdn.net/
需要转载的请务必联系我本人!