Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

mysqlsmom 文档: https://mysqlsmom.readthedocs.io/en/latest/hello.html

github: https://github.com/m358807551/mysqlsmom


环境要求:

1、python2.7
2、redis
3、Mysql 配置 binlog-format=row

安装

 pip install mysqlsmom

全量同步

# 创建全量同步配置文件


$ mom new test_mom/init_config.py -t init --force

# 编辑配置文件
$ vim ./test_mom/init_config.py # 按注释提示修改配置

# 开始同步
$ mom run -c ./test_mom/init_config.py

增量同步

配置三个文件

test_mom
├── binlog_config.py # 配置文件
├── my_filters.py # 过滤器 配置于 watched
└── my_handlers.py # 处理器 配置于 pipeline

新建配置

mom new test_mom/binlog_config.py -t binlog --force

1、binlog_config.py

# coding=utf-8

STREAM = "BINLOG" # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = name

# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1

BINLOG_CONNECTION = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}

# redis存储上次同步位置等信息
REDIS = {
"host": "127.0.0.1",
"port": 6379,
"db": 0,
# "password": "password", # 不需要密码则注释或删掉该行
}

NODES = [{"host": "127.0.0.1", "port": 9200}]

TASKS = [
{
"stream": {
"database": "demo",
"table": "student"
},
"jobs": [{
"actions": ["insert", "update"],
"watched": {
"filter_display": {}
},
"pipeline": [
{"only_fields": {"fields": ["id", "name", "age"]}},
{"change_name": {"key": "name", "prefix": "hot-"}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
},
{
"actions": ["delete"],
"pipeline": [
# {"only_fields": {"fields": ["id", "name", "age"]}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "delete",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
}
]
}
]

CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"

自定义处理器 my_handlers.py

# -- coding: utf-8 --


import copy

def change_name(row, key, prefix):
new_row = copy.deepcopy(row)
new_row[key] = "{}{}".format(prefix, row[key])
# 返回数据字典,下一工序继续处理
return new_row

自定义过滤器 my_filters.py

# -- coding: utf-8 --

def filter_display(event):
# 返回True 或 False,使用或丢弃
return event"values" == 1

启动

mom run -c test_mom/binlog_config.py 


            </div>
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
3天前
|
数据处理 Python
如何使用Python的Pandas库进行数据排序和排名
【4月更文挑战第22天】Pandas Python库提供数据排序和排名功能。使用`sort_values()`按列进行升序或降序排序,如`df.sort_values(by=&#39;A&#39;, ascending=False)`。`rank()`函数用于计算排名,如`df[&#39;A&#39;].rank(ascending=False)`。多列操作可传入列名列表,如`df.sort_values(by=[&#39;A&#39;, &#39;B&#39;], ascending=[True, False])`和分别对&#39;A&#39;、&#39;B&#39;列排名。
13 2
|
2天前
|
机器学习/深度学习 算法 数据挖掘
PYTHON银行机器学习:回归、随机森林、KNN近邻、决策树、高斯朴素贝叶斯、支持向量机SVM分析营销活动数据|数据分享-2
PYTHON银行机器学习:回归、随机森林、KNN近邻、决策树、高斯朴素贝叶斯、支持向量机SVM分析营销活动数据|数据分享
24 1
|
1天前
|
机器学习/深度学习 算法 Python
数据分享|Python决策树、随机森林、朴素贝叶斯、KNN(K-最近邻居)分类分析银行拉新活动挖掘潜在贷款客户
数据分享|Python决策树、随机森林、朴素贝叶斯、KNN(K-最近邻居)分类分析银行拉新活动挖掘潜在贷款客户
17 4
|
1天前
|
机器学习/深度学习 算法 算法框架/工具
数据分享|PYTHON用KERAS的LSTM神经网络进行时间序列预测天然气价格例子
数据分享|PYTHON用KERAS的LSTM神经网络进行时间序列预测天然气价格例子
18 0
|
1天前
|
机器学习/深度学习 数据挖掘 网络架构
Python对商店数据进行lstm和xgboost销售量时间序列建模预测分析
Python对商店数据进行lstm和xgboost销售量时间序列建模预测分析
12 0
|
1天前
|
开发者 Python
Python的os模块详解
Python的os模块详解
11 0
|
2天前
|
数据挖掘 数据处理 索引
如何使用Python的Pandas库进行数据筛选和过滤?
Pandas是Python数据分析的核心库,提供DataFrame数据结构。基本步骤包括导入库、创建DataFrame及进行数据筛选。示例代码展示了如何通过布尔索引、`query()`和`loc[]`方法筛选`Age`大于19的记录。
10 0
|
4天前
|
Python
如何使用Python的Pandas库进行数据缺失值处理?
Pandas在Python中提供多种处理缺失值的方法:1) 使用`isnull()`检查;2) `dropna()`删除含缺失值的行或列;3) `fillna()`用常数、前后值填充;4) `interpolate()`进行插值填充。根据需求选择合适的方法处理数据缺失。
35 9
|
5天前
|
数据挖掘 API 数据安全/隐私保护
python请求模块requests如何添加代理ip
python请求模块requests如何添加代理ip
|
5天前
|
SQL 关系型数据库 MySQL
Python与MySQL数据库交互:面试实战
【4月更文挑战第16天】本文介绍了Python与MySQL交互的面试重点,包括使用`mysql-connector-python`或`pymysql`连接数据库、执行SQL查询、异常处理、防止SQL注入、事务管理和ORM框架。易错点包括忘记关闭连接、忽视异常处理、硬编码SQL、忽略事务及过度依赖低效查询。通过理解这些问题和提供策略,可提升面试表现。
25 6

热门文章

最新文章