springboot整合datax实现数据同步

本文涉及的产品
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: springboot整合datax实现数据同步

1.源码下载

git clone git@github.com:alibaba/DataX.git

需要下载核心的包,core与common,在maven下进行安装到本地

mvn install:install-file -DgroupId=com.datax -DartifactId=datax-core -Dversion=1.0.0 -Dpackaging=jar -Dfile=datax-core-0.0.1-SNAPSHOT.jar

mvn install:install-file -DgroupId=com.datax -DartifactId=datax-common -Dversion=1.0.0 -Dpackaging=jar -Dfile=datax-common-0.0.1-SNAPSHOT.jar

2.mysql创建源表以及目标表

-- testdata.source_table definition

CREATE TABLE `source_table` (

 `id` int NOT NULL AUTO_INCREMENT,

 `name` varchar(100) DEFAULT NULL,

 `address` varchar(100) DEFAULT NULL,

 PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- testdata.target_table definition

CREATE TABLE `target_table` (

 `id` int NOT NULL DEFAULT '0',

 `name` varchar(100) DEFAULT NULL,

 `address` varchar(100) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.使用存储过程生成测试数据

///存储过程生成测试数据 100w  

create procedure geneData(in loop_times int)

begin

declare i int default 1;

while i <= loop_times do

 set @name = CONCAT('elite',i);

 set @address =CONCAT('xxx',i);

 INSERT INTO source_table(name,address)

 VALUES(@name,@address);

 set i=i+1;

end while;

end

3.创建springboot项目

1.引入datax的核心包

    <!--datax-->

       <dependency>

           <groupId>com.datax</groupId>

           <artifactId>datax-core</artifactId>

           <version>1.0.0</version>

       </dependency>

       <dependency>

           <groupId>com.datax</groupId>

           <artifactId>datax-common</artifactId>

           <version>1.0.0</version>

       </dependency>

2.job的配置

{

  "job": {

   "setting": {

     "speed": {

       "channel":2

     }

   },

   "content": [

     {

       "reader": {

         "name": "mysqlreader",

         "parameter": {

           "username": "username",

           "password": "password",

           "splitPk": "id",

           "column": ["id","name","address"],

           "connection": [

             {

               "jdbcUrl": ["jdbc:mysql://ip:3306/testdata?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],

               "table": ["source_table"]

             }

           ]

         }

       },

       "writer": {

         "name": "mysqlwriter",

         "parameter": {

           "username": "username",

           "password": "password",

           "column": ["id","name","address"],

           "connection": [

             {

               "table": [

                 "target_table"

               ],

               "jdbcUrl": "jdbc:mysql://ip:3306/testdata?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

             }

           ]

         }

       }

     }

   ]

 }

}

2.测试代码

  • 需要指定datax的home目录
  • 指定运行的参数

public class TestMain {

   public static String getCurrentClasspath(){

       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

       String currentClasspath = classLoader.getResource("").getPath();

       // 当前操作系统

       String osName = System.getProperty("os.name");

       if (osName.startsWith("Win")) {

           // 删除path中最前面的/

           currentClasspath = currentClasspath.substring(1, currentClasspath.length()-1);

       }

       return currentClasspath;

   }

   public static void main(String[] args) {

     

       String fileName = getCurrentClasspath()+ "/configuration/job.json";

       System.out.println(fileName);

       System.setProperty("datax.home","D:\\devproject\\devcode\\code\\datax\\target\\datax\\datax");

       String[] datxArgs2 = {  "-job", getCurrentClasspath()+ "/configuration/job.json","-mode", "standalone", "-jobid", "-1"};

       try {

           Engine.entry(datxArgs2);

       } catch (Throwable e) {

           e.printStackTrace();

       }

   }

}

4.测试

4.1 不加splitpk的情况下

16:41:03.086 [job-0] INFO com.alibaba.datax.core.job.JobContainer -  

任务启动时刻                    : 2023-03-26 16:40:42

任务结束时刻                    : 2023-03-26 16:41:03

任务总计耗时                    :                 20s

任务平均流量                    :            1.22MB/s

记录写入速度                    :          50000rec/s

读出记录总数                    :             1000000

读写失败总数                    :                   0

4.1 加splitpk的情况下

任务启动时刻                    : 2023-03-26 16:48:26

任务结束时刻                    : 2023-03-26 16:48:37

任务总计耗时                    :                 10s

任务平均流量                    :            2.45MB/s

记录写入速度                    :         100000rec/s

读出记录总数                    :             1000000

读写失败总数                    :                   0


相关文章
|
3月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
961 0
|
2月前
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
6天前
|
关系型数据库 MySQL 大数据
DataX:数据同步的超音速英雄!阿里开源工具带你飞越数据传输的银河系,告别等待和故障的恐惧!快来见证这一数据工程的奇迹!
【8月更文挑战第13天】DataX是由阿里巴巴开源的一款专为大规模数据同步设计的工具,在数据工程领域展现强大竞争力。它采用插件化架构,支持多种数据源间的高效迁移。相较于Apache Sqoop和Flume,DataX通过并发写入和流处理实现了高性能同步,并简化了配置流程。DataX还支持故障恢复,能够在同步中断后继续执行,节省时间和资源。这些特性使其成为构建高效可靠数据同步方案的理想选择。
33 2
|
22天前
|
监控 数据挖掘 大数据
阿里云开源利器:DataX3.0——高效稳定的离线数据同步解决方案
对于需要集成多个数据源进行大数据分析的场景,DataX3.0同样提供了有力的支持。企业可以使用DataX将多个数据源的数据集成到一个统一的数据存储系统中,以便进行后续的数据分析和挖掘工作。这种集成能力有助于提升数据分析的效率和准确性,为企业决策提供有力支持。
|
22天前
|
分布式计算 关系型数据库 MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。
|
3月前
|
存储 监控 关系型数据库
DataX 概述、部署、数据同步运用示例
DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。
|
2月前
|
SQL 关系型数据库 MySQL
DataX - 全量数据同步工具(1)
DataX - 全量数据同步工具
|
3月前
|
SQL JSON DataWorks
DataWorks产品使用合集之DataWorks 数据集成任务中,将数据同步到 Elasticsearch(ES)中,并指定 NESTED 字段中的 properties 类型如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
49 0
|
1月前
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
29 0
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的

热门文章

最新文章