Maxwell - 增量数据同步工具(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: Maxwell - 增量数据同步工具

Maxwell - 增量数据同步工具(1)https://developer.aliyun.com/article/1532368

3.3、Maxwell 进程启动

3.3.1、使用命令行参数启动

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
  • user:连接 mysql 的用户(上面我们已经创建了)
  • password:连接 mysql 的用户密码
  • host: mysql 安装的主机名
  • producer:生产者模式(stdout:控制台,kafka:kafka 集群)

查看 maxwell 输出:

我们可以看到,我们的 insert 语句被 maxwell 转为了 json 被输出到了控制台。

3.3.2、定制化配置文件启动

编辑配置文件 config.properties:

log_level=info
 
producer=stdout
kafka.bootstrap.servers=localhost:9092
 
# mysql login info
host=hadoop102
user=maxwell
password=123456

启动 Maxwell:

4、Maxwell 使用

4.1、监控 MySQL 数据并在控制台打印

       上面我们已经演示过了,也就是首先在 mysql 的配置文件中指定需要打印 binlog 的数据库,然后通过命令或者配置文件来开启 Maxwell 进程:

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

接着我们对被监听的数据库的操作信息就会以 json 的格式打印出来:

4.2、监控 MySQL 数据并输出到 Kafka

       监控 MySQL 数据并输出到 Kafka,这种需求是我们实际工作用的最多的,这也是我们离线数仓和实时数仓所必须的。

4.2.1、实例

1)启动 Zookeeper 和 Kafka

2)启动 Maxwell 监控 binlog

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell

注:这里我们制定了 Kafka 集群地址为 hadoop102:9092 ,主题为 maxwell,虽然主题并不存在,但是 kafka 会自动创建我们指定的主题

3)在 hadoop103 消费 maxwell 主题

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell

4)在 MySQL 中增加一条数据再删除:

可以看到我们的数据成功被 Kafka 消费,说明业务数据日志成功传输到 Kafka 。

4.2.2、Kafak 主题数据的分区控制

       上面的案例中,我们不论 MySQL 的那个数据库,哪张表,都被保存到了 Kafka 的 maxwell 主题,这样显然很乱。所以这里我们需要解决一下:

       在生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 Kafka 的一个主题,并且这个主题肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,那就变得至关重要,实现步骤如下:

(1)修改 maxwell 的配置文件,定制化启动 maxwell 进程

log_level=info
 
producer=kafka
kafka.bootstrap.servers=hadoop102:9092
 
# mysql login info
host=hadoop102
user=maxwell
password=123456
 
# kafka 主题
kafka_topic=maxwell3
 
# 默认配好的
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
 
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# 常用的按照库分区、按照表分区,这里我们测试按照库分区
producer_partition_by=database

(2)手动创建 Kafka 多分区主题

kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic maxwell3

查看主题信息:

这样,我们不同数据库的操作记录就会保存到 Kafka 不同的分区当中, 分区可以使得集群负载均衡,还可以提高提高读写效率。

4.3、监控 MySQL 指定表输出控制台

(1)运行 maxwell 来监控 mysql 指定表数据更新

监控 test 库下的 staff 表

bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout --filter 'exclude: *.*,include:test.staff'

(2)向 staff 表中插入数据,查看 maxwell 控制台输出

(3)向 ws2 表中插入一条数据,查看

注:表 ws2 并不是我们指定监控的表

结果:maxwell 控制台并没有任何输出

注:还可以设置 include.test.* 来表示监控 test 库下的所有表

4.4、监控 MySQL 指定表全量输出到控制台

       Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增以及变化的,但是 Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据来对 MySQL 中的某张表进行数据初始化,也就是我们常说的全量同步。具体操作如下:

       需求:将 test 库下的 staff 表中的数据全量导入到 maxwell 控制台打印。

我们可以在数据库 maxwell (这个数据库是我们初始化 maxwell 元数据库时创建的)中查看元数据表 positions 的内容:

这里的 server_id 是我们在 mysql 的配置文件 /etc/my.cnf 中配置的;这里的 binlog_file 代表的是最新的 binlog 文件 mysql-bin.000003 ;这里的 binlog_position 的值为 2802 代表目前已经读到了第 2802 个字节的位置,意思就说我们现在是从这个偏移量之后开始监听的。

注:也可以通过 SQL:show master status; 来查看当前的 binlog 状态。

1. 向元数据表 maxwell.bootstrap 中插入数据

insert into maxwell.bootstrap(database_name,table_name) values('test','staff');

这就相当于告诉 maxwell,下一次启动 maxwell 作业时,就会调用一个初始化作业,把我们指定的这个 test 数据库下的 staff 表进行一次增量同步。

我们可以看到,通过 bootstrap 表插入数据来完成初始化,这样插入的数据是 json 中 type 字段的值是 "bootstrap-insert" 而不是 "insert"。

再次查看 bootstrap 表:

我们可以发现,is_complete 字段的值从默认值 0 变成了 1,inserted_rows 从 0 变成了 6,后面的 started_at 和 completed_at 字段的值也不再为空了。

2. 使用 maxwell-bootstrap 脚本工具

除了上面的直接向 maxwell 元数据库中的 bootstrap 表插入数据,也可以使用 maxwell 提供的 maxwell-bootstrap 脚本,本质其实是一样的,具体看你觉得哪个省事。

bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties

这样,这个初始化作业就算完成了,当我们下次打开 maxwell 进程时,它并不会再次执行这个初始化任务,因为它已经完成过了。

总结

       至此,Maxwell 的学习也已经结束了,待会吧离线数仓项目里的 maxwell 部分完成。总体来说,这个工具还是很好用且简单的,希望有一天自己也可以用屎山堆砌出这么一个自己的框架!

相关文章
|
18天前
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
1天前
|
SQL Oracle 关系型数据库
多环境数据同步(Navicat工具)
多环境数据同步(Navicat工具)
3 0
|
18天前
|
SQL 关系型数据库 MySQL
|
19天前
|
SQL 关系型数据库 MySQL
Maxwell - 增量数据同步工具(1)
Maxwell - 增量数据同步工具
|
9天前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
295 0
|
1月前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之如何实现MySQL单表数据同步到多个表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之使用 MySQL CDC 进行数据同步时,设置 server_id 参数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
DataWorks Shell 对象存储
DataWorks产品使用合集之在 DataWorks 中,有一个 MySQL 数据表,数据量非常大且数据会不断更新将这些数据同步到 DataWorks如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
42 3
|
1月前
|
消息中间件 关系型数据库 MySQL
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
|
1月前
|
机器学习/深度学习 关系型数据库 MySQL
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题

热门文章

最新文章