阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!下

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!下

3.安装 MySQL 数据库

分别在两台主机上安装:

[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel   
[root@MySQL-1 ~]# systemctl start mariadb                       # 安装 MariaDB 数据库
[root@MySQL-1 ~]# mysql_secure_installation                       # 初始化 
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB
      SERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!
Enter current password for root (enter for none):       # 直接回车
OK, successfully used password, moving on...
Set root password? [Y/n] y                            # 配置 root 密码
New password: 
Re-enter new password: 
Password updated successfully!
Reloading privilege tables..
 ... Success!
Remove anonymous users? [Y/n] y                       # 移除匿名用户
 ... skipping.
Disallow root login remotely? [Y/n] n                 # 允许 root 远程登录
 ... skipping.
Remove test database and access to it? [Y/n] y          # 移除测试数据库
 ... skipping.
Reload privilege tables now? [Y/n] y                    # 重新加载表
 ... Success!

1)准备同步数据(要同步的两台主机都要有这个表)

MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)
MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)

因为是使用 DataX 程序进行同步的,所以需要在双方的数据库上开放权限:

grant all privileges on *.* to root@'%' identified by '123123';
flush privileges;

2)创建存储过程:

DELIMITER $$
CREATE PROCEDURE test()
BEGIN
declare A int default 1;
while (A < 3000000)do
insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com"));
set A = A + 1;
END while;
END $$
DELIMITER ;

3)调用存储过程(在数据源配置,验证同步使用):

call test();

4.通过 DataX 实 MySQL 数据同步

1)生成 MySQL 到 MySQL 同步的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",              # 读取端
                    "parameter": {
                        "column": [],                 # 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [],            # 连接信息
                                "table": []             # 连接表
                            }
                        ], 
                        "password": "",               # 连接用户
                        "username": "",               # 连接密码
                        "where": ""                 # 描述筛选条件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter",              # 写入端
                    "parameter": {
                        "column": [],                 # 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "",            # 连接信息
                                "table": []             # 连接表
                            }
                        ], 
                        "password": "",               # 连接密码
                        "preSql": [],                 # 同步前. 要做的事
                        "session": [], 
                        "username": "",               # 连接用户 
                        "writeMode": ""               # 操作类型
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""                   # 指定并发数
            }
        }
    }
}

2)编写 json 文件:

[root@MySQL-1 ~]# vim install.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

3)验证

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json

输出:

2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 82.173s |  All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-15 16:44:32
任务结束时刻                    : 2021-12-15 16:45:15
任务总计耗时                    :                 42s
任务平均流量                    :            2.57MB/s
记录写入速度                    :          74999rec/s
读出记录总数                    :             2999999
读写失败总数                    :                   0

你们可以在目的数据库进行查看,是否同步完成。

image.png

  • 上面的方式相当于是完全同步,但是当数据量较大时,同步的时候被中断,是件很痛苦的事情;
  • 所以在有些情况下,增量同步还是蛮重要的。

5.使用 DataX 进行增量同步

使用 DataX 进行全量同步和增量同步的唯一区别就是:增量同步需要使用where进行条件筛选。 (即,同步筛选后的 SQL)


1)编写 json 文件:

[root@MySQL-1 ~]# vim where.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
  • 需要注意的部分就是:where(条件筛选) 和 preSql(同步前,要做的事) 参数。

2)验证:

[root@MySQL-1 ~]# python /usr/local/data/bin/data.py where.json

输出:

2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-16 17:34:06
任务结束时刻                    : 2021-12-16 17:34:38
任务总计耗时                    :                 32s
任务平均流量                    :            1.61KB/s
记录写入速度                    :             62rec/s
读出记录总数                    :                1888
读写失败总数                    :                   0

目标数据库上查看:

3)基于上面数据,再次进行增量同步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"  
同时需要将我上面的 preSql 删除(因为我上面做的操作时 truncate 表)




相关文章
|
5月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
1004 4
|
6月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
203 0
|
6月前
|
运维 监控 Unix
运维必看,Linux 远程数据同步工具详解。
运维必看,Linux 远程数据同步工具详解。
|
6月前
|
关系型数据库 MySQL 大数据
DataX:数据同步的超音速英雄!阿里开源工具带你飞越数据传输的银河系,告别等待和故障的恐惧!快来见证这一数据工程的奇迹!
【8月更文挑战第13天】DataX是由阿里巴巴开源的一款专为大规模数据同步设计的工具,在数据工程领域展现强大竞争力。它采用插件化架构,支持多种数据源间的高效迁移。相较于Apache Sqoop和Flume,DataX通过并发写入和流处理实现了高性能同步,并简化了配置流程。DataX还支持故障恢复,能够在同步中断后继续执行,节省时间和资源。这些特性使其成为构建高效可靠数据同步方案的理想选择。
474 2
|
6月前
|
Java 关系型数据库 DataX
DATAX数据同步
DATAX数据同步
755 0
|
1月前
|
DataWorks 关系型数据库 Serverless
DataWorks数据集成同步至Hologres能力介绍
本次分享的主题是DataWorks数据集成同步至Hologres能力,由计算平台的产品经理喆别(王喆)分享。介绍DataWorks将数据集成并同步到Hologres的能力。DataWorks数据集成是一款低成本、高效率、全场景覆盖的产品。当我们面向数据库级别,向Hologres进行同步时,能够实现简单且快速的同步设置。目前仅需配置一个任务,就能迅速地将一个数据库实例内的所有库表一并传输到Hologres中。
58 12
|
6月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成并发数不支持批量修改,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
数据采集 DataWorks 数据管理
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第10天】随着大数据技术的发展,企业对数据处理的需求日益增长。阿里云推出的DataWorks是一款强大的数据集成和管理平台,提供从数据采集、清洗、加工到应用的一站式解决方案。本文通过电商平台案例,详细介绍了DataWorks的核心功能和优势,展示了如何高效处理大规模数据,帮助企业挖掘数据价值。
187 1
|
4月前
|
数据采集 SQL DataWorks
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第5天】本文通过一家电商平台的案例,详细介绍了阿里云DataWorks在数据处理全流程中的应用。从多源数据采集、清洗加工到分析可视化,DataWorks提供了强大的一站式解决方案,显著提升了数据分析效率和质量。通过具体SQL示例,展示了如何构建高效的数据处理流程,突显了DataWorks相较于传统工具如Excel的优势,为企业决策提供了有力支持。
165 3
|
5月前
|
存储 分布式计算 DataWorks
dataworks数据集成
dataworks数据集成
210 2