01 引言
通过前面的两篇博文,我们可以在IDEA下运行DataX源码项目了:
- 《DataX教程(01)- 入门》
- 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
本文需要讲解的是DataX的源码。
02 DataX框架讲解
2.1 DataX设计思想
DataX采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:Reader做为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
2.2 DataX运行流程
2.2.1 DataX运行流程解析
结合上图,描述一下DataX整个运行流程(从左往右看):
- step1: DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程;
- step2: 会根据不同的源端切分策略,将Job切分成多个小的Task(子任务);
- step3: 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将step2拆分成的Task重新组合,组装成TaskGroup(任务组),每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5;
- step4:TaskGroup里面的Task都有TaskGroup来启动,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作;
- step5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
2.2.2 DataX运行流程简单举例
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。
DataX的调度决策思路是:
- ① DataXJob根据分库分表切分成了100个Task。
- ② 根据20个并发,DataX计算共需要分配4个TaskGroup(默认单个任务组的并发数量为5)。
- ③ 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
03 DataX源码分析
在进行源码分析前,首先贴上我自己整理的DataX源码分析流程图:
3.1 源码流程描述
其实DataX的运行流程也没有我们想的这么复杂,根据上面的流程图,简单描述如下:
- 【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等);
- 【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用;
- 【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer;
- 【Step4】:在JobContainer里面,会根据配置使用类加载器加载reader和writer的实例对象,然后出发加载对象的生命周期方法,如init()、prepare()、post()等;
- 【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;
- 【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer)。
- 【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。
3.2 流程对应代码
3.2.1 step1:入口
【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等)
对应的代码(com.alibaba.datax.core.Engine#main)及详细描述如下:
3.2.2 step2:封装配置
【Step2】:有了入口参数,会先校验,然后统一封装好成
Configuration
,传给下一步使用。
对应的代码(com.alibaba.datax.core.Engine#entry
)及详细描述如下:
3.2.3 step3:初始化并启动容器
【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer;
对应的代码(com.alibaba.datax.core.Engine#start)及详细描述如下:
3.2.4 step4:JobContainer运行内容
【Step4】:在JobContainer里面,会根据配置使用类加载器加载reader和writer的实例对象,然后出发加载对象的生命周期方法,如init()、prepare()、post()等;
对应代码(com.alibaba.datax.core.job.JobContainer#start)及详细描述如下:
3.2.5 step5:调度JobContainer
【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;
对应代码(com.alibaba.datax.core.job.JobContainer#start),内容如step4的图片,具体对应第119行代码:
this.schedule();
3.2.6 step6:JobContainer详解
【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer)
对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer#start)及详细描述如下:
再来看看TaskExecutor
是怎么初始化的,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor
)具体详情如下:
ok,到这里可以看看doStart()
运行的方法,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#doStart
),详情如下:
3.2.7 step7:业务实现
【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。
那么,经过前几个步骤的操作,最终哪里实现业务呢?DataX的例子是使用了StreamReader和StreamWriter的,我们Stream插件里的代码:
① E:对应StreamReader,详情如下:
② L:对应StreamWriter,详情如下:
这个时候,可能很多小伙伴都会提出,还有个T(即Transformer)呢?这里拿个简单的来看看,即ReplaceTransformer(com.alibaba.datax.core.transport.transformer.ReplaceTransformer
),代码内容如下:
好奇宝贝肯定会问,这里的evaluate()
方法什么时候调用?我们一直逆向查询调用,会发现调用的流程如下:
Transformer.evaluate() <= ComplexTransformer.evaluate() <= TransformerExchanger.doTransformer() <= RecordSender.sendToWriter() <= Reader.Task.startRead()
翻译上面的流程,用人话说就是(这里改为了正向流程了):
Reader读内容(`Reader.Task.startRead()`) => Reader读内容的方法最后一步调用发送给Writer(`RecordSender.sendToWriter() `) => 发送之前使用Exchanger交换机来进行transform操作(`TransformerExchanger.doTransformer() `) => transform会触发evaluate去处理业务(`ComplexTransformer.evaluate()`) => 最后处理发送到writer前的业务操作(`Transformer.evaluate() `)
04 几个关键的类
到了这一步,我们大概知道了DataX的整个运行流程机制了,那么有一些工具类在这里还是值得说一下的。
4.1 LoadUtil 插件加载工具
描述:这一个工具类是一个插件加载器,大体上分reader、transformer(还未实现)和writer三中插件类型,reader和writer在执行时又可能出现Job和Task两种运行时(加载的类不同)。
Ctrl+O看看有哪些方法,这里就不再描述里面的每一个方法了,知道怎么用就可以了:
4.2 ClassLoaderSwapper 类加载器管理者
描述:这个类主要配合LoadUtil使用,主要管理不同线程(即:Task任务)的类加载器。
与LoadUtil
配合使用的方式(这里说的是线程,但是官方文档说开了个进程,感觉不太合适):
// 初始化ClassLoaderSwapper private ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper(); // 注入插件到ClassLoaderSwapper classLoaderSwapper.setCurrentThreadClassLoader( LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName) );
Ctrl+O
看看ClassLoaderSwapper
有什么方法,这里看注释,不再描述了:
4.3 ErrorRecordChecker 错误记录检查者
描述: 检查任务是否到达错误记录限制。有检查条数(recordLimit)和百分比(percentageLimit)两种方式。
- errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据。
- errorPercentage表示出错比例,在任务结束时校验。
- errorRecord优先级高于errorPercentage。
Ctrl+O看看ClassLoaderSwapper有什么方法:
4.4 Communication 所有的状态及信息统计
描述:主要记录DataX所有的状态及统计信息交互,job、taskGroup、task等的消息汇报都走该类。
Ctrl+O看看Communication有什么方法(这里方法比较多,其实就是一些DataX的一些运行状态及信息统计):
4.5 AbstractScheduler 调度器
描述:主要就是根据job
配置来调度TaskGroupContainer
。
Ctrl+O
看看AbstractScheduler
有什么方法:
05 文末
本文是对DataX源码解读的一篇分析文章,如果文中有理解错误的,欢迎童鞋们指出,本文完!