一、插件开发
模块命名规范
Reader插件名称格式: Xreader-Y
Writer插件名称格式: Xwriter-Y
其中:
X:数据源类型英⽂文名称小写 oraclereader、mysqlwriter
Y:厂商(版本) mysqlreader-5.7 hivewriter-cdh5.12.0
QuickStart
使用私服上的项目骨架进行开发
- DgroupId:新项⽬目的groupId 【保持与示例一致即可】
- Dpackage:新项⽬目的package 【保持与示例一致即可】
- Dversion:新项⽬目的version 【保持与示例一致即可】
- DarchetypeGroupId:模板⼯工程的GroupId 【保持与示例一致即可】
- DarchetypeVersion:模板⼯工程的Version 【保持与示例一致即可】
- DarchetypeArtifactId:模板⼯工程的ArtifactId
- DartifactId:新项⽬目的artifactId 建议命名方式为插件名称
jdbc类骨架
writer
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxwriterplugin-archetype -DartifactId=
reader
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxreaderplugin-archetype -DartifactId=
Hadoop类骨架
需要指定厂商和版本,添加参数-Dvendor,例如:
cdh5.12.0 -Dvendor=cdh5.12.0
cdh6.1.0 -Dvendor=cdh6.1.0
apache -Dvendor=apache
- writer
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxhadoopwriterplugin-archetype -DartifactId= -Dvendor=
- reader
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxhadoopreaderplugin-archetype -DartifactId= -Dvendor=
其他类骨架
其他的一些数据源,例如es,需要使用自身的api,通用实现无法适用,则使用common骨架
主要的区别在于mvn的依赖,只依赖了datax-common
- writer
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxcommonwriterplugin-archetype -DartifactId=
- reader
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxcommonreaderplugin-archetype -DartifactId=
其中骨架生成的目录结构如下
.
├── doc
│ └── README.md 插件文档
├── pom.xml
├── src
│ └── main
│ ├── assembly
│ │ └── package.xml 打包配置
│ ├── java
│ └── resources
│ └── plugin.json 插件描述
0.mvn依赖
- rdbms
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--这里的connector依赖要改成相应数据源的-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
</dependency>
- hadoop
<properties>
<!--替换成对应集群厂商的hadoop和hive的pom版本-->
<hadoop.version>2.6.0-cdh5.12.0</hadoop.version>
<hive.version>1.1.0-cdh5.12.0</hive.version>
</properties>
1.实现接口
插件的入口类必须扩展Reader
或Writer
抽象类,并且实现分别实现Job
和Task
两个内部抽象类,Job
和Task
的实现必须是 内部类 的形式。
骨架生成项目是一个mysql插件的实现
我们已经在CommonRdbmsReader和CommonRdbmsWriter做了通用实现
可以看到,MysqlReader通过CommonRdbmsReader只需要很少的代码就可以适配了
rdbms类型的数据源适配过程和mysql基本一致
大部分情况下,只要把DBType中的typeName和driverClassName改一下就好了
public class MysqlReader extends Reader {
/**
* 数据源类型
*/
private static final DBType DB_TYPE;
static {
/*
typeName:数据源类型名称(全小写)
driverClassName:数据源jdbc驱动类
*/
DB_TYPE = new DBType("mysql", "com.mysql.jdbc.Driver");
}
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig = null;
private CommonRdbmsReader.Job commonRdbmsReaderJob;
/**
* Job对象初始化工作,测试可以通过super.getPluginJobConf()获取与本插件相关的配置。
* 读插件获得配置中reader部分,写插件获得writer部分。
*/
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
if (userConfigedFetchSize != null) {
LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
}
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DB_TYPE);
this.commonRdbmsReaderJob.init(this.originalConfig);
}
/**
* 全局准备工作,比如odpswriter清空目标表。
*/
@Override
public void prepare() {
super.prepare();
}
/**
* 校验
*/
@Override
public void preCheck() {
this.commonRdbmsReaderJob.preCheck(this.originalConfig, DB_TYPE);
}
/**
* 拆分Task。
* 参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。
* 值返回的是`Task`的配置列表。
*
* @param adviceNumber
* @return
*/
@Override
public List<Configuration> split(int adviceNumber) {
return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
}
/**
* 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
*/
@Override
public void post() {
this.commonRdbmsReaderJob.post(this.originalConfig);
}
/**
* Job对象自身的销毁工作。
*/
@Override
public void destroy() {
this.commonRdbmsReaderJob.destroy(this.originalConfig);
}
}
public static class Task extends Reader.Task {
private Configuration readerSliceConfig;
private CommonRdbmsReader.Task commonRdbmsReaderTask;
/**
* Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。
* 这里的配置是Job的split方法返回的配置列表中的其中一个。
*/
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DB_TYPE, super.getTaskGroupId(), super.getTaskId());
this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}
/**
* 局部的准备工作。
*/
@Override
public void prepare() {
super.prepare();
}
/**
* 从数据源读数据,写入到RecordSender中。
* RecordSender会把数据写入连接Reader和Writer的缓存队列。
*
* @param recordSender
*/
@Override
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
super.getTaskPluginCollector(), fetchSize);
}
/**
* 局部的后置工作。
*/
@Override
public void post() {
this.commonRdbmsReaderTask.post(this.readerSliceConfig);
}
/**
* Task对象自身的销毁工作。
*/
@Override
public void destroy() {
this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
}
}
}
2.修改插件描述plugin.json
在每个插件的项目中,都有一个plugin.json
文件,这个文件定义了插件的相关信息,包括入口类。例如:
{
"name": "oraclereader",
"class": "com.alibaba.datax.plugin.writer.oraclereader.OracleReader",
"description": "插件用途",
"developer": "插件开发者",
"engineVersion": ""
}
name
: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。class
: 入口类的全限定名称,框架通过反射穿件入口类的实例。十分重要 。description
: 描述信息。developer
: 开发人员。engineVersion
: 数据源版本,目前先不管
注意:插件的目录名字必须和plugin.json
中定义的插件名称一致。
3.打包发布
- 模块单独打包命令:
mvn clean package -DskipTests
编译好的内容存放在target/datax 下面
.
├── plugin
│ └── reader
│ └── common
│ └── mysqlreader
│ ├── libs
│ │ ├── mysqlreader-plugin-dependencies.jar
│ ├── mysqlreader-0.0.1-SNAPSHOT.jar
│ └── plugin.json
- 为了运维方便部署区分厂商版本,整体打包形成压缩包目录格式根据厂商版本区分、通用组件存放在common 文件目录下
-- plugin
-- reader
-- common(通用插件)
-- mysqlreader
-- sqlserverreader
-- ...
-- hdp (hdp厂商的插件)
-- hivereader
-- ...
-- cdh5.14.2(cdh5.14.2 的插件)
-- hivereader
-- ...
-- cdh6.1.0
-- hivereader
-- ...
-- writer
-- common(通用插件)
-- mysqlwriter
-- sqlserverwriter
-- ...
-- hdp (hdp厂商的插件)
-- hivewriter
-- ...
-- cdh5.14.2(cdh5.14.2 的插件)
-- hivewriter
-- ...
-- cdh6.1.0
-- hivewriter
-- ...
4.如何调试
0. python准备
如果没有python,需要先安装python2.7
,一定要2.7版本
1.copy插件
在datax-plugin-test
文件夹中已经准好了下面的目录结构
.
├── bin 可执行程序目录
├── conf 框架配置目录
├── job 任务配置文件,里面有stream.json和rdbms.json的两个配置文件样例
├── lib 框架依赖库目录
├── log 运行日志
└── plugin 插件目录
plugin目录分为`reader`和`writer`子目录,读写插件分别存放。插件目录规范如下:
${PLUGIN_HOME}/libs: 插件的依赖库。
${PLUGIN_HOME}/plugin-name.jar: 插件本身的jar。
${PLUGIN_HOME}/plugin.json: 插件描述文件。
可以在终端执行python datax.py ../job/stream.json
感受一下执行过程
把打包好的插件放到plugin对应目录下
在调试时推荐另一端使用mysql。比如要写一个reader的插件,那目的端使用mysqlwriter,要写一个writer的插件,源端使用mysqlreader
2.修改配置文件
修改job路径下rdbms.json,填写正确的jdbcUrl、column以及table信息
不要忘记修改使用的插件名reader.name
、writer.name
配置详细信息请看第三章配置文件
3.执行debug脚本
cd到bin目录下,执行
python datax.py ../job/mysql2mysql_config.json -d
执行脚本后会打印出远程调试端口
4.idea启动调试
设置ip和端口,本地调试的话ip就用localhost就行了,端口设置成终端打印出来的
打上断点,点击启动调试
5.写文档
一定要记得把你的成果用文档记录下来哦!!!
可以参照doc/README.md
二、如何使用Configuration
类
为了简化对json的操作,DataX
提供了简单的DSL配合Configuration
类使用。
Configuration
提供了常见的get
, 带类型get
,带默认值get
,set
等读写配置项的操作,以及clone
, toJSON
等方法。配置项读写操作都需要传入一个path
做为参数,这个path
就是DataX
定义的DSL。语法有两条:
- 子map用
.key
表示,path
的第一个点省略。 - 数组元素用
[index]
表示。
比如操作如下json:
{
"a": {
"b": {
"c": 2
},
"f": [
1,
2,
{
"g": true,
"h": false
},
4
]
},
"x": 4
}
比如调用configuration.get(path)
方法,当path为如下值的时候得到的结果为:
x
:4
a.b.c
:2
a.b.c.d
:null
a.b.f[0]
:1
a.b.f[2].g
:true
注意,因为插件看到的配置只是整个配置的一部分。使用Configuration
对象时,需要注意当前的根路径是什么。
三、配置文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test"
],
"table": [
"`test`"
]
}
],
"modifyUserName": "shulan_admin",
"password": "root",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"cleanRule": 1,
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
"table": [
"`test`"
]
}
],
"modifyUserName": "shulan_admin",
"password": "root",
"username": "root",
"writeMode": "replace"
}
}
}
],
"setting": {
"errorLimit": {
"record": 0
},
"speed": {
"channel": 5,
"throttle": false
}
}
}
}
- jdbcUrl
数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:
- username
数据库的用户名
- password
数据库的密码
- table
表名称,支持写入一个或者多个表;当配置为多张表时,必须确保所有表结构保持一致。
注意:table 和 jdbcUrl 必须包含在 connection 配置单元中
- column
表需要写入/读取数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
下面是一些非必填的选项
- session
DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
- preSql
写入数据到目的表前,会先执行这里的标准语句。比如你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:
"preSql":["delete from 表名"]
,效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
- postSql
写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )
- writeMode
控制写入数据到目标表采用
insert into
或者replace into
或者ON DUPLICATE KEY UPDATE
语句
所有选项:insert/replace/update。需要注意的是一些库可能不支持replace
默认值:insert
- batchSize
一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
默认值:1024
四、脏数据处理
什么是脏数据?
目前主要有三类脏数据:
- Reader读到不支持的类型、不合法的值。
- 不支持的类型转换,比如:
Bytes
转换为Date
。 - 写入目标端失败,比如:写mysql整型长度超长。
如何处理脏数据
在Reader.Task
和Writer.Task
中,通过AbstractTaskPlugin.getPluginCollector()
可以拿到一个TaskPluginCollector
,它提供了一系列collectDirtyRecord
的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord
方法,把被认为是脏数据的Record
传入即可。
用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。
五、整体框架
逻辑执行模型
插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:
Job
:Job
是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。Task
:Task
是为最大化而把Job
拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job
,拆分成1024个读Task
,用若干个并发执行。TaskGroup
: 描述的是一组Task
集合。在同一个TaskGroupContainer
执行下的Task
集合称之为TaskGroup
JobContainer
:Job
执行器,负责Job
全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTrackerTaskGroupContainer
:TaskGroup
执行器,负责执行一组Task
的工作单元,类似Yarn中的TaskTracker。
简而言之, Job
拆分成Task
,在分别在框架提供的容器中执行,插件只需要实现Job
和Task
两部分逻辑。
物理执行模型
框架为插件提供物理上的执行能力(线程)。DataX
框架有三种运行模式:
Standalone
: 单进程运行,没有外部依赖。Local
: 单进程运行,统计信息、错误信息汇报到集中存储。Distrubuted
: 分布式多进程运行,依赖DataX Service
服务。
当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。
当JobContainer
和TaskGroupContainer
运行在同一个进程内时,就是单机模式(Standalone
和Local
);当它们分布在不同的进程中执行时,就是分布式(Distributed
)模式。
是不是很简单?
编程接口
那么,Job
和Task
的逻辑应是怎么对应到具体的代码中的?
首先,插件的入口类必须扩展Reader
或Writer
抽象类,并且实现分别实现Job
和Task
两个内部抽象类,Job
和Task
的实现必须是 内部类 的形式,原因见 加载原理 一节。以Reader为例:
public class SomeReader extends Reader {
public static class Job extends Reader.Job {
@Override
public void init() {
}
@Override
public void prepare() {
}
@Override
public List<Configuration> split(int adviceNumber) {
return null;
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
@Override
public void init() {
}
@Override
public void prepare() {
}
@Override
public void startRead(RecordSender recordSender) {
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
}
Job
接口功能如下:
init
: Job对象初始化工作,测试可以通过super.getPluginJobConf()
获取与本插件相关的配置。读插件获得配置中reader
部分,写插件获得writer
部分。prepare
: 全局准备工作,比如odpswriter清空目标表。split
: 拆分Task
。参数adviceNumber
框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task
的配置列表。post
: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。destroy
: Job对象自身的销毁工作。
Task
接口功能如下:
init
:Task对象的初始化。此时可以通过super.getPluginJobConf()
获取与本Task
相关的配置。这里的配置是Job
的split
方法返回的配置列表中的其中一个。prepare
:局部的准备工作。startRead
: 从数据源读数据,写入到RecordSender
中。RecordSender
会把数据写入连接Reader和Writer的缓存队列。startWrite
:从RecordReceiver
中读取数据,写入目标数据源。RecordReceiver
中的数据来自Reader和Writer之间的缓存队列。post
: 局部的后置工作。destroy
: Task象自身的销毁工作。
需要注意的是:
Job
和Task
之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。prepare
和post
在Job
和Task
中都存在,插件需要根据实际情况确定在什么地方执行操作。
框架按照如下的顺序执行Job
和Task
的接口:
上图中,黄色表示Job
部分的执行阶段,蓝色表示Task
部分的执行阶段,绿色表示框架执行阶段。
相关类关系如下:
配置文件
DataX
使用json
作为配置文件的格式。一个典型的DataX
任务配置如下:
{
"job": {
"content": [
{
"reader": {
"name": "odpsreader",
"parameter": {
"accessKey": "",
"accessId": "",
"column": [""],
"isCompress": "",
"odpsServer": "",
"partition": [
""
],
"project": "",
"table": "",
"tunnelServer": ""
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"username": "",
"password": "",
"column": ["*"],
"connection": [
{
"jdbcUrl": "",
"table": [
""
]
}
]
}
}
}
]
}
}
DataX
框架有core.json
配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖core.json
中的默认值。
配置中job.content.reader.parameter
的value部分会传给Reader.Job
;job.content.writer.parameter
的value部分会传给Writer.Job
,Reader.Job
和Writer.Job
可以通过super.getPluginJobConf()
来获取。
DataX
框架支持对特定的配置项进行RSA加密,例子中以*
开头的项目便是加密后的值。 配置项加密解密过程对插件是透明,插件仍然以不带*
的key来查询配置和操作配置项 。
如何设计配置参数
配置文件的设计是插件开发的第一步!
任务配置中reader
和writer
下parameter
部分是插件的配置参数,插件的配置参数应当遵循以下原则:
- 驼峰命名:所有配置项采用驼峰命名法,首字母小写,单词首字母大写。
- 正交原则:配置项必须正交,功能没有重复,没有潜规则。
富类型:合理使用json的类型,减少无谓的处理逻辑,减少出错的可能。
- 使用正确的数据类型。比如,bool类型的值使用
true
/false
,而非"yes"
/"true"
/0
等。 - 合理使用集合类型,比如,用数组替代有分隔符的字符串。
- 使用正确的数据类型。比如,bool类型的值使用
- 类似通用:遵守同一类型的插件的习惯,比如关系型数据库的
connection
参数都是如下结构:
{
"connection": [
{
"table": [
"table_1",
"table_2"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database_1",
"jdbc:mysql://127.0.0.2:3306/database_1_slave"
]
},
{
"table": [
"table_3",
"table_4"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.3:3306/database_2",
"jdbc:mysql://127.0.0.4:3306/database_2_slave"
]
}
]
}
插件数据传输
跟一般的生产者-消费者
模式一样,Reader
插件和Writer
插件之间也是通过channel
来实现数据的传输的。channel
可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSender
往channel
写入数据,通过RecordReceiver
从channel
读取数据。
channel
中的一条数据为一个Record
的对象,Record
中可以放多个Column
对象,这可以简单理解为数据库中的记录和列。
Record
有如下方法:
public interface Record {
// 加入一个列,放在最后的位置
void addColumn(Column column);
// 在指定下标处放置一个列
void setColumn(int i, final Column column);
// 获取一个列
Column getColumn(int i);
// 转换为json String
String toString();
// 获取总列数
int getColumnNumber();
// 计算整条记录在内存中占用的字节数
int getByteSize();
}
因为Record
是一个接口,Reader
插件首先调用RecordSender.createRecord()
创建一个Record
实例,然后把Column
一个个添加到Record
中。
Writer
插件调用RecordReceiver.getFromReader()
方法获取Record
,然后把Column
遍历出来,写入目标存储中。当Reader
尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()
方法会阻塞直到有数据。如果传输已经结束,会返回null
,Writer
插件可以据此判断是否结束startWrite
方法。
Column
的构造和操作,我们在《类型转换》一节介绍。
类型转换
为了规范源端和目的端类型转换操作,保证数据不失真,DataX支持六种内部数据类型:
Long
:定点数(Int、Short、Long、BigInteger等)。Double
:浮点数(Float、Double、BigDecimal(无限精度)等)。String
:字符串类型,底层不限长,使用通用字符集(Unicode)。Date
:日期类型。Bool
:布尔值。Bytes
:二进制,可以存放诸如MP3等非结构化数据。
对应地,有DateColumn
、LongColumn
、DoubleColumn
、BytesColumn
、StringColumn
和BoolColumn
六种Column
的实现。
Column
除了提供数据相关的方法外,还提供一系列以as
开头的数据类型转换转换方法。
DataX的内部类型在实现上会选用不同的java类型:
内部类型 | 实现类型 | 备注 |
---|---|---|
Date | java.util.Date | |
Long | java.math.BigInteger | 使用无限精度的大整数,保证不失真 |
Double | java.lang.String | 用String表示,保证不失真 |
Bytes | byte[] | |
String | java.lang.String | |
Bool | java.lang.Boolean |
类型之间相互转换的关系如下:
from\to | Date | Long | Double | Bytes | String | Bool |
---|---|---|---|---|---|---|
Date | - | 使用毫秒时间戳 | 不支持 | 不支持 | 使用系统配置的date/time/datetime格式转换 | 不支持 |
Long | 作为毫秒时间戳构造Date | - | BigInteger转为BigDecimal,然后BigDecimal.doubleValue() | 不支持 | BigInteger.toString() | 0为false,否则true |
Double | 不支持 | 内部String构造BigDecimal,然后BigDecimal.longValue() | - | 不支持 | 直接返回内部String | |
Bytes | 不支持 | 不支持 | 不支持 | - | 按照common.column.encoding 配置的编码转换为String,默认utf-8 |
不支持 |
String | 按照配置的date/time/datetime/extra格式解析 | 用String构造BigDecimal,然后取longValue() | 用String构造BigDecimal,然后取doubleValue(),会正确处理NaN /Infinity /-Infinity |
按照common.column.encoding 配置的编码转换为byte[],默认utf-8 |
- | "true"为true , "false"为false ,大小写不敏感。其他字符串不支持 |
Bool | 不支持 | true 为1L ,否则0L |
true 为1.0 ,否则0.0 |
不支持 | - |
加载原理
- 框架扫描
plugin/reader
和plugin/writer
目录,加载每个插件的plugin.json
文件。 - 以
plugin.json
文件中name
为key,索引所有的插件配置。如果发现重名的插件,框架会异常退出。 - 用户在插件中在
reader
/writer
配置的name
字段指定插件名字。框架根据插件的类型(reader
/writer
)和插件名称去插件的路径下扫描所有的jar,加入classpath
。 - 根据插件配置中定义的入口类,框架通过反射实例化对应的
Job
和Task
对象。