产品目标
完成实时数据采集,拉取业务数据库数据。
版本选择
Maxwell-1.25.1
JDK-1.8
MySQL-5.7
Kafka-2.1.0
概述
MaxWell,一个能实时读取MySQL二进制日志Binlog,并生成JSON格式的消息,作为生产者发送给Kafka等系统的应用程序。常用的场景有:ETL、维护缓存、收集表级别的DML指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。主要提供了以下功能:
1、支持select * from table的方式,进行全量数据初始化。(意思就是直接拉取全量数据)
2、支持在主库发生failover(宕机)后,自动恢复binlog位置。(GTID)
3、可以对数据进行分区,解决数据倾斜的问题,发送到Kafka的数据支持database、table、column等不同级别的数据分区。
4、工作方式为:伪装为Slave,接收binlog events,然后根据schemas信息拼装,可接收ddl、xid、row等各种event。
工作原理
MaxWell的操作开销很低,只需要一个mysql和一个可写的后续地方即可。当该应用程序伪装为mysql slave,读取MySQL二进制日志binlog后,以JSON格式更新写入到Kafka等流组件,特点就是简单易用、不需要编写额外的客户端,大大降低了开发成本。
Binlog使得MaxWell可以访问每次发生的更新的原始字节。为了生成有用的输出,MaxWell需要知道每一列的数据类型,以便可以将字节解释为数字、字符串、布尔值等。
随着时间的推移,MaxWell可以获取到每次变更的内容,并将这些差异存储在schemas表中。每个差异被放置在数据库的时间轴中,包含以下信息:
binlog_file,binlog_position:Binlog流中发生架构更改的确切点。
deltas:模式更改的内部表示。
base_schema_id:此增量适用于之前的架构。
last_heartbeat_read:此更改之前在二进制日志中看到的最新MaxWell心跳。
server_id:此架构适用的服务器。
MaxWell安装与使用
maxwell下载与安装
Maxwell官网:http://maxwells-daemon.io/
Github网址:https://github.com/zendesk/maxwell
选择适用版本,下载maxwell-1.25.1.tar.gz即可。解压后的目录如下:
注意:和Canal一样,首先需要确保Binlog日志已开启,此部分可以参考Canal部分,binlog_format是基于会话的属性,需要关闭所有额外连接从而完全转换为基于行的复制。并且MaxWell也需要权限才可以进行读取。
mysql> create user ‘maxwell’@’%’ identified by ‘123456’; mysql> grant all on maxwell.* to ‘maxwell’@’%’; mysql> grant select,replication client, replication slave on *.* to ‘maxwell’@’%’;
如果要写入kafka,需要在配置文件server.properties中修改:
advertised.host.name=hiwes
【如何通过Docker安装MaxWell】
# 拉取镜像 docker pull zendesk/maxwell # 启动maxwell,并将解析出的binlog输出到控制台 docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='hiwes' --producer=stdout
Maxwell配置
运行程序时,添加参数-output_ddl,可以捕捉到ddl语句。datatime列会输出“YYYY-MM-DD hh:mm:ss”,如果遇到“0000-00-00 00:00:00”会原样输出。timestamp作为UTC处理,要调整为自己的时区,需要在后端逻辑上进行处理。MaxWell支持多重编码,但仅输出UTF-8编码。默认情况下,MaxWell使用kafka1.0.0库,使用--kafka_version可以选择备用库的版本:0.8.2.2、0.9.0.1、0.10.0.1、0.10.2.1或0.11.0.1、1.0.0。此条命令,只能在命令行上使用。
修改/etc/my.cnf文件,增加[mysqld]下面的属性,主要是binlog_format=row。
在MySQL中创建maxwell用户,并赋予相关权限。
如果需要启动kafka,增加配置文件中的advertised.host.name=hiwes
注意Json输出的格式和类型,针对不同的操作类型进行对应的操作即可。
在进行使用的时候,有以下几个相关的配置:
config 配置文件config.properties的路径
log_level 日志的级别【debug | info | warn | error】,默认:info
daemon 指定maxwell实例作为守护进程在后台运行
env_config_prefix 匹配该前缀的环境变量将被视为配置值【STRING】
一般的启动命令:
bin/Maxwell --config config.properties --log_level=debug -daemon
Maxwell配置启动参数
配置启动参数,在maxwell的config.properties文件中添加即可,主要的参数包括:
log_level=debug host=hiwes user=maxwell password=123456 producer=kafka kafka.bootstrap.servers=127.0.0.1:9092 kafka.compression.type=snappy kafka.retries=0 kafka.acks=1 metrics_type=http metrics_jvm=true http_port=8099
【注意】小技巧,怎么显示一个配置文件中,非注释部分?
使用sed或grep命令, 其中:grep的-v代表反选,-e只能传递一个参数,-Ev表示反选后续跟正则表达式:
> sed '/^#/d' config.properties 删除包含#在内的注释行,空行依然存在。 > grep -v '^#' config.properties 删除包含#在内的注释行,空行依然存在。 > sed '/^#/d;/^$/d' config.properties 删除包含#在内的注释行和空行。 > grep -Ev '^#|^$' config.properties 删除包含#在内的注释行和空行。
Maxwell配置其他参数
根据用途,maxwel将MySQL划分为3种角色:
① host:主机。建立maxwell库表,存储捕获到的schema等信息。
主要有6张表,bootstrap用于数据初始化、schemas记录所有的binlog文件信息、databases记录所有的数据库信息、tables记录所有的表信息、columns记录所有的字段信息、positions记录读取binlog的位移信息,heartbeats记录心跳信息。
② replication_host:复制主机。Event监听,读取该主机的Binlog。
将host和replication_host分开,可避免replication_user往生产库写数据。
③ schema_host:schema主机。捕获表结果scheme的主机。
binlog里面没有字段信息,但是在binlog-proxy场景下就很实用。比如要将已经离线的binlog通过maxwell生成JSON流,于是自建一个mysql。Server里面没有结构,只用于发送binlog。此时的表结构就可以自动从schema_host获取。
一般来说,这三个主机都是同一个,schema_host只有在replication_host的时候使用。
和MySQL相关的有以下配置
选项 |
参数值 |
描述 |
默认值 |
host |
STRING |
mysql 地址 |
localhost |
user |
STRING |
mysql 用户名 |
|
password |
STRING |
mysql 密码 |
(no password) |
port |
INT |
mysql 端口 3306 |
|
jdbc_options |
STRING |
mysql jdbc connection options |
DEFAULT_JDBC_OPTS |
ssl |
SSL_OPT |
SSL behavior for mysql cx |
DISABLED |
schema_database |
STRING |
Maxwell用于维护的schema和position将使用的数据库 |
maxwell |
client_id |
STRING |
用于标识Maxwell实例的唯一字符串 |
maxwell |
replica_server_id |
LONG |
用于标识Maxwell实例的唯一数字 |
6379 (see notes) |
master_recovery |
BOOLEAN |
enable experimental master recovery code |
false |
gtid_mode |
BOOLEAN |
是否开启基于GTID的复制 |
false |
recapture_schema |
BOOLEAN |
重新捕获最新的表结构(schema),不可在config.properties中配置 |
false |
replication_host |
STRING |
server to replicate from. See split server roles |
schema-store host |
replication_password |
STRING |
password on replication server |
(none) |
replication_port |
INT |
port on replication server |
3306 |
replication_user |
STRING |
user on replication server |
|
replication_ssl |
SSL_OPT |
SSL behavior for replication cx cx |
DISABLED |
schema_host |
STRING |
server to capture schema from. See split server roles |
schema-store host |
schema_password |
STRING |
password on schema-capture server |
(none) |
schema_port |
INT |
port on schema-capture server |
3306 |
schema_user |
STRING |
user on schema-capture server |
|
schema_ssl |
SSL_OPT |
SSL behavior for schema-capture server |
DISABLED |
producer参数
仅介绍Kafka,其他生产者的配置可从官方文档中查找。
Kafka是MaxWell支持最完善的一个生产者,内置了多个版本,默认1.0.0。注意,这个配置,不能在config.properties中配置,而是使用 --kafka_version进行选择。
MaxWell会将消息投递到Kafka的Topic中,由--kafka_topic指定,默认值为maxwell。除了指定为静态的topic,骇客指定为动态的,比如:namespace%{database}%{table}。其中%{database}和%{table}将被具体的消息内database和table替换。
MaxWell读取配置时,若配置项以kakfa. 开头,则该配置将设置到kafka producer客户端的连接参数中去,如:
kafka.acks=1 kafka.compression.type=snappy kafka.retries=5
下面是MaxWell通用生产者和Kafka生产者的配置参数:
选项 |
参数值 |
描述 |
默认值 |
producer |
PRODUCER_TYPE |
生产者类型 |
stdout |
custom_producer.factory |
CLASS_NAME |
自定义消费者的工厂类 |
|
producer_ack_timeout |
PRODUCER_ACK_TIMEOUT |
异步消费认为消息丢失的超时时间(毫秒ms) |
|
producer_partition_by |
PARTITION_BY |
输入到kafka/kinesis的分区函数 |
database |
producer_partition_columns |
STRING |
若按列分区,以逗号分隔的列名称 |
|
producer_partition_by_fallback |
PARTITION_BY_FALLBACK |
producer_partition_by=column时需要,当列不存在是使用 |
|
ignore_producer_error |
BOOLEAN |
为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志 See also dead_letter_topic |
true |
kafka.bootstrap.servers |
STRING |
kafka 集群列表,HOST:PORT[,HOST:PORT] |
|
kafka_topic |
STRING |
kafka topic |
maxwell |
dead_letter_topic |
STRING |
详见官方文档 |
|
kafka_version |
KAFKA_VERSION |
指定maxwell的 kafka 生产者客户端版本,不可在config.properties中配置 |
0.11.0.1 |
kafka_partition_hash |
[default | murmur3] |
选择kafka分区时使用的hash方法 |
default |
kafka_key_format |
[array | hash] |
how maxwell outputs kafka keys, either a hash or an array of hashes |
hash |
ddl_kafka_topic |
STRING |
当output_ddl为true时, 所有DDL的消息都将投递到该topic |
kafka_topic |
MaxWell配置过滤器:
MaxWell可通过--filter来制定过滤规则,通过exclude排除,通过include包含。值可以为具体的数据库、数据表、数据列,甚至可以用JS来定义复杂的过滤规则,也可以用正则表达式描述。如:
# 仅匹配foodb数据库的tbl表和所有table_数字的表。 --filter=’exclude:foodb.*,include:foodb.tbl,include:foodb./table_\d+/’ # 排除所有库所有表,仅匹配db1数据库 --filter=’exclude:.,include:db1.*’ # 排除含db.tbl.col列值为reject的所有更新 --filter=’exclude:db.tbl.col=reject’ # 排除任何包含col_a列的更新 --filter=‘exclude:..col_a=*’ # blacklist黑名单,完全排除bad_db数据库,若要恢复,需要删除maxwell库 --filter=‘blacklist:bad_db.*’
MaxWell的初始化和使用:
MaxWell启动之后会从MaxWell库中获取上一次停止时的position。从该断点处开始读取binlog。如果binlog已经清除,可以通过以下操作,将整张表都复制出来。
maxwell-bootstrap命令工具:帮助完成数据初始化,是基于select * from table的方式进行全量数据初始化,不会产生多余的binlog。
参数 |
说明 |
--log_level LOG_LEVEL |
日志级别(DEBUG, INFO, WARN or ERROR) |
--user USER |
mysql 用户名 |
--password PASSWORD |
mysql 密码 |
--host HOST |
mysql 地址 |
--port PORT |
mysql 端口 |
--database DATABASE |
要bootstrap的表所在的数据库 |
--table TABLE |
要引导的表 |
--where WHERE_CLAUSE |
设置过滤条件 |
--client_id CLIENT_ID |
指定执行引导操作的Maxwell实例 |
启动命令:
bin/maxwell-bootstrap --user maxwell --password 123456 --host 127.0.0.1 --database test --table test --client_id maxwell
【注意】
--bootstrapper=sync时,在处理bootstrap时,会阻塞正常的binlog解析;
--bootstrapper=async时,不会阻塞。
也可以执行以下的SQL,在maxwell.bootstrap表中插入记录,手动触发:
# insert into Maxwell.bootstrap(database_name,table_name) values (“test”,“test”);
此时的type,就不是insert了,而是bootstrap-insert。
再一次查看binlog:
> show binlog events。
会发现只有与maxwell相关的binlog,并没有test.test相关的binlog。所以maxwell-bootstrap命令并不会产生多余的binlog。当数据表的数量很大的时候,这个好处更明显
Bootstrap的过程,是bootstrap-start ---> bootstrap-insert ---> bootstrap-complete。其中,start和complete的data字段为空,不携带数据。再进行bootstrap过程中,如果maxwell崩溃,重启的时候,bootstrap会完全重新开始,不管之前进行到了多少。如果不希望这样,可以在数据库中设置 is_complete字段为1(表示完成),或者删除改行。
Entry内部格式
其中跟输出格式相关的配置,可以通过查阅资料了解。输出的Json格式字符串: data 最新的数据,也是修改后的数据 old 老数据,也是修改前的数据 type 操作类型,有insert、update、delete、database-create、database-alter、database-drop、table-create、table-alter、table-drop、bootstrap-insrt、int(未知类型) xid 事务id commit 同一个xid代表同一个事务,事务的最后一条语句有commit,可利用这个重现事务。 server_id 服务器id,一般用不到 thread_id 线程id,一般用不到
Maxwell的监控
MaxWell,提供了4种监控方式,与监控相关的配置项有下列这些:
上述有些指标,是Kafka持有的,并不支持所有的生产者。
通过http方式,获取监控指标:
bin/maxwell --user=’maxwell’ --password=’123456’ --host=’hiwes’ --producer=kafka --kafka.bootstrap.servers=’hiwes:9092’ --kafka_topic=test --log_level=debug --metrics_type=http --metrics_jvm=true --http_port=8080
http方式有4种后缀,分别对应4种不同的格式:
/metrics #所有指标以json格式返回 /prometheus #所有指标以Prometheus格式返回 /healthcheck #返回MaxWell过去15min是否健康 /ping #返回pong,简单的测试结果
MaxWell其他注意事项
多个MaxWell实例
在不同的配置下,MaxWell可以在同一个主服务器运行多个实例。如果希望生产者以不同的配置运行,例如将来自不同组的表(table)的事件投递到不同的topic中,这将非常有用。MaxWell的每个实例都必须配置一个唯一的client_id,以便区分binlog的位置。
Timestamp Column
MaxWell对时间类型(datetime、timestamp、date)都是当做字符串处理的。这也是为了保证数据一致(如0000-00-00 00:00:00在timestamp中非法,但是mysql认,解析为java或python就是null/None)
如果MySQL表上的字段是timestamp类型,是有时区的概念,binlog解析出来的是标准UTC时间,单用户看到的是本地时间。如f_create_time timestamp创建时间是北京时间2018-01-05 21:01:01,那么mysql实际存储的是2018-01-05 13:01:01,binlog里面也是这个时间字符串。如果消费者不做时区转换,会少8小时
Binary Column
MaxWell可以处理binary类型的列,如blob、varbinary。其做法就是对二进制列用base64_encode,当做字符串输出到json。消费者拿到这个列数据后,不能直接拼装,需要base64_encode。
表结构不同步
如果是拿比较老的binlog,放到新的mysql_server上用maxwell拉取,可能表结构已经发生了变化。比如binlog里面字段比schema_hose字段多一个。目前这种情况没有发生异常,如阿里RDS默认会为无主键无唯一索引的表,增加一个__##alibaba_rds_rowid##__,在show create table和schema里面都看不到这个隐藏主键,但是binlog里面会有,并同步到从库。
大事务binlog
当一个事物产生的binlog量非常大的时候,比如迁移日表数据。maxwell为了控制内存使用,会自动将处理不过来的binlog放到文件系统。
【注意】大表数据迁移,也要批量进行,不要一个insert into…select就搞定。
tableMapCache
如果只想获取某几个表的binlog变更,需用到include_tables来过滤,但如果mysql_server上现在删了一个表t1,但binlog是从昨天开始读取的,被删的这个表t1在maxwell启动的时候是拉取不到表结构的。然后昨天的binlog里面有t1的变更,因为找不到表结构给来组装成json,会抛异常。
手动在maxwell.tables/columns里面插入记录时可行的。但这个问题的根本,是maxwell在binlog过滤的时候,只在处理row_event的时候,而对tableMapCache要求binlog里面的所有表都要有。
自己(seanlook)提交了一个commit,可以再做tableMapCache的时候也仅要求缓存include_dbs/tables。
提高消费性能
在用rabbitmq时,routing_key是%db%.%table%,但某些表产生的binlog增量非常大,就会导致各个队列消息量很不平均。目前因为还没做到事务txid或者thread_id级别的并发回放,所以最小队列粒度也是表。尽量单独放一个队列,其他数据量小的合在一起。
总结
相较Canal,Maxwell的使用和部署更加简单,直接将数据变更输出为JSON字符串,不再需要编写客户端,对于缺乏基础建设,短时间又需要快速迭代的项目和公司比较合适。