Maxwell - 增量数据同步工具(1)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB PostgreSQL 版,企业版 4核16GB
推荐场景:
HTAP混合负载
简介: Maxwell - 增量数据同步工具

前言

       今天来学习一个新的大数据小工具 Maxwell ,它和 Sqoop 很像。Sqoop主要用于在 Hadoop (比如 HDFS、Hive、HBase 等)和关系型数据库之间进行数据的批量导入和导出,而 Maxwell 则主要用于监控数据库的变化(通过监控 binlog ),并将变化的数据以JSON格式发布到消息队列(一般是 Kafka)或 Redis 中。

       Sqoop 一般都是在特定时间(比如用 Azkaban 调度在每天0点进行作业),可以用于离线数仓的数据同步问题;但是对于实时数据分析,Sqoop 并不方便。所以这里就引出了 Maxwell 这个工具,它的特点包括轻量级、能够捕获完整的历史数据(通过bootstrap功能)以及支持断点还原(即在错误解决后可以从上次的位置继续读取数据)。然而,Maxwell只支持 json 格式,并且不能直接支持HA(高可用性)。

       这里说起 Sqoop 我又想到了 DataX ,它也是一款全量数据采集工具,和 Sqoop 很相似,但是还是有所区别:

  • Sqoop是基于Hadoop生态系统的,充分利用了MapReduce计算框架进行数据的导入导出。而DataX仅仅在运行DataX的单台机器上进行数据的抽取和加载。
  • Sqoop采用MapReduce框架,具有良好的并发性和容错性,可以在多个节点同时进行import或export操作。而DataX则是单进程的,没有并发性和容错性。
  • Sqoop主要支持在关系型数据库和Hadoop组件(如HDFS、Hive、HBase)之间进行数据迁移,但不支持在Hadoop组件之间或关系型数据库之间进行数据迁移。相比之下,DataX支持更广泛的数据迁移场景,包括关系型数据库、Hadoop组件以及其他数据源之间的数据迁移。
  • Sqoop在导入导出数据时,会生成一个MapReduce作业在Hadoop集群中运行,可以处理大规模的数据,但不具备统计和校验能力。而DataX在传输过程中可以进行数据过滤,并且可以统计传输数据的信息,对于业务场景复杂、表结构变更的情况更为适用。
  • Sqoop采用命令行的方式调用,容易与现有的调度监控方案相结合。而DataX采用XML配置文件的方式,开发运维上相对不太方便。

接下来就来学习 Maxwell ,内容不多,过两天再把 DataX 搞定。

1、MaxWell 简介

1.1 Maxwell概述

       Maxwell 是由美国 Zendesk 公司开源,用 Java 编写的 MySQL 变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi、RabbitMQ、Redis 或者其它平台的应用程序。

       官网地址:Maxwell's Daemon

1.2 Maxwell 输出数据格式

我们知道 Maxwell 是以 JSON 格式传输数据的,上面显示了不同数据库数据操作对应的 json 格式,这里是这些 json 字段的说明:

字段

解释

database

变更数据所属的数据库

table

表更数据所属的表

type

数据变更类型

ts

数据变更发生的时间

xid

事务id

commit

事务提交标志,可用于重新组装事务

data

对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据

old

对于update类型,表示修改之前的数据,只包含变更字段

2、Maxwell 原理

       Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。

2.1 MySQL二进制日志

2.1.1、什么是 binlog

       二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它记录了所有的 DDL 和 DML (除了查询语句)语句,以事件的形式记录,还包含所执行的消耗的时间,MySQL 的二进制日志是事务安全类型的。Maxwell的工作原理和主从复制密切相关。

       一般开启二进制日志会有 1% 的性能损耗。二进制日志有两个最重要的场景:

  • MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给其它 salves 来达到 master-salve 数据一致的目的。
  • 通过使用 musqlbinlog 工具来进行数据恢复。

       MySQL二进制日志的存储机制很像我们之前学的Kafka 文件存储机制,它也包含两类文件:二进制索引文件(存储二进制日志文件索引,后缀为 .index)和二进制日志文件(用于记录所有 DDL 和 DML 语句,后缀为 .00000*)。

2.1.2、binlog 的开启

  • 找到 MySQL 配置文件的位置
  • linux:/etc/my.cnf (可以通过 locate my.cnf) 查找位置
  • window:\my.ini
  • 修改配置:在 [mysqlId] 区块,设置添加 log-bin=mysql-bin (这个表示 binlog 的前缀是 mysql-bin,也可以换成别的,也就是说以后生成的日志文件就是 mysq-bin.000001 格式的文件,每次 mysql 重启或者达到单个文件大小的阈值时,生成一个新的日志文件、按顺序编号)

2.1.3、binlog 的分类设置

mysql binlog 的格式共有三种:STATEMENT,MINED,ROW。

在配置文件中可以选择配置:binllog_format = statment|mixed|row

  • statement:语句级,它会记录每一次执行写操作的雨具。相比较 row 模式节省空间,但是可能产生不一致,比如 update student enroll_time = now(); 这种 SQL 如果进行数据恢复那一定是有问题的。
  • 优点:节省空间
  • 缺点:有可能造成数据不一致
  • row:行级,binlog 记录每次操作后每行记录的变化,相当于它记录的是我们每次操作后表格的内容。
  • 优点:保持数据的绝对一致性,不管什么sql,它只记录执行后的结果
  • 缺点:占用磁盘较大
  • mixed:混合级别,statement 的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题。比如对于 SQL 中包含随机数或者时间的语句,它保存的是 SQL 执行的结果;对于 SQL 不包含随机数或者时间的语句,它保存的是 SQL 语句本身。
  • 优点:节省空间
  • 缺点:极个别情况下仍然存在不一致

综上,Maxwell 做监控分析,选择 row 模式比较合适(必须选择 row),因为我们的下游(比如 Kafka 是无法执行 sql 语句的)

2.1.4、Maxwell 和 Canal 对比

对比 Canal Maxwell
语言

java

java
数据格式 自由 json
采集模式 增量 全量/增量
数据落地 定制 支持 kafka 等多种平台
HA 支持 支持

Canal 的原理和 Maxwell 都是通过监控 binlog 实现的。

2.2 MySQL主从复制

       MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。

1)主从复制的应用场景如下:

(1)做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作。

(2)读写分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。

2)主从复制的工作原理如下:

(1)Master主库将数据变更记录,写到二进制日志(binary log)中

(2)Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

(3)Slave从库读取并执行中继日志中的事件,将改变的数据同步到自己的数据库。

2.3 Maxwell原理

       Maxwell 的原理很简单,就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。

3、Maxwell 部署

3.1、启用 MySQL binlog

MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启。

1)修改MySQL配置文件/etc/my.cnf

sudo vim /etc/my.cnf

2)增加如下配置

[mysqld]
 
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
#如果需要监控多个数据库 不要加逗号 直接新开一行
binlog-do-db=test

如果我们要监控除了个别数据库之外的其它所有数据库,我们可以使用 binlog-ignore-db 属性。

3)重启MySQL服务

sudo systemctl restart mysqld

4)查看是否修改完成

show variables like '%binlog%';

我们看到 binlog_format 的 Value 现在是 ROW ,说明 binlog 配置成功。

5)查看 binlog 文件

进入 /var/lib/mysql 目录,查看 MySQL 生成的 binlog 文件:

cd /var/lib/mysql
sudo ls -l | grep mysql-bin

注:MySQL 生成的 binlog 文件初始大小一定是 514 字节。

3.2、初始化 maxwell 元数据库

(1)创建元数据库 maxwell 用于存储 maxwell 的元数据

create database maxwell;

(2)设置 mysql 用户密码安全级别

set global validate_password_policy=0;
set global validate_password_length=4;

如果上面的命令报错,去 /etc/my.cnf 的 [mysqld] 下添加下面两行:

plugin-load-add=validate_password.so
validate-password=FORCE_PLUS_PERMANENT

然后重启 mysqld 服务

sudo systemctl restart mysqld

(3)分配一个用户账号可以操作该元数据库

GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';

(4)分配这个账号可以监控其他数据库的权限

GRANT SELECT ,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@'%';

(5)刷新 mysql 表权限

flush privileges;

Maxwell - 增量数据同步工具(2)https://developer.aliyun.com/article/1532370

相关文章
|
1月前
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
1月前
|
消息中间件 监控 关系型数据库
Maxwell - 增量数据同步工具(2)
Maxwell - 增量数据同步工具
|
18天前
|
SQL Oracle 关系型数据库
多环境数据同步(Navicat工具)
多环境数据同步(Navicat工具)
11 0
|
1月前
|
SQL 关系型数据库 MySQL
DataX - 全量数据同步工具(1)
DataX - 全量数据同步工具
|
26天前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
307 0
|
2月前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之如何实现MySQL单表数据同步到多个表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之使用 MySQL CDC 进行数据同步时,设置 server_id 参数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
DataWorks Shell 对象存储
DataWorks产品使用合集之在 DataWorks 中,有一个 MySQL 数据表,数据量非常大且数据会不断更新将这些数据同步到 DataWorks如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
47 3
|
2月前
|
消息中间件 关系型数据库 MySQL
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
|
2月前
|
机器学习/深度学习 关系型数据库 MySQL
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题

热门文章

最新文章