Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Elasticsearch Serverless通用抵扣包,测试体验金 200元
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

mysqlsmom 文档: https://mysqlsmom.readthedocs.io/en/latest/hello.html

github: https://github.com/m358807551/mysqlsmom


环境要求:

1、python2.7
2、redis
3、Mysql 配置 binlog-format=row

安装

 pip install mysqlsmom

全量同步

# 创建全量同步配置文件


$ mom new test_mom/init_config.py -t init --force

# 编辑配置文件
$ vim ./test_mom/init_config.py # 按注释提示修改配置

# 开始同步
$ mom run -c ./test_mom/init_config.py

增量同步

配置三个文件

test_mom
├── binlog_config.py # 配置文件
├── my_filters.py # 过滤器 配置于 watched
└── my_handlers.py # 处理器 配置于 pipeline

新建配置

mom new test_mom/binlog_config.py -t binlog --force

1、binlog_config.py

# coding=utf-8

STREAM = "BINLOG" # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = name

# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1

BINLOG_CONNECTION = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}

# redis存储上次同步位置等信息
REDIS = {
"host": "127.0.0.1",
"port": 6379,
"db": 0,
# "password": "password", # 不需要密码则注释或删掉该行
}

NODES = [{"host": "127.0.0.1", "port": 9200}]

TASKS = [
{
"stream": {
"database": "demo",
"table": "student"
},
"jobs": [{
"actions": ["insert", "update"],
"watched": {
"filter_display": {}
},
"pipeline": [
{"only_fields": {"fields": ["id", "name", "age"]}},
{"change_name": {"key": "name", "prefix": "hot-"}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
},
{
"actions": ["delete"],
"pipeline": [
# {"only_fields": {"fields": ["id", "name", "age"]}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "delete",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
}
]
}
]

CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"

自定义处理器 my_handlers.py

# -- coding: utf-8 --


import copy

def change_name(row, key, prefix):
new_row = copy.deepcopy(row)
new_row[key] = "{}{}".format(prefix, row[key])
# 返回数据字典,下一工序继续处理
return new_row

自定义过滤器 my_filters.py

# -- coding: utf-8 --

def filter_display(event):
# 返回True 或 False,使用或丢弃
return event"values" == 1

启动

mom run -c test_mom/binlog_config.py 


            </div>
相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
目录
相关文章
|
SQL 关系型数据库 MySQL
MySQL操作利器——mysql-connector-python库详解
MySQL操作利器——mysql-connector-python库详解
2422 0
|
9月前
|
SQL 关系型数据库 MySQL
Python中使用MySQL模糊查询的方法
本文介绍了两种使用Python进行MySQL模糊查询的方法:一是使用`pymysql`库,二是使用`mysql-connector-python`库。通过这两种方法,可以连接MySQL数据库并执行模糊查询。具体步骤包括安装库、配置数据库连接参数、编写SQL查询语句以及处理查询结果。文中详细展示了代码示例,并提供了注意事项,如替换数据库连接信息、正确使用通配符和关闭数据库连接等。确保在实际应用中注意SQL注入风险,使用参数化查询以保障安全性。
|
11月前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
1100 15
|
关系型数据库 MySQL 数据库
Mysql学习笔记(四):Python与Mysql交互--实现增删改查
如何使用Python与MySQL数据库进行交互,实现增删改查等基本操作的教程。
190 1
|
搜索推荐 前端开发 数据可视化
基于Python协同过滤的旅游景点推荐系统,采用Django框架,MySQL数据存储,Bootstrap前端,echarts可视化实现
本文介绍了一个基于Python协同过滤算法的旅游景点推荐系统,该系统采用Django框架、MySQL数据库、Bootstrap前端和echarts数据可视化技术,旨在为用户提供个性化的旅游推荐服务,提升用户体验和旅游市场增长。
1686 9
基于Python协同过滤的旅游景点推荐系统,采用Django框架,MySQL数据存储,Bootstrap前端,echarts可视化实现
|
关系型数据库 MySQL 数据库
Python MySQL查询返回字典类型数据的方法
通过使用 `mysql-connector-python`库并选择 `MySQLCursorDict`作为游标类型,您可以轻松地将MySQL查询结果以字典类型返回。这种方式提高了代码的可读性,使得数据操作更加直观和方便。上述步骤和示例代码展示了如何实现这一功能,希望对您的项目开发有所帮助。
449 4
|
关系型数据库 MySQL Python
mysql之python客户端封装类
mysql之python客户端封装类
|
关系型数据库 MySQL Python
pymysql模块,python与MySQL之间的交互
pymysql模块,python与MySQL之间的交互
|
SQL 关系型数据库 MySQL
Python之MySQL操作及Paramiko模块操作
Python之MySQL操作及Paramiko模块操作