Python:mysql-replication监控MySQL的binlog变动

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: Python:mysql-replication监控MySQL的binlog变动

Github: https://github.com/noplay/python-mysql-replication


设置同步账号权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# 刷新权限
flush privileges;

参考

利用Python my-replication读取mysql的binlog

[mysql]mysql grant 用户权限总结


安装

pip install mysql-replication

代码示例

# -*- coding: utf-8 -*-


import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
)


class DateEncoder(json.JSONEncoder):
"""
自定义类,解决报错:
TypeError: Object of type 'datetime' is not JSON serializable
"""

def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')

elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")

else:
return json.JSONEncoder.default(self, obj)


# 配置数据库信息
mysql_settings = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}


def main():
# 实例化binlog 流对象
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=100, # slave标识,唯一
blocking=True, # 阻塞等待后续事件
# 设定只监控写操作:增、删、改
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
]
)

for binlogevent in stream:
# binlogevent.dump() # 打印所有信息

for row in binlogevent.rows:
# 打印 库名 和 表名
event = {"schema": binlogevent.schema, "table": binlogevent.table}

if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["data"] = row["values"]

elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["data"] = row["after_values"] # 注意这里不是values

elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["data"] = row["values"]

print(json.dumps(event, cls=DateEncoder))
# sys.stdout.flush()

# stream.close() # 如果使用阻塞模式,这行多余了


if name == '__main__':
main()
"""
输出数据格式
{
"schema": "demo", # 数据库名
"table": "student", # 表名
"action": "update", # 动作 insert、delete、update
"data": { # 数据,里边包含所有字段
"id": 26,
"name": "haha",
"age": 34,
"update_time": "2019-06-06 16:59:06",
"display": 0
}
}
"""



            </div>
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
编解码 Android开发
Android 使用VideoView播放本地视频详解
Android 使用VideoView播放本地视频详解
829 2
|
11月前
|
JSON 数据库 数据格式
[开发技巧] 如何获取汉字笔画数?
在开发卜筮小脚本时遇到获取汉字笔画数的需求,起初尝试使用`pypinyin`库却未得理想结果。经过探索,发现Unicode联盟维护的Unihan数据库提供准确的汉字笔画数据。通过下载Unihan数据库文件,解析其中的`kTotalStrokes`字段,利用正则表达式提取所需信息,并将其保存为JSON格式以供快速查询。最终编写函数`get_character_stroke_count`实现任意汉字笔画数的高效获取,满足了项目需求并提供了准确的数据支持。此方法不仅解决了问题,还为类似需求提供了参考方案。
407 10
[开发技巧] 如何获取汉字笔画数?
|
人工智能 算法 安全
基于YOLOv8的交通车辆实时检测系统【训练和系统源码+Pyside6+数据集+包运行】
基于YOLOv8的交通车辆实时检测系统,使用5830张图片训练出有效模型,开发了Python和Pyside6的GUI界面系统,支持图片、视频和摄像头实时检测,具备模型权重导入、检测置信度调节等功能,旨在提升道路安全和改善交通管理。
1795 1
基于YOLOv8的交通车辆实时检测系统【训练和系统源码+Pyside6+数据集+包运行】
|
前端开发 容器
|
网络协议 文件存储 Windows
Windows Server 2019 FTP服务器搭建
Windows Server 2019 FTP服务器搭建
400 0
|
弹性计算 监控 数据挖掘
事件驱动架构的优势与应用:深度解析与实战应用
【8月更文挑战第17天】事件驱动架构以其松耦合、可扩展性、异步处理、实时性和高可靠性等优势,在实时数据处理、复杂业务流程、弹性伸缩和实时通信等多个领域展现出巨大的应用潜力。通过合理应用事件驱动架构,可以构建灵活、可扩展和可维护的系统架构,满足不断变化的业务需求和技术挑战。对于开发者而言,深入理解事件驱动架构的核心概念和优势,将有助于更好地设计和实现高质量的软件系统。
|
机器学习/深度学习 数据采集 PyTorch
使用 PyTorch 创建的多步时间序列预测的 Encoder-Decoder 模型
本文提供了一个用于解决 Kaggle 时间序列预测任务的 encoder-decoder 模型,并介绍了获得前 10% 结果所涉及的步骤。
278 0
|
机器学习/深度学习 并行计算 测试技术
BiTCN:基于卷积网络的多元时间序列预测
该文探讨了时间序列预测中模型架构的选择,指出尽管MLP和Transformer模型常见,但CNN在预测领域的应用较少。BiTCN是一种利用两个时间卷积网络来编码历史和未来协变量的模型,提出于《Parameter-efficient deep probabilistic forecasting》(2023年3月)。它包含多个由扩张卷积、GELU激活函数、dropout和全连接层组成的临时块,有效地处理序列数据。实验表明,BiTCN在具有外生特征的预测任务中表现优于N-HiTS和PatchTST。BiTCN的效率和性能展示了CNN在时间序列预测中的潜力。
808 1
|
关系型数据库 MySQL 自然语言处理
不引入ES,如何利用MySQL实现模糊匹配
本文介绍了实现一个公司申请审批流程的业务场景,该流程涉及商务角色申请添加公司,然后由管理员审批。为了防止添加重复的公司,管理员在审批前需检查已有公司信息。核心思路是通过分词、匹配数据库中的数据并按匹配度排序。在技术选型上,由于系统规模小,选择了使用MySQL的正则匹配功能而非引入ES,以降低复杂性。实现过程中,首先对输入的公司名称进行预处理,移除无用信息如地名等,然后使用IKAnalyzer进行分词,最后通过正则表达式在数据库中进行模糊匹配并按匹配度排序。代码示例展示了如何处理公司名称、分词和执行模糊匹配的SQL查询。
214 0
|
C++
【qt】QTreeWidget 树形组件1
【qt】QTreeWidget 树形组件
254 0