为什么说datax是目前最好的异构数据源数据交换工具

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 以前我做过一个项目,其中有个需求就是每天定时把sql server中的数据同步到Mysql中,当时写了一段Java的代码来实现,一套Java代码中需要写两个数据源的连接以及两套sql的代码,十分不方便。如果还要实现Oracle、Mysql、SqlServer的互相同步,那代码逻辑就更加复杂。而且通过代码的方式,同步600万条数据要花费2个多小时,性能效率十分低下。

听说微信搜索《Java鱼仔》会变更强哦!


本文收录于JavaStarter ,里面有我完整的Java系列文章,学习或面试都可以看看哦


(一)什么是Datax


以前我做过一个项目,其中有个需求就是每天定时把sql server中的数据同步到Mysql中,当时写了一段Java的代码来实现,一套Java代码中需要写两个数据源的连接以及两套sql的代码,十分不方便。如果还要实现Oracle、Mysql、SqlServer的互相同步,那代码逻辑就更加复杂。而且通过代码的方式,同步600万条数据要花费2个多小时,性能效率十分低下。


最近在工作中接触到了一个新的工具datax,才意识到数据同步原来还有这么简单的方式。


Datax是阿里巴巴开源的一个异构数据源离线同步工具,DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。


简单来讲,datax就是可以把各个数据库之间的数据来回传输同步,并且操作起来只需要配置一下json文件就可以了。


目前Datax开源在github上:github.com/alibaba/Dat…


(二)Datax架构


网络异常,图片无法展示
|


Datax的架构采用FrameWork+plugin构建,其中:


Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework


Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端


Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。


(三)Datax运行原理


网络异常,图片无法展示
|


Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理


Task:由Job切分出来,是Datax的最小单元,每隔Task负责一部分数据的同步工作


Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5. TaskGroup:负责启动Task


(四)DataX快速入门


datax的推荐系统为:


  • Linux


  • JDK(1.8以上,推荐1.8)


  • Python(推荐Python2.6.X)


  • Apache Maven 3.x (Compile DataX)


我这里就按照推荐系统进行操作。


首先我们将datax下载下来,datax的下载有两种方式,一种是直接下载压缩包,另外一种是下载源码自己手动编译,这里先展示下载压缩包的使用方式:


首先是下载datax的压缩包:datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.g…


下载下来后上传到linux服务器上,解压:


tar -zxvf datax.tar.gz

进入datax的bin目录,运行自检脚本

cd datax/bin/
python datax.py ../job/job.json 

运行结果如果是下面这样说明datax安装成功。


网络异常,图片无法展示
|


(五)datax控制台数据同步


datax的作用就是实现异构数据库之间的数据传输,并且应用起来还比较简单,只需要配置好对应的json模板,就可以对数据进行传输。


通过下面的命令,就可以拿到datax对应的json模板,比如我现在的reader是控制台数据,writer也是控制台数据:


python datax.py -r streamreader -w streamwriter

就拿到了对应的模板:

{
"job": {
"content": [
            {
"reader": {
"name": "streamreader", 
"parameter": {
"column": [], 
"sliceRecordCount": ""                    }
                }, 
"writer": {
"name": "streamwriter", 
"parameter": {
"encoding": "", 
"print": true                    }
                }
            }
        ], 
"setting": {
"speed": {
"channel": ""            }
        }
    }
}

我们简单配置一下看看效果,效果就是在控制台输出十遍hello,world,在job目录下新建一个文件叫stream2stream.json

{
"job": {
"content": [
            {
"reader": {
"name": "streamreader", 
"parameter": {
"column": [
                            {
"type":"string",
"value":"hello"                            },
                            {
"type":"string",
"value":"world"                            }
                        ], 
"sliceRecordCount": "10"                    }
                }, 
"writer": {
"name": "streamwriter", 
"parameter": {
"encoding": "UTF-8", 
"print": true                    }
                }
            }
        ], 
"setting": {
"speed": {
"channel": "1"            }
        }
    }
}

运行项目:

python datax.py ../job/stream2stream.json

查看效果


网络异常,图片无法展示
|


(六)datax mysql数据同步


因为本地只装了mysql,就直接用mysql演示数据同步,首先还是通过命令拿到基本配置模板:

python datax.py -r mysqlreader -w mysqlwriter

简单介绍一下模板:


column:表示reader或者writer中对应的列名


connection:填写连接信息


where:设置连接条件


具体的其他参数可以在官方文档中全部找到更详细说明

{
"job": {
"content": [
            {
"reader": {
"name": "mysqlreader", 
"parameter": {
"column": [],  #需要同步的列"connection": [ #连接信息                            {
"jdbcUrl": [], 
"table": []
                            }
                        ], 
"password": "",  #密码"username": "",  #用户名"where": ""#筛选条件                    }
                }, 
"writer": {
"name": "mysqlwriter", 
"parameter": {
"column": [],  #写入段的列名,与上面需要同步的值的位置保持一致"connection": [ #连接信息                            {
"jdbcUrl": "", 
"table": []
                            }
                        ], 
"password": "",  #密码"preSql": [],  #执行写入之前做的事情"session": [], #DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connectionsession属性"username": "",  #用户名"writeMode": ""#控制写入数据到目标表采用insertinto或者replaceinto或者ONDUPLICATEKEYUPDATE语句                    }
                }
            }
        ], 
"setting": {
"speed": {
"channel": ""            }
        }
    }
}

做个初始工作,在mysql中先建两张表,一张有数据,一张没有数据:

CREATETABLE `user`(`id` int(4)notnull auto_increment,`name` varchar(32)notnull,PRIMARY KEY(id))CREATETABLE `user2`(`id` int(4)notnull auto_increment,`name` varchar(32)notnull,PRIMARY KEY(id))INSERTINTO `user` VALUES(1,'javayz')INSERTINTO `user` VALUES(2,'java')

接下来配置mysql2mysql.json

{
"job": {
"content": [
            {
"reader": {
"name": "mysqlreader", 
"parameter": {
"column": [
"id",
"name"                        ], 
"connection": [
                            {
"jdbcUrl": ["jdbc:mysql://10.10.128.120:3306/test"], 
"table": ["user"]
                            }
                        ], 
"password": "123456", 
"username": "root"                    }
                }, 
"writer": {
"name": "mysqlwriter", 
"parameter": {
"column": [
"id",
"name"                        ], 
"connection": [
                            {
"jdbcUrl": "jdbc:mysql://10.10.128.120:3306/test", 
"table": ["user2"]
                            }
                        ], 
"password": "123456", 
"username": "root"                    }
                }
            }
        ], 
"setting": {
"speed": {
"channel": "1"            }
        }
    }
}

同样运行脚本:

python datax.py ../job/mysql2mysql.json

控制台输出成功之后,查看数据库,可以发现数据已经同步过去了。


(七)总结


上面展示了两种方式的数据同步,除此之外datax还支持大量的数据库,并且使用文档写的十分详细,大家有兴趣可以自己去尝试一下,十分有意思的工具。


网络异常,图片无法展示
|


我是鱼仔,我们下期再见!

相关文章
|
4月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
156 0
|
5月前
|
DataWorks API 数据库
DataWorks操作报错合集之在使用 OceanBase (OB) 作为数据源进行数据集成时遇到报错,该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
关系型数据库 MySQL 大数据
DataX:数据同步的超音速英雄!阿里开源工具带你飞越数据传输的银河系,告别等待和故障的恐惧!快来见证这一数据工程的奇迹!
【8月更文挑战第13天】DataX是由阿里巴巴开源的一款专为大规模数据同步设计的工具,在数据工程领域展现强大竞争力。它采用插件化架构,支持多种数据源间的高效迁移。相较于Apache Sqoop和Flume,DataX通过并发写入和流处理实现了高性能同步,并简化了配置流程。DataX还支持故障恢复,能够在同步中断后继续执行,节省时间和资源。这些特性使其成为构建高效可靠数据同步方案的理想选择。
381 2
|
6月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
59 0
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
|
5月前
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
分布式计算 DataWorks 数据挖掘
DataWorks操作报错合集之上传数据时报错com.alibaba.datax.common.exception.DataXException: Code:[UnstructuredStorageReader-11],该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
数据采集 分布式计算 大数据
MaxCompute产品使用合集之数据集成中进行数据抽取时,是否可以定义使用和源数据库一样的字符集进行抽取
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
数据采集 SQL DataWorks
【颠覆想象的数据巨匠】DataWorks——远超Excel的全能数据集成与管理平台:一场电商数据蜕变之旅的大揭秘!
【8月更文挑战第7天】随着大数据技术的发展,企业对数据处理的需求日益增长。DataWorks作为阿里云提供的数据集成与管理平台,为企业提供从数据采集、清洗、加工到应用的一站式解决方案。不同于桌面级工具如Excel,DataWorks具备强大的数据处理能力和丰富的功能集,支持大规模数据处理任务。本文通过电商平台案例,展示了如何使用DataWorks构建数据处理流程,包括多源数据接入、SQL任务实现数据采集、数据清洗加工以提高质量,以及利用分析工具挖掘数据价值的过程。这不仅凸显了DataWorks在大数据处理中的核心功能与优势,还展示了其相较于传统工具的高扩展性和灵活性。
152 0
|
6月前
|
分布式计算 DataWorks 调度
DataWorks操作报错合集之DataX访问MaxCompute(原ODPS)突然无法读取到字段数据,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。

热门文章

最新文章