Maxwell 概述、安装、数据同步【一篇搞定】!

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: Maxwell 是一个由 Zendesk 开源的用于 MySQL 数据库实时数据捕获和同步的工具,支持多种数据库系统,以 JSON 格式输出变更数据。它实时监控数据库中的更新,将变化传递给其他系统,常用于实时数据管道、数据仓库和事件驱动架构。Maxwell 具有实时性、可配置性和高性能等特点。其工作流程包括 Binlog 解析、数据解析、重构、发布到消息队列(如 Kafka)以及事件处理。安装时需注意 JDK 版本,并配置 MySQL、Zookeeper 和 Kafka。此外,Maxwell 支持定向监听特定库表,并能进行历史和增量数据同步。

@[toc]

什么是 Maxwell?

Maxwell 在大数据领域通常指的是一个用于数据同步和数据捕获的开源工具,由美国 Zendesk 开源,用 Java 编写的 MySQL 等关系型数据库的实时抓取软件。

Maxwell 可以监控数据库中的更改,并将这些更改以可消费的方式传递给其他系统。它通常用于实时数据管道、数据仓库、事件驱动架构等场景中,帮助将数据库中的变更数据传送到其他系统,以便进行分析、报告和其他数据处理操作。

Maxwell 的特点包括:

  1. 实时数据捕获:Maxwell 可以实时地捕获数据库中的更改,包括插入、更新和删除操作。

  2. 支持多种数据库:它可以与多种关系型数据库系统(如 MySQL、PostgreSQL)集成。

  3. JSON输出:Maxwell 通常以 JSON 格式输出变更数据,这种格式易于处理和解析。

  4. 可配置性:用户可以根据自己的需求配置 Maxwell,包括选择要捕获的表格、输出目标等。

  5. 高性能:Maxwell 经过优化,可以处理高吞吐量的数据流。

Maxwell 官网地址

Maxwell 输出格式

下面通过一个官方案例来了解 Maxwell 的输出格式。

mysql> update `test`.`maxwell` set mycol = 55, daemon = 'Stanislaw Lem';

maxwell -> kafka: 
  {
    "database": "test",
    "table": "maxwell",
    "type": "insert",
    "ts": 1449786310,
    "data": { "id":1, "daemon": "Stanislaw Lem", "mycol": 55 },
    "old": { "mycol":, 23, "daemon": "what once was" }
  }

Stage Analysis

  1. 首先,执行了一条 MySQL 的 UPDATE 语句,用于更新名为 test 数据库中的 maxwell 表中的数据。它将 mycol 字段的值更改为 55,将 daemon 字段的值更改为 'Stanislaw Lem'

  2. maxwell -> kafka:这一行表示 Maxwell 捕获到了 MySQL 数据库中的更新操作,并将其传输到 Kafka 消息队列中。

  3. JSON 数据块:以下是 JSON 格式的数据块,其中包含了关于更新操作的详细信息:

    • "database": "test":这是更新操作所在的数据库名称,即 test

    • "table": "maxwell":这是更新操作所在的表格名称,即 maxwell

    • "type": "update":这指示 Maxwell 将此更新操作视为 UPDATE 类型,因为实际上是对表中的数据进行了更新。

    • "ts": 1449786310:这是时间戳,表示更新操作发生的时间。

    • "data":这个部分包含了新的数据,包括 iddaemonmycol 字段的新值,表示更新后的值。

    • "old":这个部分包含了旧的数据,包括 mycoldaemon 字段的旧值,表示更新前的值。在示例中,mycol 字段的旧值为 23daemon 字段的旧值为 'what once was'

这个示例演示了一个 MySQL 数据库中的 UPDATE 操作,Maxwell 捕获了这个操作并将其转化为 JSON 格式的事件,然后将这些事件发送到 Kafka 消息队列,以供其他系统订阅和处理。

Maxwell 工作原理

Maxwell 是一个开源的数据变更捕获工具,它的主要作用是捕获关系型数据库中的数据变更事件,并将这些事件以结构化的方式传送到消息队列(通常是Kafka)或其他目标,以便其他系统可以实时处理这些变更数据。

下面是 Maxwell 的工作原理简要解释:

  1. 数据库 Binlog 解析:Maxwell 通过订阅数据库的二进制日志(Binlog)来实时监控数据库中的变更。Binlog 是数据库引擎记录数据库操作的详细日志,包括插入、更新和删除等操作。

  2. 数据解析:一旦 Maxwell 连接到数据库的 Binlog,它开始解析 Binlog 中的数据变更事件。Maxwell 能够解析这些事件并将其转化为易于理解和处理的数据格式。

  3. 数据重构:Maxwell 将解析后的数据重构为 JSON 格式或其他结构化数据格式,以便后续系统可以轻松处理和消费这些数据。

  4. 数据发布:捕获的数据变更事件被发送到一个消息队列(通常是 Kafka),以实现异步和实时的数据传输。将数据发送到消息队列允许其他系统根据需要消费数据,而不会对原始数据库产生太多负载。

  5. 事件处理:一旦数据变更事件被发送到消息队列,其他系统可以订阅这些事件并处理它们。这些系统可以是数据仓库、实时分析系统、缓存系统、搜索引擎或其他需要实时数据的应用程序。

  6. 可配置性:Maxwell 具有丰富的配置选项,允许用户指定要捕获的数据库、表格、字段,以及如何处理捕获的事件。

Maxwell 的工作原理可以概括为捕获、解析、重构、发布和处理数据库中的数据变更事件。通常用于与流数据处理系统和实时分析工具集成,以支持实时数据分析和应用程序。

Maxwell 安装

image.png

安装之前需要注意,从 v1.30.0 开始,Maxwell 不再支持 JDK1.8,所以安装之前注意 JDK 版本!

官方快速入门文档

官方版本说明与下载

本节使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署。

  1. 解压文件
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/module/
  1. 添加环境变量
sudo vim /etc/profile.d/my.sh

# 添加如下内容:
#MAXWELL_HOME
export MAXWELL_HOME=/opt/module/maxwell-1.29.2
export PATH=$PATH:$MAXWELL_HOME/bin

刷新环境变量:source /etc/profile.d/my.sh

  1. 配置 MySQL
sudo vim /etc/my.cnf

# 添加如下内容:
[mysqld]
#maxwell 需要指定 Binlog 日志以"行级别"的方式进行记录
binlog_format=row
#MySQL服务器的唯一标识号
server_id=1 
#启用二进制日志 Binlog,指定 "master" 作为 Binlog 文件的前缀名
log-bin=master

配置完成后,重启 MySQL:

sudo systemctl restart mysqld.service
  1. 创建 MySQL 用户并给予权限
CREATE USER 'maxwell'@'%' IDENTIFIED BY '000000';

GRANT ALL ON maxwell.* TO 'maxwell'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
  1. 修改配置文件
cd $MAXWELL_HOME

cp config.properties.example config.properties

vim config.properties

############ 添加如下信息 ############

# 指定生产者对象
producer=kafka

# 指定 kafka 目标机器
kafka.bootstrap.servers=hadoop120:9092,hadoop121:9092,hadoop122:9092

# 指定 kafka topic
kafka_topic=maxwell

# 指定 mysql 连接信息
host=hadoop120
user=maxwell
password=000000
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

# 指定数据按照主键分组进入 kafka 不同分区,避免数据倾斜
producer_partition_by=primary_key

image.png

  1. 启动 Maxwell

启动 Maxwell 之前请先启动 MySQL、Zookeeper、Kafka。

cd $MAXWELL_HOME

maxwell --config config.properties --daemon
  • --daemon:告诉 Maxwell 以守护进程模式运行,也就是在后台运行而不会阻塞当前终端。

image.png

  1. 验证 Maxwell 是否启动成功
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
  • ps -ef:列出所有进程信息。

  • grep com.zendesk.maxwell.Maxwell:查找 Maxwell 进程。

  • grep -v grep:排除包含字符串 "grep" 的行。在查找进程时,通常会包含一个与查找本身相关的 "grep" 进程,因此我们需要排除它。
  • wc -l:统计行数。

结果为 1 则表示启动成功了,为 0 则表示服务没有启动成功。

image.png

通过 jps 命令查看:

image.png

  1. 设置自动启停脚本
sudo vim /bin/mxw

添加下列内容:

#! /bin/bash

if [[ $# -ne 1 ]]; then
    echo "参数有误,请重新输入!"
elif [[ "$1" = "start" ]]; then
    echo "-----------------$host MAXWELL START-----------------"
    maxwell -config $MAXWELL_HOME/config.properties --daemon
elif [[ "$1" = "stop" ]]; then
    echo "-----------------$host MAXWELL STOP-----------------"
    ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
elif [[ "$1" = "restart" ]]; then
    echo "-----------------$host MAXWELL RESTART-----------------"
    ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    maxwell -config $MAXWELL_HOME/config.properties --daemon
elif [[ "$1" = "status" ]]; then
    echo "-----------------$host MAXWELL STATUS-----------------"
    ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
fi

给予权限:

sudo chmod +755 /bin/mxw

Maxwell 定向监听

如果我们只想通过 Maxwell 监听指定的某个库表,那么可以通过 Maxwell 中的过滤器 filter 实现,这里通过两个官方提供的案例来进行说明。

修改 Maxwell 中的配置文件 config.properties,添加如下所示的配置项:

示例一

filter=exclude: foodb.*, include: foodb.tbl, include: foodb./table_\\d+/

排除监听 foodb 库下的所有表,只监听 foodb.tbl 表和 foodb 库中以 table_ 开头并以数字结尾的表。

注意:在正则表达式中,\d 表示匹配任意数字字符。然而,在配置文件中,反斜杠 \ 被视为转义字符,因此为了在正则表达式中表示一个反斜杠和一个字母 d,我们需要使用两个连续的反斜杠 \\

示例二

filter=exclude: *.*, include: db1.*

排除监听所有库表,只监听 db1 库下的所有表。

Maxwell 历史数据同步

当前在 MySQL 中创建了 finance_result 库,其中存储了许多表,现在对该库中的表进行全量历史数据同步。

image.png

创建 Kakfa 消费者

kafka-console-consumer.sh --bootstrap-server hadoop120:9092 --topic maxwell

注意,这里创建的 topic 要和 Maxwell 配置文件中设置的 topic 主题保持一致。

启动 Maxwell 历史数据同步,指定库名和表名。

cd $MAXWELL_HOME

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

程序正常启动后 Kafka 消费端将会收到 Maxwell 发送过来的数据



其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{
   
   
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-start",
    "ts": 1694748250,
    "data": {
   
   }
}

{
   
   
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
   
   
        "id": 1,
        "create_time": "2022-08-19 00:00:00.000000",
        "update_time": "2022-08-19 00:00:00.000000",
        "industry_level": 1,
        "industry_name": "工程建设",
        "superior_industry_id": null
    }
} {
   
   
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
   
   
        "id": 2,
        "create_time": "2022-08-19 00:00:00.000000",
        "update_time": "2022-08-19 00:00:00.000000",
        "industry_level": 1,
        "industry_name": "轻工",
        "superior_industry_id": null
    }
} {
   
   
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-insert",
    "ts": 1694748250,
    "data": {
   
   
        "id": 3,
        "create_time": "2022-08-19 00:00:00.000000",
        "update_time": "2022-08-19 00:00:00.000000",
        "industry_level": 2,
        "industry_name": "土木",
        "superior_industry_id": 1
    }
}
......

{
   
   
    "database": "finance_result",
    "table": "industry",
    "type": "bootstrap-complete",
    "ts": 1694748250,
    "data": {
   
   }
}

Maxwell 增量数据同步

增量数据同步就是对 MySQL 中的所有库表进行实时监听,你对库表执行任何的操作都会发送到 Kakfa 消费者中。

先创建 Kakfa 消费者

kafka-console-consumer.sh --bootstrap-server hadoop120:9092 --topic maxwell

注意,这里创建的 topic 要和 Maxwell 配置文件中设置的 topic 主题保持一致。

启动 Maxwell 增量数据同步

cd $MAXWELL_HOME

maxwell --config config.properties --daemon

修改所监听 MySQL 库中任意表的一条数据,进行测试:

update finance_result.industry set industry_name="纱线行业" where id = 11;

finance_result.industry 表中 id11industry_name 修改为 "纱线行业"

maxwell -> kafka: 
{
   
   
    "database": "finance_result",
    "table": "industry",
    "type": "update",
    "ts": 1694751386,
    "xid": 21025,
    "commit": true,
    "data": {
   
   
        "id": 11,
        "create_time": "2022-08-19 00:00:00.000000",
        "update_time": "2022-08-19 00:00:00.000000",
        "industry_level": 3,
        "industry_name": "纱线行业",
        "superior_industry_id": 5
    },
    "old": {
   
   
        "industry_name": "纱线行业233"
    }
}

修改完成后 Kafka 消费端将会收到 Maxwell 发送过来的增量数据信息,其中包括数据的相关更新信息以及历史数据。

Maxwell 首日数据同步

在数仓环境搭建好之后,我们可能需要对增量表进行首日同步,将今天之前的增量数据表都进行初始化,同步到数据仓库中来。

在 Maxwell 中提供了专门用于首日同步数据的脚本 —— maxwell-bootstrap,语法格式如下:

$MAXWELL_HOME/bin/maxwell-bootstrap --database $DATABASE_NAME --table $TABLE_NAME --config $MAXWELL_HOME/config.properties

其中:

  • --database 用于指定同步 MySQL 中的哪个库;
  • --table 用于指定同步该库中的哪个表;
  • --config 用于指定 Maxwell 的配置文件。

注意,这三个配置项均为必选项!一般情况下该脚本只执行一次,否则会发生数据覆盖。

同步多张增量表时,可以通过自定义脚本来实现,如下所示:

#!/bin/bash

MAXWELL_HOME=YOUR_PATH

import_data() {
 $MAXWELL_HOME/bin/maxwell-bootstrap --database financial_lease --table $1 --config $MAXWELL_HOME/config.properties
}

for table in "table1" "table2" "table3" "..."
do
  import_data $table
done

Maxwell 启动报错

完整报错信息如下:

com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:926) [mysql-binlog-connector-java-0.23.3.jar:0.23.3]
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) [mysql-binlog-connector-java-0.23.3.jar:0.23.3]
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839) [mysql-binlog-connector-java-0.23.3.jar:0.23.3]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
15:58:23,503 INFO  BinlogConnectorReplicator - Binlog disconnected.
15:58:23,601 INFO  TaskManager - Stopping 4 tasks
15:58:23,601 ERROR TaskManager - cause: 
com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:926) ~[mysql-binlog-connector-java-0.23.3.jar:0.23.3]
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) ~[mysql-binlog-connector-java-0.23.3.jar:0.23.3]
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839) ~[mysql-binlog-connector-java-0.23.3.jar:0.23.3]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]

出现这个问题的原因可能是你在没有关闭 Maxwell 的情况下,修改了 Maxwell 的配置文件信息,且 Maxwell 正在后台监听采集任务,配置文件修改后导致其丢失了正在采集的数据位置记录,当再次启动时就会出现文件验证失败的情况。

解决办法:

清空 MySQL 中 Maxwell 存储库下的 positions 表:

use maxwell;

truncate table positions;

清空后再次启动 Maxwell 即可解决。

相关文章
|
1月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
645 0
|
8月前
|
存储
百度搜索:蓝易云【Logstash、Filebeat安装与数据同步(+ES安装讲解)】
现在,您已成功安装了Logstash、Filebeat和Elasticsearch,并可以使用它们来实现数据同步和日志处理的功
152 0
|
17天前
|
消息中间件 监控 关系型数据库
Maxwell - 增量数据同步工具(2)
Maxwell - 增量数据同步工具
|
29天前
|
存储 监控 关系型数据库
DataX 概述、部署、数据同步运用示例
DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。
|
17天前
|
SQL 关系型数据库 MySQL
Maxwell - 增量数据同步工具(1)
Maxwell - 增量数据同步工具
|
8月前
|
Web App开发 监控 Java
Logstash、Filebeat安装与数据同步(+ES安装讲解)
Logstash、Filebeat安装与数据同步(+ES安装讲解)
97 0
|
canal 关系型数据库 MySQL
【数据同步】你还不会在Docker下安装Canal吗?
最近在工作中遇到了需求,主要是为了完成对Binlog日志的监听和消费,做了一些数据同步的技术选型,最后选择了Canal。
|
canal SQL 关系型数据库
Canal 数据同步(canal 安装) | 学习笔记
快速学习 Canal 数据同步(canal 安装)
257 0
Canal 数据同步(canal 安装) | 学习笔记
|
7天前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
292 0
|
1月前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之如何实现MySQL单表数据同步到多个表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。