利用maxwell组件监听mysql之binlog日志进行实时同步数据

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 利用maxwell组件监听mysql之binlog日志进行实时同步数据

一 maxwell组件介绍



Maxwell是一个守护进程,它能监听并读取MySQL的binlog,然后解析输出为json,支持将数据输出到Kafka、Kinesis或其他流媒体平台,支持表和库过滤。


注意:对于增删改都有输出,但对于truncate操作,没有输出。


源码地址:https://github.com/zendesk/maxwell


下载地址:https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz


示意如下:


mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "insert",
    "ts": 1449786310,
    "xid": 940752,
    "commit": true,
    "data": { "id":1, "daemon": "Stanislaw Lem" }
  }
  mysql> update test.maxwell set daemon = 'firebus!  firebus!' where id = 1;
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "update",
    "ts": 1449786341,
    "xid": 940786,
    "commit": true,
    "data": {"id":1, "daemon": "Firebus!  Firebus!"},
    "old":  {"daemon": "Stanislaw Lem"}
  }


二 设备与组件版本梳理:


1.  linux内核版本(CentOS Linux 7):(命令:uname -a)


Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux


2.  mysql版本:(SQL命令:select version(); 或 status)


Server version: 5.6.43-log MySQL Community Server (GPL)


3.  maxwell版本:maxwell-1.21.1


4.  kafka版本:kafka_2.11-2.2.0


5.  zookeeper版本:zookeeper-3.4.5-cdh5.7.0


三 设备介绍


二台虚拟机,分别为salve1(192.168.175.21)和slave2(192.168.175.22),slave1上安装mysql和kafka,slave2上启动maxwell守护进程和zookeeper。


四 简要过程梳理



网络异常,图片无法展示
|


主流程示意图


如上图:这次主要介绍从binlog > maxwell > kafka的过程,而kafka后面的过程,就可以有很多种了,比如:


(1)binlog > maxwell > kafka > spark streaming > hdfs、kudu;

(2)binlog > maxwell > kafka > flume > hdfs;

(3)binlog > maxwell > kafka > es > kibana;


第一种spark streaming+hdfs、kudu,是目前我所在公司中使用的场景。简要流程梳理如下:


1.  在slave1上安装mysql;
2.  在slave2上启动maxwell,测试是否可以正常读取binlog;
3.  maxwell初步测试ok;
4.  在slave2上安装并启动zk;
5.  在slave1上安装并启动kafka server;
6.  通过kafka producer和consumer测试启动是否成功;
7.  启动maxwell将解析后的json数据发送到kafka;
8.  启动kafka consumer测试数据是否成功发送。


五 详细过程


1 在slave1(192.168.175.21)上安装mysql


详细过程,可参考笔记: https://www.jianshu.com/p/09936d9c7bf2


(1)在创建root账号并设置远程访问之后,接着创建maxwell账号并设置远程访问和权限:


mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';


权限:Maxwell需要权限将状态存储在schema_database选项(默认Maxwell)指定的数据库中。


(2)针对maxwell配置mysql: 确保配置了server_id,并打开了基于行的复制。

可参考maxwell快速入门文档:


https://github.com/zendesk/maxwell/blob/master/docs/docs/quickstart.md


$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row


注意:  binlog_format是一个基于会话的属性需要关闭所有活动连接才能完全转换为基于行的复制。


mysql配置文件my.cnf查找方式:


$ whereis my
my:/etc/my.cnf


配置完成后,要重启mysql服务,方可生效


2 在slave2上启动maxwell,测试是否可以正常读取binlog


(1)在slave2上测试是否可以进行远程访问数据


mysql -h 192.168.175.21 -P 3306 - u root -proot   #root登录成功后,窗口无需关闭,后面还要接着测试用
mysql -h 192.168.175.21 -P 3306 -u maxwell -p111111


(2)下载并解压maxwell


#解压到指定的文件夹
tar xzvf maxwell-1.21.1.tar.gz -C /usr/loca/hadoop/app


(3)命令行启动maxwell,将解析后的日志输出到控制台进行测试


[root@slave2 maxwell-1.21.1]# pwd
/usr/local/hadoop/app/maxwell-1.21.1
[root@slave2 maxwell-1.21.1]# bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=stdout


启动成功后,会展示如下日志内容:


[root@slave2 maxwell-1.21.1]# bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=stdout
Using kafka version: 1.0.0
15:49:04,967 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
15:49:08,334 INFO  Maxwell - Maxwell v1.21.1 is booting (StdoutProducer), starting at Position[BinlogPosition[master.000002:984013], lastHeartbeat=1555141484690]
15:49:10,071 INFO  MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[master.000001:40754], lastHeartbeat=1555112822550])
15:49:12,759 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0])
15:49:13,122 INFO  MysqlSavedSchema - beginning to play deltas...
15:49:13,138 INFO  MysqlSavedSchema - played 1 deltas in 12ms
15:49:13,426 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.000002:984013
15:49:13,997 INFO  BinaryLogClient - Connected to 192.168.175.21:3306 at master.000002/984013 (sid:6379, cid:59)
15:49:14,003 INFO  BinlogConnectorLifecycleListener - Binlog connected.


在slave2上root连接mysql的窗口中,执行insert,delete,update操作:


MySQL [mysql]> insert into tb_dept (id,name,description) values(16,'xiaoman','manger');
Query OK, 1 row affected (0.02 sec)
MySQL [mysql]> delete from tb_dept where id = 16;
Query OK, 1 row affected (0.01 sec)
MySQL [mysql]> update tb_dept set name='xiaofei' where id=14;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0


在maxwell的stdout窗口中会产生如下日志:


{"database":"mysql","table":"tb_dept","type":"insert","ts":1555142065,"xid":6349,"commit":true,"data":{"Id":16,"Name":"xiaoman","description":"manger"}}
{"database":"mysql","table":"tb_dept","type":"delete","ts":1555142096,"xid":6361,"commit":true,"data":{"Id":16,"Name":"xiaoman","description":"manger"}}
{"database":"mysql","table":"tb_dept","type":"update","ts":1555142157,"xid":6383,"commit":true,"data":{"Id":14,"Name":"xiaofei","description":"sales"},"old":{"Name":"xiaoming1"}}


3 在slave2上安装并启动zk


具体安装方式,可参考笔记:https://www.jianshu.com/p/10d5a20ab9b7


注意启动完成后,要检查zk是否安装成功


#查看状态
zkServer.sh status


4 在slave1上安装并启动kafka server:


(1)具体启动安装方式,可参考笔记:https://www.jianshu.com/p/3d017bdbfb3c

修改kafka配置文件 $KAFKA_HOME/config/server.properties:


broker.id=1
listeners=PLAINTEXT://slave1:9092
log.dirs=/usr/local/app/tmp/kafka-logs
zookeeper.connect=slave2:2181


(2)通过kafka producer和consumer测试启动是否成功


5 在slave1的kafka上创建名为maxwell的topic


(1)创建topic


kafka-topics.sh --create --zookeeper slave2:2181 --replication-factor 1 --partitions 1 --topic maxwell


(2)检查topic是否创建成功


#查看topic列表:
kafka-topics.sh --list --zookeeper slave2:2181
#查看topic具体描述:
kafka-topics.sh --describe --zookeeper slave2:2181  --topic maxwell


6 在slave1上启动消费topic名称为maxwell的kafka consumer:


kafka-console-consumer.sh --zookeeper slave2:2181 --topic maxwell --from-beginning


7 在slave2上启动maxwell进程,将解析后的json数据输出到kafka:


bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=kafka --kafka.bootstrap.servers=192.168.175.21:9092 --kafka_topic=maxwell


注意:启动之前,需要将输出到stdout上的maxwell进程停掉,否则会报错。


同样,启动成功后,会输出Binlog连接成功的日志信息。如下:


16:22:19,127 INFO  AppInfoParser - Kafka version : 1.0.0
16:22:19,129 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
16:22:19,391 INFO  Maxwell - Maxwell v1.21.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[master.000002:1084309], lastHeartbeat=1555143612778]
16:22:20,916 INFO  MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[master.000001:40754], lastHeartbeat=1555112822550])
16:22:22,861 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0])
16:22:23,178 INFO  MysqlSavedSchema - beginning to play deltas...
16:22:23,192 INFO  MysqlSavedSchema - played 1 deltas in 7ms
16:22:23,482 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.000002:1084309
16:22:23,887 INFO  BinaryLogClient - Connected to 192.168.175.21:3306 at master.000002/1084309 (sid:6379, cid:64)
16:22:23,894 INFO  BinlogConnectorLifecycleListener - Binlog connected.


8 验证


(1)在slave1中的root账号登录的mysql窗口中,执行一条更新操作:


MySQL [mysql]> update tb_dept set name='xiaofei123' where id=14;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0


(2)随机在消费topic=maxwell的kafka consumer中输出日志如下:


{"database":"mysql","table":"tb_dept","type":"update","ts":1555143780,"xid":6888,"commit":true,"data":{"Id":14,"Name":"xiaofei123","description":"sales"},"old":{"Name":"xiaofei"}}


到此,流程梳理完毕!


六 遗留问题:


1 mysql数据库


设置远程访问之后,在本地一直访问不了,尝试修改密码同样访问不了。猜测是由于设置远程访问时的%百分号影响的。故而这里访问数据库用的都是远程访问的,其实正常生产环境,我们也都是远程进行访问的。


ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)


2 kafka:


在将maxwell进程和kafka server运行在同一台虚拟机上时,启动kafka consumer时,经常报ConsumerRebalanceFailedException的异常,如下:没有找到具体原因,最终通过kafka server和maxwell不运行在一起将问题解决。其实生产环境,一般kafka都是有专用的集群,也都不会和maxwell运行在一起。


[2019-04-13 13:12:03,599] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: console-consumer-72779_slave2-1555132312263-9d33f2d8 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:1001)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163)
    at kafka.consumer.OldConsumer.<init>(BaseConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:63)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)


相关文章
|
26天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1601 14
|
28天前
|
SQL 存储 关系型数据库
Mysql主从同步 清理二进制日志的技巧
Mysql主从同步 清理二进制日志的技巧
22 1
|
30天前
|
关系型数据库 MySQL 数据库
DZ社区 mysql日志清理 Discuz! X3.5数据库可以做定期常规清理的表
很多站长在网站日常维护中忽略了比较重要的一个环节,就是对于数据库的清理工作,造成数据库使用量增加必须多的原因一般有2个:后台站点功能开启了家园,此功能现在很少有论坛会用到,但是灌水机会灌入大量垃圾信息致使站长长时间未能发觉;再有就是程序默认的一些通知类表单会存放大量的、对于网站日常运行并无意义的通知信息。
52 2
|
2月前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
228 11
|
13天前
|
存储 关系型数据库 MySQL
MySQL中的Redo Log、Undo Log和Binlog:深入解析
【10月更文挑战第21天】在数据库管理系统中,日志是保障数据一致性和完整性的关键机制。MySQL作为一种广泛使用的关系型数据库管理系统,提供了多种日志类型来满足不同的需求。本文将详细介绍MySQL中的Redo Log、Undo Log和Binlog,从背景、业务场景、功能、底层实现原理、使用措施等方面进行详细分析,并通过Java代码示例展示如何与这些日志进行交互。
16 0
|
26天前
|
缓存 Linux 编译器
【C++】CentOS环境搭建-安装log4cplus日志组件包及报错解决方案
通过上述步骤,您应该能够在CentOS环境中成功安装并使用log4cplus日志组件。面对任何安装或使用过程中出现的问题,仔细检查错误信息,对照提供的解决方案进行调整,通常都能找到合适的解决之道。log4cplus的强大功能将为您的项目提供灵活、高效的日志管理方案,助力软件开发与维护。
47 0
|
分布式计算 关系型数据库 MySQL
E-Mapreduce如何处理RDS的数据
目前网站的一些业务数据存在了数据库中,这些数据往往需要做进一步的分析,如:需要跟一些日志数据关联分析,或者需要进行一些如机器学习的分析。在阿里云上,目前E-Mapreduce可以满足这类进一步分析的需求。
4970 0
|
20天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
55 3
Mysql(4)—数据库索引
|
5天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
36 2
|
8天前
|
存储 关系型数据库 MySQL
MySQL vs. PostgreSQL:选择适合你的开源数据库
在众多开源数据库中,MySQL和PostgreSQL无疑是最受欢迎的两个。它们都有着强大的功能、广泛的社区支持和丰富的生态系统。然而,它们在设计理念、性能特点、功能特性等方面存在着显著的差异。本文将从这三个方面对MySQL和PostgreSQL进行比较,以帮助您选择更适合您需求的开源数据库。
38 4