Maxwell采集MySQL Binlog业务库数据同步方案

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 针对业务库MySQL Binlog日志数据进行数据同步,从MySQL到Kafka,最终实现实时(准实时)数据采集。其中推荐使用MaxWell,此篇文档只是介绍如何通过Maxwell打通上下游数据点。

产品目标

完成实时数据采集,拉取业务数据库数据。

版本选择

Maxwell-1.25.1

JDK-1.8

MySQL-5.7

Kafka-2.1.0

概述

MaxWell,一个能实时读取MySQL二进制日志Binlog,并生成JSON格式的消息,作为生产者发送给Kafka等系统的应用程序。常用的场景有:ETL、维护缓存、收集表级别的DML指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。主要提供了以下功能:

1、支持select * from table的方式,进行全量数据初始化。(意思就是直接拉取全量数据)

2、支持在主库发生failover(宕机)后,自动恢复binlog位置。(GTID)

3、可以对数据进行分区,解决数据倾斜的问题,发送到Kafka的数据支持database、table、column等不同级别的数据分区

4、工作方式为:伪装为Slave,接收binlog events,然后根据schemas信息拼装,可接收ddl、xid、row等各种event。

工作原理

MaxWell的操作开销很低,只需要一个mysql和一个可写的后续地方即可。当该应用程序伪装为mysql slave,读取MySQL二进制日志binlog后,以JSON格式更新写入到Kafka等流组件,特点就是简单易用、不需要编写额外的客户端,大大降低了开发成本。


Binlog使得MaxWell可以访问每次发生的更新的原始字节。为了生成有用的输出,MaxWell需要知道每一列的数据类型,以便可以将字节解释为数字、字符串、布尔值等


随着时间的推移,MaxWell可以获取到每次变更的内容,并将这些差异存储在schemas表中。每个差异被放置在数据库的时间轴中,包含以下信息:

binlog_file,binlog_position:Binlog流中发生架构更改的确切点。

deltas:模式更改的内部表示。

base_schema_id:此增量适用于之前的架构。

last_heartbeat_read:此更改之前在二进制日志中看到的最新MaxWell心跳。

server_id:此架构适用的服务器。

MaxWell安装与使用

maxwell下载与安装

Maxwell官网:http://maxwells-daemon.io/

Github网址:https://github.com/zendesk/maxwell

选择适用版本,下载maxwell-1.25.1.tar.gz即可。解压后的目录如下:

52ad78fa-2b9b-4b53-b17c-31ad87a3ebbc.png

注意:和Canal一样,首先需要确保Binlog日志已开启,此部分可以参考Canal部分,binlog_format是基于会话的属性,需要关闭所有额外连接从而完全转换为基于行的复制。并且MaxWell也需要权限才可以进行读取。

mysql> create user ‘maxwell’@’%’ identified by ‘123456’;
mysql> grant all on maxwell.* to ‘maxwell’@’%’;
mysql> grant select,replication client, replication slave on *.* to ‘maxwell’@’%’;

如果要写入kafka,需要在配置文件server.properties中修改:

advertised.host.name=hiwes

【如何通过Docker安装MaxWell】

# 拉取镜像 
docker pull zendesk/maxwell
# 启动maxwell,并将解析出的binlog输出到控制台
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='hiwes' --producer=stdout

Maxwell配置

运行程序时,添加参数-output_ddl,可以捕捉到ddl语句。datatime列会输出“YYYY-MM-DD hh:mm:ss”,如果遇到“0000-00-00 00:00:00”会原样输出。timestamp作为UTC处理,要调整为自己的时区,需要在后端逻辑上进行处理。MaxWell支持多重编码,但仅输出UTF-8编码。默认情况下,MaxWell使用kafka1.0.0库,使用--kafka_version可以选择备用库的版本:0.8.2.2、0.9.0.1、0.10.0.1、0.10.2.1或0.11.0.1、1.0.0。此条命令,只能在命令行上使用。

f7b879b5-bfb2-4f3a-b0bc-77b5b66d3565.png

修改/etc/my.cnf文件,增加[mysqld]下面的属性,主要是binlog_format=row。
在MySQL中创建maxwell用户,并赋予相关权限。
如果需要启动kafka,增加配置文件中的advertised.host.name=hiwes
注意Json输出的格式和类型,针对不同的操作类型进行对应的操作即可。
在进行使用的时候,有以下几个相关的配置:
config 配置文件config.properties的路径
log_level 日志的级别【debug | info | warn | error】,默认:info
daemon 指定maxwell实例作为守护进程在后台运行
env_config_prefix 匹配该前缀的环境变量将被视为配置值【STRING】
一般的启动命令:

bin/Maxwell --config config.properties --log_level=debug -daemon

Maxwell配置启动参数

配置启动参数,在maxwell的config.properties文件中添加即可,主要的参数包括:

       log_level=debug
       host=hiwes
       user=maxwell
       password=123456
       producer=kafka
       kafka.bootstrap.servers=127.0.0.1:9092
       kafka.compression.type=snappy
       kafka.retries=0
       kafka.acks=1
       metrics_type=http
       metrics_jvm=true
       http_port=8099

【注意】小技巧,怎么显示一个配置文件中,非注释部分?

使用sed或grep命令, 其中:grep的-v代表反选,-e只能传递一个参数,-Ev表示反选后续跟正则表达式:

> sed '/^#/d' config.properties              删除包含#在内的注释行,空行依然存在。
> grep -v '^#' config.properties             删除包含#在内的注释行,空行依然存在。
> sed '/^#/d;/^$/d' config.properties        删除包含#在内的注释行和空行。
> grep -Ev '^#|^$' config.properties         删除包含#在内的注释行和空行。

Maxwell配置其他参数

根据用途,maxwel将MySQL划分为3种角色:

host:主机。建立maxwell库表,存储捕获到的schema等信息。

主要有6张表,bootstrap用于数据初始化、schemas记录所有的binlog文件信息、databases记录所有的数据库信息、tables记录所有的表信息、columns记录所有的字段信息、positions记录读取binlog的位移信息,heartbeats记录心跳信息。


replication_host:复制主机。Event监听,读取该主机的Binlog。

将host和replication_host分开,可避免replication_user往生产库写数据。


schema_host:schema主机。捕获表结果scheme的主机。

binlog里面没有字段信息,但是在binlog-proxy场景下就很实用。比如要将已经离线的binlog通过maxwell生成JSON流,于是自建一个mysql。Server里面没有结构,只用于发送binlog。此时的表结构就可以自动从schema_host获取。


一般来说,这三个主机都是同一个,schema_host只有在replication_host的时候使用。

和MySQL相关的有以下配置

选项

参数值

描述

默认值

host

STRING

mysql 地址

localhost

user

STRING

mysql 用户名

password

STRING

mysql 密码

(no password)

port

INT

mysql 端口 3306

jdbc_options

STRING

mysql jdbc connection options

DEFAULT_JDBC_OPTS

ssl

SSL_OPT

SSL behavior for mysql cx

DISABLED

schema_database

STRING

Maxwell用于维护的schema和position将使用的数据库

maxwell

client_id

STRING

用于标识Maxwell实例的唯一字符串

maxwell

replica_server_id

LONG

用于标识Maxwell实例的唯一数字

6379 (see notes)

master_recovery

BOOLEAN

enable experimental master recovery code

false

gtid_mode

BOOLEAN

是否开启基于GTID的复制

false

recapture_schema

BOOLEAN

重新捕获最新的表结构(schema),不可在config.properties中配置

false

replication_host

STRING

server to replicate from. See split server roles

schema-store host

replication_password

STRING

password on replication server

(none)

replication_port

INT

port on replication server

3306

replication_user

STRING

user on replication server

replication_ssl

SSL_OPT

SSL behavior for replication cx cx

DISABLED

schema_host

STRING

server to capture schema from. See split server roles

schema-store host

schema_password

STRING

password on schema-capture server

(none)

schema_port

INT

port on schema-capture server

3306

schema_user

STRING

user on schema-capture server

schema_ssl

SSL_OPT

SSL behavior for schema-capture server

DISABLED


producer参数

仅介绍Kafka,其他生产者的配置可从官方文档中查找。

Kafka是MaxWell支持最完善的一个生产者,内置了多个版本,默认1.0.0。注意,这个配置,不能在config.properties中配置,而是使用 --kafka_version进行选择。


MaxWell会将消息投递到Kafka的Topic中,由--kafka_topic指定,默认值为maxwell。除了指定为静态的topic,骇客指定为动态的,比如:namespace%{database}%{table}。其中%{database}和%{table}将被具体的消息内database和table替换。


MaxWell读取配置时,若配置项以kakfa. 开头,则该配置将设置到kafka producer客户端的连接参数中去,如:

kafka.acks=1               
kafka.compression.type=snappy           
kafka.retries=5

下面是MaxWell通用生产者和Kafka生产者的配置参数:


选项

参数值

描述

默认值

producer

PRODUCER_TYPE

生产者类型

stdout

custom_producer.factory

CLASS_NAME

自定义消费者的工厂类

producer_ack_timeout

PRODUCER_ACK_TIMEOUT

异步消费认为消息丢失的超时时间(毫秒ms)

producer_partition_by

PARTITION_BY

输入到kafka/kinesis的分区函数

database

producer_partition_columns

STRING

若按列分区,以逗号分隔的列名称

producer_partition_by_fallback

PARTITION_BY_FALLBACK

producer_partition_by=column时需要,当列不存在是使用

ignore_producer_error

BOOLEAN

为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志 See also dead_letter_topic

true

kafka.bootstrap.servers

STRING

kafka 集群列表,HOST:PORT[,HOST:PORT]

kafka_topic

STRING

kafka topic

maxwell

dead_letter_topic

STRING

详见官方文档

kafka_version

KAFKA_VERSION

指定maxwell的 kafka 生产者客户端版本,不可在config.properties中配置

0.11.0.1

kafka_partition_hash

[default | murmur3]

选择kafka分区时使用的hash方法

default

kafka_key_format

[array | hash]

how maxwell outputs kafka keys, either a hash or an array of hashes

hash

ddl_kafka_topic

STRING

当output_ddl为true时, 所有DDL的消息都将投递到该topic

kafka_topic


MaxWell配置过滤器:
MaxWell可通过--filter来制定过滤规则,通过exclude排除,通过include包含。值可以为具体的数据库、数据表、数据列,甚至可以用JS来定义复杂的过滤规则,也可以用正则表达式描述。如:

# 仅匹配foodb数据库的tbl表和所有table_数字的表。
--filter=’exclude:foodb.*,include:foodb.tbl,include:foodb./table_\d+/’
# 排除所有库所有表,仅匹配db1数据库
--filter=’exclude:.,include:db1.*’
# 排除含db.tbl.col列值为reject的所有更新
--filter=’exclude:db.tbl.col=reject’
# 排除任何包含col_a列的更新
--filter=‘exclude:..col_a=*’
# blacklist黑名单,完全排除bad_db数据库,若要恢复,需要删除maxwell库
--filter=‘blacklist:bad_db.*’

MaxWell的初始化和使用:
MaxWell启动之后会从MaxWell库中获取上一次停止时的position。从该断点处开始读取binlog。如果binlog已经清除,可以通过以下操作,将整张表都复制出来。

maxwell-bootstrap命令工具:帮助完成数据初始化,是基于select * from table的方式进行全量数据初始化,不会产生多余的binlog。

参数

说明

--log_level LOG_LEVEL

日志级别(DEBUG, INFO, WARN or ERROR)

--user USER

mysql 用户名

--password PASSWORD

mysql 密码

--host HOST

mysql 地址

--port PORT

mysql 端口

--database DATABASE

要bootstrap的表所在的数据库

--table TABLE

要引导的表

--where WHERE_CLAUSE

设置过滤条件

--client_id CLIENT_ID

指定执行引导操作的Maxwell实例


启动命令:

bin/maxwell-bootstrap --user maxwell --password 123456 --host 127.0.0.1 --database test --table test --client_id maxwell

【注意】


--bootstrapper=sync时,在处理bootstrap时,会阻塞正常的binlog解析;

--bootstrapper=async时,不会阻塞。


也可以执行以下的SQL,在maxwell.bootstrap表中插入记录,手动触发:

# insert into Maxwell.bootstrap(database_name,table_name) values (“test”,“test”);

此时的type,就不是insert了,而是bootstrap-insert


再一次查看binlog:

> show binlog events。

会发现只有与maxwell相关的binlog,并没有test.test相关的binlog。所以maxwell-bootstrap命令并不会产生多余的binlog。当数据表的数量很大的时候,这个好处更明显

Bootstrap的过程,是bootstrap-start ---> bootstrap-insert ---> bootstrap-complete。其中,start和complete的data字段为空,不携带数据。再进行bootstrap过程中,如果maxwell崩溃,重启的时候,bootstrap会完全重新开始,不管之前进行到了多少。如果不希望这样,可以在数据库中设置 is_complete字段为1(表示完成),或者删除改行。

Entry内部格式

其中跟输出格式相关的配置,可以通过查阅资料了解。输出的Json格式字符串:
  data  最新的数据,也是修改后的数据
  old   老数据,也是修改前的数据
  type    操作类型,有insert、update、delete、database-create、database-alter、database-drop、table-create、table-alter、table-drop、bootstrap-insrt、int(未知类型)
  xid   事务id
  commit    同一个xid代表同一个事务,事务的最后一条语句有commit,可利用这个重现事务。
  server_id 服务器id,一般用不到
  thread_id 线程id,一般用不到

Maxwell的监控

MaxWell,提供了4种监控方式,与监控相关的配置项有下列这些:

f5a369ed-f3a3-480f-8553-be5e3df17d40.png

18392619-d4e2-4596-86ec-c31d8ccf2016.png

上述有些指标,是Kafka持有的,并不支持所有的生产者。

通过http方式,获取监控指标:

bin/maxwell --user=’maxwell’ --password=’123456’ --host=’hiwes’ --producer=kafka --kafka.bootstrap.servers=’hiwes:9092’ --kafka_topic=test --log_level=debug --metrics_type=http --metrics_jvm=true --http_port=8080

http方式有4种后缀,分别对应4种不同的格式:

/metrics                 #所有指标以json格式返回
/prometheus              #所有指标以Prometheus格式返回
/healthcheck             #返回MaxWell过去15min是否健康
/ping                    #返回pong,简单的测试结果

MaxWell其他注意事项


多个MaxWell实例

在不同的配置下,MaxWell可以在同一个主服务器运行多个实例。如果希望生产者以不同的配置运行,例如将来自不同组的表(table)的事件投递到不同的topic中,这将非常有用。MaxWell的每个实例都必须配置一个唯一的client_id,以便区分binlog的位置。

Timestamp Column

MaxWell对时间类型(datetime、timestamp、date)都是当做字符串处理的。这也是为了保证数据一致(如0000-00-00 00:00:00在timestamp中非法,但是mysql认,解析为java或python就是null/None)


如果MySQL表上的字段是timestamp类型,是有时区的概念,binlog解析出来的是标准UTC时间,单用户看到的是本地时间。如f_create_time timestamp创建时间是北京时间2018-01-05 21:01:01,那么mysql实际存储的是2018-01-05 13:01:01,binlog里面也是这个时间字符串。如果消费者不做时区转换,会少8小时

Binary Column

MaxWell可以处理binary类型的列,如blob、varbinary。其做法就是对二进制列用base64_encode,当做字符串输出到json。消费者拿到这个列数据后,不能直接拼装,需要base64_encode。

表结构不同步

如果是拿比较老的binlog,放到新的mysql_server上用maxwell拉取,可能表结构已经发生了变化。比如binlog里面字段比schema_hose字段多一个。目前这种情况没有发生异常,如阿里RDS默认会为无主键无唯一索引的表,增加一个__##alibaba_rds_rowid##__,在show create table和schema里面都看不到这个隐藏主键,但是binlog里面会有,并同步到从库。

大事务binlog

当一个事物产生的binlog量非常大的时候,比如迁移日表数据。maxwell为了控制内存使用,会自动将处理不过来的binlog放到文件系统。


【注意】大表数据迁移,也要批量进行,不要一个insert into…select就搞定。

tableMapCache

如果只想获取某几个表的binlog变更,需用到include_tables来过滤,但如果mysql_server上现在删了一个表t1,但binlog是从昨天开始读取的,被删的这个表t1在maxwell启动的时候是拉取不到表结构的。然后昨天的binlog里面有t1的变更,因为找不到表结构给来组装成json,会抛异常。


手动在maxwell.tables/columns里面插入记录时可行的。但这个问题的根本,是maxwell在binlog过滤的时候,只在处理row_event的时候,而对tableMapCache要求binlog里面的所有表都要有。


自己(seanlook)提交了一个commit,可以再做tableMapCache的时候也仅要求缓存include_dbs/tables。

提高消费性能

在用rabbitmq时,routing_key是%db%.%table%,但某些表产生的binlog增量非常大,就会导致各个队列消息量很不平均。目前因为还没做到事务txid或者thread_id级别的并发回放,所以最小队列粒度也是表。尽量单独放一个队列,其他数据量小的合在一起。

总结

相较Canal,Maxwell的使用和部署更加简单,直接将数据变更输出为JSON字符串,不再需要编写客户端,对于缺乏基础建设,短时间又需要快速迭代的项目和公司比较合适。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6天前
|
关系型数据库 MySQL 数据库
docker MySQL删除数据库时的错误(errno: 39)
docker MySQL删除数据库时的错误(errno: 39)
60 0
|
6天前
|
Java 关系型数据库 MySQL
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口(下)
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口
44 0
|
6天前
|
Java 关系型数据库 MySQL
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口(上)
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口
47 0
|
6天前
|
关系型数据库 MySQL 分布式数据库
PolarDB MySQL版集群版本支持库表恢复功能的版本要求是什么?
【5月更文挑战第13天】PolarDB MySQL版集群版本支持库表恢复功能的版本要求是什么?
10 0
|
6天前
|
存储 关系型数据库 MySQL
学习MySQL(5.7)第二战:四大引擎、账号管理以及建库(干货满满)
学习MySQL(5.7)第二战:四大引擎、账号管理以及建库(干货满满)
|
6天前
|
SQL 关系型数据库 MySQL
使用Python的pymysql库连接MySQL,执行CRUD操作
使用Python的pymysql库连接MySQL,执行CRUD操作:安装pymysql,然后连接(host='localhost',user='root',password='yourpassword',database='yourdatabase'),创建游标。查询数据示例:`SELECT * FROM yourtable`;插入数据:`INSERT INTO yourtable...`;更新数据:`UPDATE yourtable SET...`;删除数据:`DELETE FROM yourtable WHERE...`。
29 0
|
6天前
|
Java 关系型数据库 MySQL
【JDBC编程】基于MySql的Java应用程序中访问数据库与交互数据的技术
【JDBC编程】基于MySql的Java应用程序中访问数据库与交互数据的技术
|
6天前
|
存储 SQL 关系型数据库
不停止MySQL服务增加从库的两种方式
不停止MySQL服务增加从库的两种方式
|
6天前
|
SQL 关系型数据库 MySQL
用MySQL创建公司资料库表格
创建了员工、分支、客户及工作关系的数据库表格。员工与分支间有works_with表记录销售数据,外键关联并处理删除操作(set null或cascade)。插入数据后,通过SQL查询获取员工、客户信息,使用聚合函数、通配符、联合查询和JOIN操作。子查询用于复杂条件筛选。数据库设计确保了数据完整性和参照完整性。
26 0
|
6天前
|
SQL 关系型数据库 MySQL
DDL语言之库和表的管理(mysql)
DDL语言之库和表的管理(mysql)