Python:mysql-replication监控MySQL的binlog变动

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Python:mysql-replication监控MySQL的binlog变动

Github: https://github.com/noplay/python-mysql-replication


设置同步账号权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# 刷新权限
flush privileges;

参考

利用Python my-replication读取mysql的binlog

[mysql]mysql grant 用户权限总结


安装

pip install mysql-replication

代码示例

# -*- coding: utf-8 -*-


import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
)


class DateEncoder(json.JSONEncoder):
"""
自定义类,解决报错:
TypeError: Object of type 'datetime' is not JSON serializable
"""

def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')

elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")

else:
return json.JSONEncoder.default(self, obj)


# 配置数据库信息
mysql_settings = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}


def main():
# 实例化binlog 流对象
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=100, # slave标识,唯一
blocking=True, # 阻塞等待后续事件
# 设定只监控写操作:增、删、改
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
]
)

for binlogevent in stream:
# binlogevent.dump() # 打印所有信息

for row in binlogevent.rows:
# 打印 库名 和 表名
event = {"schema": binlogevent.schema, "table": binlogevent.table}

if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["data"] = row["values"]

elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["data"] = row["after_values"] # 注意这里不是values

elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["data"] = row["values"]

print(json.dumps(event, cls=DateEncoder))
# sys.stdout.flush()

# stream.close() # 如果使用阻塞模式,这行多余了


if name == '__main__':
main()
"""
输出数据格式
{
"schema": "demo", # 数据库名
"table": "student", # 表名
"action": "update", # 动作 insert、delete、update
"data": { # 数据,里边包含所有字段
"id": 26,
"name": "haha",
"age": 34,
"update_time": "2019-06-06 16:59:06",
"display": 0
}
}
"""



            </div>
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
关系型数据库 MySQL 机器人
【MySQL】两个脚本自动化搞定 MySQL 备份恢复--XtraBackup
【MySQL】两个脚本自动化搞定 MySQL 备份恢复--XtraBackup
|
关系型数据库 MySQL Java
DataX 测试同步MySQL数据到MySQL
nacos 的users和role同步到 datax-demo库的t_datax_role_user
DataX 测试同步MySQL数据到MySQL
|
SQL 存储 关系型数据库
【MySQL高级】MySql中常用工具及Mysql 日志
【MySQL高级】MySql中常用工具及Mysql 日志
62 0
【MySQL高级】MySql中常用工具及Mysql 日志
|
数据采集 Python
如何从 Python 中的字符串列表中删除特殊字符?
如何从 Python 中的字符串列表中删除特殊字符?
349 0
|
JSON 监控 关系型数据库
Python:mysql-replication监控MySQL的binlog变动
Python:mysql-replication监控MySQL的binlog变动
260 0
|
SQL 关系型数据库 MySQL
【MySQL高级】MySQL的日志
【MySQL高级】MySQL的日志
118 0
|
SQL 监控 关系型数据库
[MySQL FAQ]系列 — MySQL复制中slave延迟监控
[MySQL FAQ]系列 — MySQL复制中slave延迟监控
139 0
|
监控 关系型数据库 MySQL
[MySQL FAQ]系列 — 大数据量时如何部署MySQL Replication从库
[MySQL FAQ]系列 — 大数据量时如何部署MySQL Replication从库
114 0
|
监控 关系型数据库 MySQL
Python:mysql-replication监控MySQL的binlog变动
Python:mysql-replication监控MySQL的binlog变动
309 0