Python:mysql-replication监控MySQL的binlog变动

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Python:mysql-replication监控MySQL的binlog变动

Github: https://github.com/noplay/python-mysql-replication


设置同步账号权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# 刷新权限
flush privileges;

参考

利用Python my-replication读取mysql的binlog

[mysql]mysql grant 用户权限总结


安装

pip install mysql-replication

代码示例

# -*- coding: utf-8 -*-


import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
)


class DateEncoder(json.JSONEncoder):
"""
自定义类,解决报错:
TypeError: Object of type 'datetime' is not JSON serializable
"""

def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')

elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")

else:
return json.JSONEncoder.default(self, obj)


# 配置数据库信息
mysql_settings = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}


def main():
# 实例化binlog 流对象
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=100, # slave标识,唯一
blocking=True, # 阻塞等待后续事件
# 设定只监控写操作:增、删、改
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
]
)

for binlogevent in stream:
# binlogevent.dump() # 打印所有信息

for row in binlogevent.rows:
# 打印 库名 和 表名
event = {"schema": binlogevent.schema, "table": binlogevent.table}

if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["data"] = row["values"]

elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["data"] = row["after_values"] # 注意这里不是values

elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["data"] = row["values"]

print(json.dumps(event, cls=DateEncoder))
# sys.stdout.flush()

# stream.close() # 如果使用阻塞模式,这行多余了


if name == '__main__':
main()
"""
输出数据格式
{
"schema": "demo", # 数据库名
"table": "student", # 表名
"action": "update", # 动作 insert、delete、update
"data": { # 数据,里边包含所有字段
"id": 26,
"name": "haha",
"age": 34,
"update_time": "2019-06-06 16:59:06",
"display": 0
}
}
"""



            </div>
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
运维 Devops 应用服务中间件
自动化运维的利器:Ansible入门与实践
【8月更文挑战第27天】在这个数字化时代,高效的系统管理变得尤为重要。Ansible,作为一个简单而强大的自动化运维工具,正逐渐成为DevOps工程师的首选。本篇文章将带你了解Ansible的基本概念,通过实际操作演示其如何简化日常任务,以及它如何帮助你实现自动化部署和配置管理。无论你是初学者还是有经验的运维人员,这篇文章都将为你提供有价值的信息和启示。
|
5月前
|
SQL 前端开发 Java
编译JSqlParser4.6-4.7最新源代码
编译JSqlParser4.6-4.7最新源代码
136 0
|
关系型数据库 MySQL Java
DataX 测试同步MySQL数据到MySQL
nacos 的users和role同步到 datax-demo库的t_datax_role_user
DataX 测试同步MySQL数据到MySQL
|
SQL 存储 关系型数据库
【MySQL高级】MySql中常用工具及Mysql 日志
【MySQL高级】MySql中常用工具及Mysql 日志
62 0
【MySQL高级】MySql中常用工具及Mysql 日志
|
JSON 监控 关系型数据库
Python:mysql-replication监控MySQL的binlog变动
Python:mysql-replication监控MySQL的binlog变动
260 0
|
SQL 关系型数据库 MySQL
【MySQL高级】MySQL的日志
【MySQL高级】MySQL的日志
118 0
|
SQL 数据采集 弹性计算
MySQL Binlog导入日志服务最佳实践
本文为您介绍使用SLS导入MySQL Binlog的使用场景和最佳实践。
433 0
MySQL Binlog导入日志服务最佳实践
|
SQL 监控 关系型数据库
[MySQL FAQ]系列 — MySQL复制中slave延迟监控
[MySQL FAQ]系列 — MySQL复制中slave延迟监控
139 0
|
SQL 关系型数据库 MySQL
|
监控 关系型数据库 MySQL
[MySQL FAQ]系列 — 大数据量时如何部署MySQL Replication从库
[MySQL FAQ]系列 — 大数据量时如何部署MySQL Replication从库
115 0