DataX插件开发

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 面向DataX插件开发人员,阐述开发一个DataX插件所经过的历程,消除开发者的困惑,让插件开发变得简单。

一、插件开发

模块命名规范

Reader插件名称格式: Xreader-Y

Writer插件名称格式: Xwriter-Y

其中:

X:数据源类型英⽂文名称小写   oraclereader、mysqlwriter
Y:厂商(版本)    mysqlreader-5.7    hivewriter-cdh5.12.0

QuickStart

使用私服上的项目骨架进行开发

  • DgroupId:新项⽬目的groupId 【保持与示例一致即可】
  • Dpackage:新项⽬目的package 【保持与示例一致即可】
  • Dversion:新项⽬目的version 【保持与示例一致即可】
  • DarchetypeGroupId:模板⼯工程的GroupId 【保持与示例一致即可】
  • DarchetypeVersion:模板⼯工程的Version 【保持与示例一致即可】
  • DarchetypeArtifactId:模板⼯工程的ArtifactId
  • DartifactId:新项⽬目的artifactId 建议命名方式为插件名称

jdbc类骨架

writer

mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxwriterplugin-archetype -DartifactId=

reader

mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxreaderplugin-archetype -DartifactId=

Hadoop类骨架

需要指定厂商和版本,添加参数-Dvendor,例如:
cdh5.12.0    -Dvendor=cdh5.12.0
cdh6.1.0      -Dvendor=cdh6.1.0
apache        -Dvendor=apache

  • writer
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxhadoopwriterplugin-archetype -DartifactId= -Dvendor=
  • reader
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxhadoopreaderplugin-archetype -DartifactId= -Dvendor=

其他类骨架

其他的一些数据源,例如es,需要使用自身的api,通用实现无法适用,则使用common骨架

主要的区别在于mvn的依赖,只依赖了datax-common

  • writer
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxcommonwriterplugin-archetype -DartifactId=
  • reader
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxcommonreaderplugin-archetype -DartifactId=

其中骨架生成的目录结构如下

.
├── doc
│   └── README.md             插件文档
├── pom.xml
├── src
│   └── main
│       ├── assembly
│       │   └── package.xml   打包配置
│       ├── java
│       └── resources
│           └── plugin.json   插件描述

0.mvn依赖

  • rdbms
<dependency>
  <groupId>com.alibaba.datax</groupId>
  <artifactId>plugin-rdbms-util</artifactId>
  <version>0.0.1-SNAPSHOT</version>
</dependency>
 <!--这里的connector依赖要改成相应数据源的-->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.34</version>
</dependency>
  • hadoop
<properties>
         <!--替换成对应集群厂商的hadoop和hive的pom版本-->
    <hadoop.version>2.6.0-cdh5.12.0</hadoop.version>
    <hive.version>1.1.0-cdh5.12.0</hive.version>
</properties>

1.实现接口

插件的入口类必须扩展ReaderWriter抽象类,并且实现分别实现JobTask两个内部抽象类,JobTask的实现必须是 内部类 的形式。

骨架生成项目是一个mysql插件的实现

我们已经在CommonRdbmsReader和CommonRdbmsWriter做了通用实现
可以看到,MysqlReader通过CommonRdbmsReader只需要很少的代码就可以适配了
rdbms类型的数据源适配过程和mysql基本一致
大部分情况下,只要把DBType中的typeName和driverClassName改一下就好了

public class MysqlReader extends Reader {
    /**
     * 数据源类型
     */
    private static final DBType DB_TYPE;
    static {
          /*
          typeName:数据源类型名称(全小写)
          driverClassName:数据源jdbc驱动类
         */
        DB_TYPE = new DBType("mysql", "com.mysql.jdbc.Driver");
    }
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;
        /**
         * Job对象初始化工作,测试可以通过super.getPluginJobConf()获取与本插件相关的配置。
         * 读插件获得配置中reader部分,写插件获得writer部分。
         */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
            if (userConfigedFetchSize != null) {
                LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
            }
            this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DB_TYPE);
            this.commonRdbmsReaderJob.init(this.originalConfig);
        }
        /**
         * 全局准备工作,比如odpswriter清空目标表。
         */
        @Override
        public void prepare() {
            super.prepare();
        }
        /**
         * 校验
         */
        @Override
        public void preCheck() {
            this.commonRdbmsReaderJob.preCheck(this.originalConfig, DB_TYPE);
        }
        /**
         * 拆分Task。
         * 参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。
         * 值返回的是`Task`的配置列表。
         *
         * @param adviceNumber
         * @return
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
        }
        /**
         * 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
         */
        @Override
        public void post() {
            this.commonRdbmsReaderJob.post(this.originalConfig);
        }
        /**
         * Job对象自身的销毁工作。
         */
        @Override
        public void destroy() {
            this.commonRdbmsReaderJob.destroy(this.originalConfig);
        }
    }
    public static class Task extends Reader.Task {
        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;
        /**
         * Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。
         * 这里的配置是Job的split方法返回的配置列表中的其中一个。
         */
        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DB_TYPE, super.getTaskGroupId(), super.getTaskId());
            this.commonRdbmsReaderTask.init(this.readerSliceConfig);

        }
        /**
         * 局部的准备工作。
         */
        @Override
        public void prepare() {
            super.prepare();
        }
        /**
         * 从数据源读数据,写入到RecordSender中。
         * RecordSender会把数据写入连接Reader和Writer的缓存队列。
         *
         * @param recordSender
         */
        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
            this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }
        /**
         * 局部的后置工作。
         */
        @Override
        public void post() {
            this.commonRdbmsReaderTask.post(this.readerSliceConfig);
        }
        /**
         * Task对象自身的销毁工作。
         */
        @Override
        public void destroy() {
            this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
        }

    }
}

2.修改插件描述plugin.json

在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:

{
    "name": "oraclereader",
    "class": "com.alibaba.datax.plugin.writer.oraclereader.OracleReader",
    "description": "插件用途",
    "developer": "插件开发者",
         "engineVersion": "" 
}
  • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要
  • class: 入口类的全限定名称,框架通过反射穿件入口类的实例。十分重要
  • description: 描述信息。
  • developer: 开发人员。
  • engineVersion: 数据源版本,目前先不管

注意:插件的目录名字必须和plugin.json中定义的插件名称一致。

3.打包发布

  • 模块单独打包命令:
mvn clean package -DskipTests
编译好的内容存放在target/datax 下面
.
├── plugin
│   └── reader
│       └── common
│           └── mysqlreader
│               ├── libs
│               │   ├── mysqlreader-plugin-dependencies.jar
│               ├── mysqlreader-0.0.1-SNAPSHOT.jar
│               └── plugin.json
  • 为了运维方便部署区分厂商版本,整体打包形成压缩包目录格式根据厂商版本区分、通用组件存放在common 文件目录下
    -- plugin
        -- reader
            -- common(通用插件)
                -- mysqlreader
                -- sqlserverreader
                -- ...
            -- hdp (hdp厂商的插件)
                -- hivereader
                -- ...
            -- cdh5.14.2(cdh5.14.2 的插件)
                -- hivereader
                -- ...
            -- cdh6.1.0
                -- hivereader
                -- ...
    
        -- writer
            -- common(通用插件)
                -- mysqlwriter
                -- sqlserverwriter
                -- ...
            -- hdp (hdp厂商的插件)
                -- hivewriter
                -- ...
            -- cdh5.14.2(cdh5.14.2 的插件)
                -- hivewriter
                -- ...
            -- cdh6.1.0
                -- hivewriter
                -- ...

4.如何调试

0. python准备

如果没有python,需要先安装python2.7一定要2.7版本

1.copy插件

datax-plugin-test文件夹中已经准好了下面的目录结构

.
├── bin    可执行程序目录
├── conf   框架配置目录
├── job    任务配置文件,里面有stream.json和rdbms.json的两个配置文件样例
├── lib    框架依赖库目录
├── log    运行日志
└── plugin 插件目录

plugin目录分为`reader`和`writer`子目录,读写插件分别存放。插件目录规范如下:

${PLUGIN_HOME}/libs: 插件的依赖库。
${PLUGIN_HOME}/plugin-name.jar: 插件本身的jar。
${PLUGIN_HOME}/plugin.json: 插件描述文件。

可以在终端执行python datax.py ../job/stream.json
感受一下执行过程

把打包好的插件放到plugin对应目录下

在调试时推荐另一端使用mysql。比如要写一个reader的插件,那目的端使用mysqlwriter,要写一个writer的插件,源端使用mysqlreader

2.修改配置文件

修改job路径下rdbms.json,填写正确的jdbcUrl、column以及table信息

不要忘记修改使用的插件名reader.namewriter.name

配置详细信息请看第三章配置文件

image.png

3.执行debug脚本

cd到bin目录下,执行

python datax.py ../job/mysql2mysql_config.json -d

执行脚本后会打印出远程调试端口

image.png

4.idea启动调试

image.png

image.png

设置ip和端口,本地调试的话ip就用localhost就行了,端口设置成终端打印出来的

image.png

打上断点,点击启动调试

image.png

5.写文档

一定要记得把你的成果用文档记录下来哦!!!

可以参照doc/README.md

二、如何使用Configuration

为了简化对json的操作,DataX提供了简单的DSL配合Configuration类使用。

Configuration提供了常见的get, 带类型get带默认值getset等读写配置项的操作,以及clone, toJSON等方法。配置项读写操作都需要传入一个path做为参数,这个path就是DataX定义的DSL。语法有两条:

  1. 子map用.key表示,path的第一个点省略。
  2. 数组元素用[index]表示。

比如操作如下json:

{
  "a": {
    "b": {
      "c": 2
    },
    "f": [
      1,
      2,
      {
        "g": true,
        "h": false
      },
      4
    ]
  },
  "x": 4
}

比如调用configuration.get(path)方法,当path为如下值的时候得到的结果为:

  • x4
  • a.b.c2
  • a.b.c.dnull
  • a.b.f[0]1
  • a.b.f[2].gtrue

注意,因为插件看到的配置只是整个配置的一部分。使用Configuration对象时,需要注意当前的根路径是什么。

三、配置文件

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "name"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test"
                ],
                "table": [
                  "`test`"
                ]
              }
            ],
            "modifyUserName": "shulan_admin",
            "password": "root",
            "username": "root"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "cleanRule": 1,
            "column": [
              "id",
              "name"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                "table": [
                  "`test`"
                ]
              }
            ],
            "modifyUserName": "shulan_admin",
            "password": "root",
            "username": "root",
            "writeMode": "replace"
          }
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 0
      },
      "speed": {
        "channel": 5,
        "throttle": false
      }
    }
  }
}
  • jdbcUrl

数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:

  • username

数据库的用户名

  • password

数据库的密码

  • table

表名称,支持写入一个或者多个表;当配置为多张表时,必须确保所有表结构保持一致。

注意:table 和 jdbcUrl 必须包含在 connection 配置单元中
  • column

表需要写入/读取数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。

下面是一些非必填的选项

  • session

DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性

  • preSql

写入数据到目的表前,会先执行这里的标准语句。比如你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from 表名"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称

  • postSql

写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )

  • writeMode

控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句
所有选项:insert/replace/update。需要注意的是一些库可能不支持replace
默认值:insert

  • batchSize

一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
默认值:1024

四、脏数据处理

  • 什么是脏数据?

目前主要有三类脏数据:

  1. Reader读到不支持的类型、不合法的值。
  2. 不支持的类型转换,比如:Bytes转换为Date
  3. 写入目标端失败,比如:写mysql整型长度超长。
  • 如何处理脏数据

Reader.TaskWriter.Task中,通过AbstractTaskPlugin.getPluginCollector()可以拿到一个TaskPluginCollector,它提供了一系列collectDirtyRecord的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord方法,把被认为是脏数据的Record传入即可。

用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。

五、整体框架

逻辑执行模型

插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:

  • Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。
  • Task: Task是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。
  • TaskGroup:  描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup
  • JobContainer:  Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
  • TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。

简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现JobTask两部分逻辑

物理执行模型

框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式:

  • Standalone: 单进程运行,没有外部依赖。
  • Local: 单进程运行,统计信息、错误信息汇报到集中存储。
  • Distrubuted: 分布式多进程运行,依赖DataX Service服务。

当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。

JobContainerTaskGroupContainer运行在同一个进程内时,就是单机模式(StandaloneLocal);当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。

是不是很简单?

编程接口

那么,JobTask的逻辑应是怎么对应到具体的代码中的?

首先,插件的入口类必须扩展ReaderWriter抽象类,并且实现分别实现JobTask两个内部抽象类,JobTask的实现必须是 内部类 的形式,原因见 加载原理 一节。以Reader为例:

public class SomeReader extends Reader {
    public static class Job extends Reader.Job {

        @Override
        public void init() {
        }
      
      @Override
      public void prepare() {
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return null;
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }

    }

    public static class Task extends Reader.Task {

        @Override
        public void init() {
        }
      
      @Override
      public void prepare() {
        }

        @Override
        public void startRead(RecordSender recordSender) {
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }
    }
}

Job接口功能如下:

  • init: Job对象初始化工作,测试可以通过super.getPluginJobConf()获取与本插件相关的配置。读插件获得配置中reader部分,写插件获得writer部分。
  • prepare: 全局准备工作,比如odpswriter清空目标表。
  • split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。
  • post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
  • destroy: Job对象自身的销毁工作。

Task接口功能如下:

  • init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Jobsplit方法返回的配置列表中的其中一个。
  • prepare:局部的准备工作。
  • startRead: 从数据源读数据,写入到RecordSender中。RecordSender会把数据写入连接Reader和Writer的缓存队列。
  • startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列。
  • post: 局部的后置工作。
  • destroy: Task象自身的销毁工作。

需要注意的是:

  • JobTask之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
  • preparepostJobTask中都存在,插件需要根据实际情况确定在什么地方执行操作。

框架按照如下的顺序执行JobTask的接口:

image.png

上图中,黄色表示Job部分的执行阶段,蓝色表示Task部分的执行阶段,绿色表示框架执行阶段。

相关类关系如下:

image.png

配置文件

DataX使用json作为配置文件的格式。一个典型的DataX任务配置如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "odpsreader",
          "parameter": {
            "accessKey": "",
            "accessId": "",
            "column": [""],
            "isCompress": "",
            "odpsServer": "",
            "partition": [
              ""
            ],
            "project": "",
            "table": "",
            "tunnelServer": ""
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "username": "",
            "password": "",
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": "",
                "table": [
                  ""
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

DataX框架有core.json配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖core.json中的默认值。

配置中job.content.reader.parameter的value部分会传给Reader.Jobjob.content.writer.parameter的value部分会传给Writer.JobReader.JobWriter.Job可以通过super.getPluginJobConf()来获取。

DataX框架支持对特定的配置项进行RSA加密,例子中以*开头的项目便是加密后的值。 配置项加密解密过程对插件是透明,插件仍然以不带*的key来查询配置和操作配置项

如何设计配置参数

配置文件的设计是插件开发的第一步!

任务配置中readerwriterparameter部分是插件的配置参数,插件的配置参数应当遵循以下原则:

  • 驼峰命名:所有配置项采用驼峰命名法,首字母小写,单词首字母大写。
  • 正交原则:配置项必须正交,功能没有重复,没有潜规则。
  • 富类型:合理使用json的类型,减少无谓的处理逻辑,减少出错的可能。

    • 使用正确的数据类型。比如,bool类型的值使用true/false,而非"yes"/"true"/0等。
    • 合理使用集合类型,比如,用数组替代有分隔符的字符串。
  • 类似通用:遵守同一类型的插件的习惯,比如关系型数据库的connection参数都是如下结构:
{
  "connection": [
    {
      "table": [
        "table_1",
        "table_2"
      ],
      "jdbcUrl": [
        "jdbc:mysql://127.0.0.1:3306/database_1",
        "jdbc:mysql://127.0.0.2:3306/database_1_slave"
      ]
    },
    {
      "table": [
        "table_3",
        "table_4"
      ],
      "jdbcUrl": [
        "jdbc:mysql://127.0.0.3:3306/database_2",
        "jdbc:mysql://127.0.0.4:3306/database_2_slave"
      ]
    }
  ]
}

插件数据传输

跟一般的生产者-消费者模式一样,Reader插件和Writer插件之间也是通过channel来实现数据的传输的。channel可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSenderchannel写入数据,通过RecordReceiverchannel读取数据。

channel中的一条数据为一个Record的对象,Record中可以放多个Column对象,这可以简单理解为数据库中的记录和列。

Record有如下方法:

public interface Record {
    // 加入一个列,放在最后的位置
    void addColumn(Column column);
    // 在指定下标处放置一个列
    void setColumn(int i, final Column column);
    // 获取一个列
    Column getColumn(int i);
    // 转换为json String
    String toString();
    // 获取总列数
    int getColumnNumber();
    // 计算整条记录在内存中占用的字节数
    int getByteSize();
}

因为Record是一个接口,Reader插件首先调用RecordSender.createRecord()创建一个Record实例,然后把Column一个个添加到Record中。

Writer插件调用RecordReceiver.getFromReader()方法获取Record,然后把Column遍历出来,写入目标存储中。当Reader尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()方法会阻塞直到有数据。如果传输已经结束,会返回nullWriter插件可以据此判断是否结束startWrite方法。

Column的构造和操作,我们在《类型转换》一节介绍。

类型转换

为了规范源端和目的端类型转换操作,保证数据不失真,DataX支持六种内部数据类型:

  • Long:定点数(Int、Short、Long、BigInteger等)。
  • Double:浮点数(Float、Double、BigDecimal(无限精度)等)。
  • String:字符串类型,底层不限长,使用通用字符集(Unicode)。
  • Date:日期类型。
  • Bool:布尔值。
  • Bytes:二进制,可以存放诸如MP3等非结构化数据。

对应地,有DateColumnLongColumnDoubleColumnBytesColumnStringColumnBoolColumn六种Column的实现。

Column除了提供数据相关的方法外,还提供一系列以as开头的数据类型转换转换方法。

image.png

DataX的内部类型在实现上会选用不同的java类型:

内部类型 实现类型 备注
Date java.util.Date
Long java.math.BigInteger 使用无限精度的大整数,保证不失真
Double java.lang.String 用String表示,保证不失真
Bytes byte[]
String java.lang.String
Bool java.lang.Boolean

类型之间相互转换的关系如下:

from\to Date Long Double Bytes String Bool
Date - 使用毫秒时间戳 不支持 不支持 使用系统配置的date/time/datetime格式转换 不支持
Long 作为毫秒时间戳构造Date - BigInteger转为BigDecimal,然后BigDecimal.doubleValue() 不支持 BigInteger.toString() 0为false,否则true
Double 不支持 内部String构造BigDecimal,然后BigDecimal.longValue() - 不支持 直接返回内部String
Bytes 不支持 不支持 不支持 - 按照common.column.encoding配置的编码转换为String,默认utf-8 不支持
String 按照配置的date/time/datetime/extra格式解析 用String构造BigDecimal,然后取longValue() 用String构造BigDecimal,然后取doubleValue(),会正确处理NaN/Infinity/-Infinity 按照common.column.encoding配置的编码转换为byte[],默认utf-8 - "true"为true, "false"为false,大小写不敏感。其他字符串不支持
Bool 不支持 true1L,否则0L true1.0,否则0.0 不支持 -

加载原理

  1. 框架扫描plugin/readerplugin/writer目录,加载每个插件的plugin.json文件。
  2. plugin.json文件中name为key,索引所有的插件配置。如果发现重名的插件,框架会异常退出。
  3. 用户在插件中在reader/writer配置的name字段指定插件名字。框架根据插件的类型(reader/writer)和插件名称去插件的路径下扫描所有的jar,加入classpath
  4. 根据插件配置中定义的入口类,框架通过反射实例化对应的JobTask对象。
相关文章
|
DataX 数据格式 Java
DataX插件编写指南
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github。
13409 1
|
5月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
76 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
7月前
|
消息中间件 SQL 分布式计算
DataX插件开发-KafkaWriter
DataX插件开发-KafkaWriter
216 0
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
611 0
|
Oracle Java 关系型数据库
聊聊 datax 的 OceanBase 数据同步插件 ||批处理参数 rewriteBatchedStatements=true&useCursorFetch=true
聊聊 datax 的 OceanBase 数据同步插件 分析下批处理参数 rewriteBatchedStatements=true&useCursorFetch=true 对大规模数据读写的性能影响
聊聊 datax 的 OceanBase 数据同步插件 ||批处理参数 rewriteBatchedStatements=true&useCursorFetch=true
|
4月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成并发数不支持批量修改,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
数据采集 DataWorks 数据管理
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第10天】随着大数据技术的发展,企业对数据处理的需求日益增长。阿里云推出的DataWorks是一款强大的数据集成和管理平台,提供从数据采集、清洗、加工到应用的一站式解决方案。本文通过电商平台案例,详细介绍了DataWorks的核心功能和优势,展示了如何高效处理大规模数据,帮助企业挖掘数据价值。
127 1
|
2月前
|
数据采集 SQL DataWorks
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第5天】本文通过一家电商平台的案例,详细介绍了阿里云DataWorks在数据处理全流程中的应用。从多源数据采集、清洗加工到分析可视化,DataWorks提供了强大的一站式解决方案,显著提升了数据分析效率和质量。通过具体SQL示例,展示了如何构建高效的数据处理流程,突显了DataWorks相较于传统工具如Excel的优势,为企业决策提供了有力支持。
112 3
|
3月前
|
存储 分布式计算 DataWorks
dataworks数据集成
dataworks数据集成
137 1

热门文章

最新文章