01 Introduction
In the previous posts we covered the concepts and principles of DataX. This post walks through DataX's configuration, i.e. the configuration files of a compiled and packaged distribution: the files under the bin directory, the conf directory, and the job directory.
02 bin directory files explained
Analyzing the files in the bin directory requires some Python basics. I have written a column on the topic before; interested readers can refer to 《Python专栏》.

In the bin directory you can see three files:

- datax.py
- dxprof.py
- perftrace.py
2.1 datax.py
2.1.1 Running datax.py from the command line
Running a .py file requires a Python environment. macOS generally ships with one, so nothing needs to be installed; on Windows, search Baidu or Google for installation instructions. For upgrading to Python 3 on macOS, see my post 《Mac下安装Python3》.

datax.py is mainly used to submit jobs; it is effectively the entry point of DataX. Let's run it from a terminal: change into the compiled datax directory, then execute:
```
python datax.py ../job/job.json
```
As you can see, DataX executes successfully, with the same result as running DataX inside IDEA (see 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》).
2.1.2 Running datax.py in PyCharm
The best way to read datax.py is to step through it with a debugger; I use PyCharm for that here.

First import the compiled project via File -> Open... and open datax.py. Clicking Run fails at first, simply because no job configuration file was specified. Click "Edit Configurations..." and set the script parameters to the path of a job file (e.g. ../job/job.json). Then set a breakpoint and right-click to Debug.

Now we can happily follow the breakpoints through the source of datax.py.
2.1.3 Reading datax.py
datax.py essentially does one thing: it builds a command line and executes it via the shell. The program's entry point is the `__main__` block; let's look at its code:
```python
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()
    sys.exit(child_process.returncode)
```
There isn't much code above; the line to focus on is line 12:

```python
startCommand = buildStartCommand(options, args)
```

Everything before line 12 exists to assemble the startCommand string. Captured in the debugger, the finished command is one long java invocation.
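For reference, here is a reconstruction of what the debugger shows, pieced together from the ENGINE_COMMAND and DEFAULT_JVM values listed in the table below (the paths are from my machine; mode, jobid, and the job path depend on how you invoked the script):

```
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Users/<username>/Desktop/datax/log -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/Users/<username>/Desktop/datax -Dlogback.configurationFile=/Users/<username>/Desktop/datax/conf/logback.xml -classpath /Users/<username>/Desktop/datax/lib/*:. com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job ../job/job.json
```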
So startCommand is really nothing more than a java command that launches com.alibaba.datax.core.Engine. After line 12, the script creates a child process to execute it:
```python
# Fork a child process to run the assembled shell command
child_process = subprocess.Popen(startCommand, shell=True)
# Register signal handlers so the child is torn down if this script is killed
register_signal()
# Wait for the child process to finish and collect its output
(stdout, stderr) = child_process.communicate()
# Exit with the child's return code
sys.exit(child_process.returncode)
```
That covers the `__main__` block. What other variables and methods does the file define? Press Control+O to list them all; organized into a table, they look like this:
| Category | Name | Description |
| --- | --- | --- |
| Variable | DATAX_HOME | Root path of the running compiled datax directory, e.g. `/Users/<username>/Desktop/datax` |
| Variable | DATAX_VERSION | The DataX version, e.g. `DATAX-OPENSOURCE-3.0` |
| Variable | DEFAULT_JVM | JVM options plus the heap-dump path, e.g. `-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Users/<username>/Desktop/datax/log` |
| Variable | DEFAULT_PROPERTY_CONF | Default system properties, e.g. `-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/Users/<username>/Desktop/datax -Dlogback.configurationFile=/Users/<username>/Desktop/datax/conf/logback.xml` |
| Variable | ENGINE_COMMAND | The full engine launch command template, e.g. `java -server ${jvm} -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/Users/<username>/Desktop/datax -Dlogback.configurationFile=/Users/<username>/Desktop/datax/conf/logback.xml -classpath /Users/<username>/Desktop/datax/lib/*:. ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}` |
| Variable | ispy2 | Whether the interpreter is Python 2.x |
| Variable | LOGBACK_FILE | Path to logback.xml, e.g. `/Users/yanglinwei/Desktop/datax/conf/logback.xml` |
| Variable | REMOTE_DEBUG_CONFIG | Remote-debugging options, e.g. `-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999` |
| Variable | RET_STATE | Return-code definitions: `{"KILL": 143, "FAIL": -1, "OK": 0, "RUN": 1, "RETRY": 2}` |
| Method | buildStartCommand(options, args) | Builds the start command through a series of if/else branches; the command consists of two parts: JVM options plus property/environment settings |
| Method | generateJobConfigTemplate(reader, writer) | Fetches the templates for the given reader and writer names from GitHub and produces a JSON job template |
| Method | getLocalIp() | Returns the local IP address |
| Method | getOptionParser() | Builds the command-line option parser |
| Method | isUrl(path) | Whether the argument is a URL |
| Method | isWindows() | Whether the current OS is Windows |
| Method | printCopyright() | Prints the copyright banner |
| Method | readPluginTemplate(plugin) | Reads a plugin's template by plugin name |
| Method | register_signal() | Registers signal handlers so the child process is terminated when the script receives a signal |
| Method | suicide(signum, e) | Kills the child process according to the received signal value |
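To make the last two rows concrete, here is a simplified sketch of the register_signal/suicide mechanism on a POSIX system (illustrative only; the real datax.py differs in details):

```python
import signal
import subprocess
import sys
import time

RET_STATE = {"KILL": 143, "FAIL": -1, "OK": 0, "RUN": 1, "RETRY": 2}
child_process = None  # set by the main block via subprocess.Popen(...)

def suicide(signum, e):
    """Forward the signal to the child JVM, give it a moment, then kill it."""
    global child_process
    sys.stderr.write("[Error] DataX received signal %d, shutting down.\n" % signum)
    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    sys.exit(RET_STATE["KILL"])

def register_signal():
    # Hook Ctrl+C, Ctrl+\ and kill, so the child JVM never outlives the script
    signal.signal(signal.SIGINT, suicide)
    signal.signal(signal.SIGQUIT, suicide)
    signal.signal(signal.SIGTERM, suicide)
```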
2.2 dxprof.py
I couldn't find any place in the datax project where this file is invoked. From reading its source, it is roughly a utility script for querying a database's host information, table information, and job execution results.

Press Control+O to see which variables and methods dxprof.py contains; I won't explain them here, since the method names largely speak for themselves.
2.3 perftrace.py
2.3.1 Usage of perftrace.py
From reading the perftrace.py source, its main purpose is to take a JSON string supplied by the user, generate a complete job JSON from it, and then run that job.
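Conceptually it does something like the sketch below (illustrative only; `build_trace_job` is a made-up name, and the real script also handles `--writer`, `--file`, datasource detection, and more): tracing a reader pairs the user's reader parameters with a streamwriter, so that only read performance is exercised. This matches the generated config visible in the job log later in this post.

```python
import json

def build_trace_job(reader_json, channel=1):
    """Wrap user-supplied reader params into a complete DataX job whose
    writer is a streamwriter (sketch, not the real perftrace.py code)."""
    params = json.loads(reader_json)
    # keys prefixed with "writer-" configure the generated streamwriter
    writer_print = params.pop("writer-print", "false")
    return {
        "content": [{
            "reader": {"name": "mysqlreader", "parameter": params},
            "writer": {"name": "streamwriter", "parameter": {"print": writer_print}},
        }],
        "setting": {"speed": {"channel": str(channel)}},
    }

job = build_trace_job('{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/database", '
                      '"username": "root", "password": "***", '
                      '"table": "t_demo", "splitPk": "id", "writer-print": "false"}',
                      channel=10)
print(json.dumps(job, indent=2))
```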
The getUsage() method of perftrace.py describes how to use it:
```
perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'

perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'

perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
```
Note that the parameters have their own requirements. For the reader:
```
The following params are available for -r --reader:
[these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
*datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
*jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
*username:       username for datasource
*password:       password for datasource
*table:          table name for read data
column:          column to be read, the default value is ['*']
splitPk:         the splitPk column of rdbms table
where:           limit the scope of the performance data set
fetchSize:       how many rows to be fetched at each communicate

[these params is for stream reader, used to trace rdbms write performance]
reader-sliceRecordCount: how man test data to mock(each channel), the default value is 10000
reader-column:           stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]
```
For the writer:
```
The following params are available for -w --writer:
[these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
*jdbcUrl:       datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
*username:      username for datasource
*password:      password for datasource
*table:         table name for write data
column:         column to be writed, the default value is ['*']
batchSize:      how many rows to be storeed at each communicate, the default value is 512
preSql:         prepare sql to be executed before write data, the default value is ''
postSql:        post sql to be executed end of write data, the default value is ''
url:            required for ads, pattern is ip:port
schme:          required for ads, ads database name

[these params is for stream writer, used to trace rdbms read performance]
writer-print: true means print data read from source datasource, the default value is false
```
And for the global options:
```
The following params are available global control:
-c --channel: the number of concurrent tasks, the default value is 1
-f --file:    existing completely dataX configuration file path
-t --type:    test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
-h --help:    print help message
```
2.3.2 A perftrace.py example
Suppose we want to print the rows of a table in a local database. Run this command:
```
perftrace.py --channel=1 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false", "username":"root", "password":"123456", "table": "t_sync_log", "where":"", "splitPk":"id", "writer-print":"true"}'
```
The output is as follows:
```
yanglineideMBP2:bin yanglinwei$ perftrace.py --channel=1 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false", "username":"root", "password":"123456", "table": "t_sync_log", "where":"", "splitPk":"id", "writer-print":"true"}'

DataX Util Tools (UNKNOWN_DATAX_VERSION), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.

trace environments:
dataxJobPath:  /Users/yanglinwei/Desktop/datax/bin/perftrace-c336d159-8a57-11ec-b78e-f45c89ba8565
dataxHomePath: /Users/yanglinwei/Desktop/datax
dataxCommand:  /Users/yanglinwei/Desktop/datax/bin/datax.py /Users/yanglinwei/Desktop/datax/bin/perftrace-c336d159-8a57-11ec-b78e-f45c89ba8565

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2022-02-10 17:56:49.547 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2022-02-10 17:56:49.560 [main] INFO  Engine - the machine info =>

	osInfo:  Oracle Corporation 11 11.0.2+9-LTS
	jvmInfo: Mac OS X x86_64 10.15.1
	cpu num: 4

	totalPhysicalMemory:            -0.00G
	freePhysicalMemory:             -0.00G
	maxFileDescriptorCount:         -1
	currentOpenFileDescriptorCount: -1

	GC Names [G1 Young Generation, G1 Old Generation]

	MEMORY_NAME                      | allocation_size | init_size
	CodeHeap 'profiled nmethods'     | 117.22MB        | 2.44MB
	G1 Old Gen                       | 1,024.00MB      | 970.00MB
	G1 Survivor Space                | -0.00MB         | 0.00MB
	CodeHeap 'non-profiled nmethods' | 117.22MB        | 2.44MB
	Compressed Class Space           | 1,024.00MB      | 0.00MB
	Metaspace                        | -0.00MB         | 0.00MB
	G1 Eden Space                    | -0.00MB         | 54.00MB
	CodeHeap 'non-nmethods'          | 5.56MB          | 2.44MB

2022-02-10 17:56:49.614 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"name":"mysqlreader",
				"parameter":{
					"column":[
						"*"
					],
					"connection":[
						{
							"jdbcUrl":[
								"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false"
							],
							"table":[
								"t_sync_log"
							]
						}
					],
					"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false",
					"password":"******",
					"sliceRecordCount":"10000",
					"splitPk":"id",
					"table":"t_sync_log",
					"username":"root",
					"where":""
				}
			},
			"writer":{
				"name":"streamwriter",
				"parameter":{
					"print":"true"
				}
			}
		}
	],
	"setting":{
		"speed":{
			"channel":"1"
		}
	}
}

2022-02-10 17:56:49.665 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-02-10 17:56:49.668 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-10 17:56:49.668 [main] INFO  JobContainer - DataX jobContainer starts job.
2022-02-10 17:56:49.671 [main] INFO  JobContainer - Set jobId = 0
2022-02-10 17:56:50.356 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2022-02-10 17:56:50.358 [job-0] WARN  OriginalConfPretreatmentUtil - 您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.
2022-02-10 17:56:50.369 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2022-02-10 17:56:50.369 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2022-02-10 17:56:50.371 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2022-02-10 17:56:50.372 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2022-02-10 17:56:50.372 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2022-02-10 17:56:50.377 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2022-02-10 17:56:50.378 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2022-02-10 17:56:50.423 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2022-02-10 17:56:50.429 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2022-02-10 17:56:50.432 [job-0] INFO  JobContainer - Running by standalone Mode.
2022-02-10 17:56:50.440 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-10 17:56:50.465 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to 2000000.
2022-02-10 17:56:50.467 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2022-02-10 17:56:50.498 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-02-10 17:56:50.505 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select * from t_sync_log ] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2022-02-10 17:56:50.572 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select * from t_sync_log ] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
5941	cc01b87e-abd9-4c9d-9da7-2103987db5ae	8888	0	1	SYNC_ORG	2021-10-28 09:57:45	2021-10-28 09:57:46	true	1	0	false	杨林伟	杨林伟	2021-10-28 01:57:44	2021-10-28 01:57:46
5942	cc01b87e-abd9-4c9d-9da7-2103987db5ae	8888	1026081	1	SYNC_DEPT	2021-10-28 09:57:46	null	false	1	0	false	杨林伟	杨林伟	2021-10-28 01:57:46	2021-10-28 01:57:46
2022-02-10 17:56:50.803 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[309]ms
2022-02-10 17:56:50.803 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-02-10 17:57:00.482 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 191 bytes | Speed 19B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.076s | Percentage 100.00%
2022-02-10 17:57:00.482 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2022-02-10 17:57:00.483 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2022-02-10 17:57:00.483 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2022-02-10 17:57:00.484 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2022-02-10 17:57:00.492 [job-0] INFO  JobContainer -
	 [total cpu info] =>
		averageCpu | maxDeltaCpu | minDeltaCpu
		-1.00%     | -1.00%      | -1.00%

	 [total gc info] =>
		 NAME                | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
		 G1 Young Generation | 1            | 1               | 1               | 0.009s      | 0.009s         | 0.009s
		 G1 Old Generation   | 0            | 0               | 0               | 0.000s      | 0.000s         | 0.000s

2022-02-10 17:57:00.492 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-02-10 17:57:00.493 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 191 bytes | Speed 19B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.076s | Percentage 100.00%
2022-02-10 17:57:00.495 [job-0] INFO  JobContainer -
任务启动时刻 : 2022-02-10 17:56:49
任务结束时刻 : 2022-02-10 17:57:00
任务总计耗时 :                 10s
任务平均流量 :               19B/s
记录写入速度 :              0rec/s
读出记录总数 :                   2
读写失败总数 :                   0
```
03 conf directory files explained
The conf directory mainly contains the following two files:
- core.json
- logback.xml
For anyone who has done Java development, logback.xml is a familiar sight: it mainly controls the log level and the log output format, so I won't cover it here. This section focuses on core.json.
3.1 core.json
The content of core.json is as follows:
{ "entry": { "jvm": "-Xms1G -Xmx1G", "environment": {} }, "common": { "column": { "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "timeFormat": "HH:mm:ss", "dateFormat": "yyyy-MM-dd", "extraFormats":["yyyyMMdd"], "timeZone": "GMT+8", "encoding": "utf-8" } }, "core": { "dataXServer": { "address": "http://localhost:7001/api", "timeout": 10000, "reportDataxLog": false, "reportPerfLog": false }, "transport": { "channel": { "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "speed": { "byte": 2000000, "record": -1 }, "flowControlInterval": 20, "capacity": 512, "byteCapacity": 67108864 }, "exchanger": { "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger", "bufferSize": 32 } }, "container": { "job": { "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "statistics": { "collector": { "plugin": { "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector", "maxDirtyNumber": 10 } } } } }
These entries correspond to the `core.*` constants defined in DataX's CoreConstant class: each nested key, joined with dots, gives one constant's value.
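A quick sketch (plain Python, not DataX code) makes the correspondence concrete: flattening core.json into dotted key paths yields exactly the kind of strings that CoreConstant holds, e.g. `core.transport.channel.speed.byte`:

```python
import json

def flatten(node, prefix=""):
    """Flatten nested JSON into dotted key paths,
    e.g. core.transport.channel.speed.byte -> 2000000."""
    flat = {}
    for key, value in node.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat

with open("conf/core.json") as f:
    conf = flatten(json.load(f))

print(conf["core.transport.channel.speed.byte"])  # 2000000
```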
Due to space and time constraints, I will cover the meaning of each setting in a follow-up post.
04 job directory files explained
The job directory contains only a single sample file, job.json. Its content is as follows:
{ "job": { "setting": { "speed": { "byte":10485760 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 日期xxxx, "type": "long" }, { "value": "日期xxxx", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 100000 } }, "writer": { "name": "streamwriter", "parameter": { "print": false, "encoding": "UTF-8" } } } ] } }
In fact, every reader and writer plugin in the source tree already ships with its own doc directory, which is certainly better than anything written here. You can clone the source (https://github.com/alibaba/DataX) and have a look.
05 Closing
This article was put together from reading the source code. If anything is unclear, feel free to leave a comment. That's all!