阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!

3.安装 MySQL 数据库

分别在两台主机上安装:

[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel
[root@MySQL-1 ~]# systemctl start mariadb                        # 安装 MariaDB 数据库
[root@MySQL-1 ~]# mysql_secure_install                                  # 初始化
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB
      SERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!
Enter current password for root (enter for none):                     # 直接回车
OK, successfully used password, moving on...
Set root password? [Y/n] y                                        # 配置 root 密码
New password: 123123
Re-enter new password: 123123
Password updated successfully!
Reloading privilege tables..
 ... Success!
Remove anonymous users? [Y/n] y                                   # 移除匿名用户
 ... skipping.
Disallow root login remotely? [Y/n] n                           # 允许 root 远程登录
 ... skipping.
Remove test database and access to it? [Y/n] y                         # 移除测试数据库
 ... skipping.
Reload privilege tables now? [Y/n] y                                 # 重新加载表
 ... Success!



)准备同步数据(要同步的两台主机都要有这个表)


MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)
MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)

image.png


因为是使用 DataX 程序进行同步的,所以需要在双方的数据库上开放权限:


grant all privileges on *.* to root@'%' identified by '123123';

flush privileges;


2)创建存储过程:


DELIMITER $$
CREATE PROCEDURE test()
BEGIN
declare A int default 1;
while (A < 3000000)do
insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com"));
set A = A + 1;
END while;
END $$
DELIMITER ;



3)调用存储过程(在数据源配置,验证同步使用):


call test();

4.通过 DataX 实 MySQL 数据同步

1)生成 MySQL 到 MySQL 同步的模板:

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",                # 读取端
                    "parameter": {
                        "column": [],                     # 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [],                 # 连接信息
                                "table": []                # 连接表
                            }
                        ],
                        "password": "",                 # 连接用户
                        "username": "",                 # 连接密码
                        "where": ""                    # 描述筛选条件
                    }
                },
                "writer": {
                    "name": "mysqlwriter",                # 写入端
                    "parameter": {
                        "column": [],                         # 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "",                     # 连接信息
                                "table": []                # 连接表
                            }
                        ],
                        "password": "",                 # 连接密码
                        "preSql": [],                     # 同步前. 要做的事
                        "session": [],
                        "username": "",                    # 连接用户
                        "writeMode": ""                    # 操作类型
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""                        # 指定并发数
            }
        }
    }
}

2)编写 json 文件:

[root@MySQL-1 ~]# vim install.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ],
                                "table": ["t_member"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ],
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

3)验证

[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json

输出:

2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 82.173s |  All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-12-15 16:44:32
任务结束时刻                    : 2021-12-15 16:45:15
任务总计耗时                    :                 42s
任务平均流量                    :            2.57MB/s
记录写入速度                    :          74999rec/s
读出记录总数                    :             2999999
读写失败总数                    :                   0


你们可以在目的数据库进行查看,是否同步完成。


image.png


上面的方式相当于是完全同步,但是当数据量较大时,同步的时候被中断,是件很痛苦的事情;

所以在有些情况下,增量同步还是蛮重要的。

5.使用 DataX 进行增量同步

使用 DataX 进行全量同步和增量同步的唯一区别就是:增量同步需要使用 where 进行条件筛选。


即,同步筛选后的 SQL。


1)编写 json 文件:

[root@MySQL-1 ~]# vim where.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ],
                                "table": ["t_member"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ],
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}




  • 需要注意的部分就是:where(条件筛选) 和 preSql(同步前,要做的事) 参数。

2)验证:

[root@MySQL-1 ~]# python /usr/local/data/bin/data.py where.json


输出:


2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-12-16 17:34:06
任务结束时刻                    : 2021-12-16 17:34:38
任务总计耗时                    :                 32s
任务平均流量                    :            1.61KB/s
记录写入速度                    :             62rec/s
读出记录总数                    :                1888
读写失败总数                    :                   0


目标数据库上查看:


image.png


3)基于上面数据,再次进行增量同步:


主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"(通过条件筛选来进行增量同步)

同时需要将我上面的 preSql 删除 (因为我上面做的操作是 truncate 表)


image.png

相关文章
|
1月前
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
25天前
|
SQL Oracle 关系型数据库
多环境数据同步(Navicat工具)
多环境数据同步(Navicat工具)
11 0
|
1月前
|
SQL 关系型数据库 MySQL
DataX - 全量数据同步工具(1)
DataX - 全量数据同步工具
|
1天前
|
SQL 关系型数据库 MySQL
探索MySQL的高效数据同步:并行复制原理
MySQL主从延迟由网络、从服务器性能和复制线程不足引起。解决方法包括优化网络、提升从服务器性能和采用并行复制。MySQL 5.6引入基于库的并行复制,5.7采用组提交(MTS),8.0则有基于WRITESET的并行复制,通过检测事务冲突允许更多并行,减少延迟。启用并行复制涉及调整如`binlog_transaction_dependency_tracking`和`transaction_write_set_extraction`等参数。
|
5天前
|
SQL 关系型数据库 MySQL
《揭秘MySQL主从复制:数据同步的幕后故事》
MySQL主从复制依赖binlog,从库通过I/O线程拉取主库的binlog并存入relay log,SQL线程再将relay log中的事件应用到数据。复制过程是异步的,确保从库能独立管理同步。三种复制方式:默认异步(可能丢失数据)、全同步(低性能)和半同步(折中)。半同步要求至少一个从库确认收到binlog事件。主从延迟指的是从库相对于主库的数据滞后。
|
2天前
|
分布式计算 关系型数据库 MySQL
MaxCompute产品使用合集之用flink mysql的数据同步到mc的Transaction Table2.0,时间会比mysql的时间多8小时,是什么导致的
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
310 0
|
2月前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之如何实现MySQL单表数据同步到多个表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之使用 MySQL CDC 进行数据同步时,设置 server_id 参数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
DataWorks Shell 对象存储
DataWorks产品使用合集之在 DataWorks 中,有一个 MySQL 数据表,数据量非常大且数据会不断更新将这些数据同步到 DataWorks如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
49 3