Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

mysqlsmom 文档: https://mysqlsmom.readthedocs.io/en/latest/hello.html

github: https://github.com/m358807551/mysqlsmom


环境要求:

1、python2.7
2、redis
3、Mysql 配置 binlog-format=row

安装

pip install mysqlsmom

全量同步

# 创建全量同步配置文件
$ mom new test_mom/init_config.py -t init --force
# 编辑配置文件
$ vim ./test_mom/init_config.py  # 按注释提示修改配置
# 开始同步
$ mom run -c ./test_mom/init_config.py

增量同步

配置三个文件

test_mom
├── binlog_config.py   # 配置文件
├── my_filters.py         # 过滤器 配置于 watched 
└── my_handlers.py    # 处理器 配置于 pipeline

新建配置

mom new test_mom/binlog_config.py -t binlog --force

1、binlog_config.py


# coding=utf-8
STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
BINLOG_CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '123456'
}
# redis存储上次同步位置等信息
REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
    # "password": "password",  # 不需要密码则注释或删掉该行
}
NODES = [{"host": "127.0.0.1", "port": 9200}]
TASKS = [
    {
        "stream": {
            "database": "demo",
            "table": "student"
        },
        "jobs": [{
            "actions": ["insert", "update"],
            "watched": {
                "filter_display": {}
            },
            "pipeline": [
                {"only_fields": {"fields": ["id", "name", "age"]}},
                {"change_name": {"key": "name", "prefix": "hot-"}},
                {"set_id": {"field": "id"}}
            ],
            "dest": {
                "es": {
                    "action": "upsert",
                    "index": "demo",
                    "type": "student",
                    "nodes": NODES
                }
            }
        },
            {
                "actions": ["delete"],
                "pipeline": [
                    # {"only_fields": {"fields": ["id", "name", "age"]}},
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "delete",
                        "index": "demo",
                        "type": "student",
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]
CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"

自定义处理器 my_handlers.py


# -*- coding: utf-8 -*-
import copy
def change_name(row, key, prefix):
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    # 返回数据字典,下一工序继续处理
    return new_row

自定义过滤器 my_filters.py


# -*- coding: utf-8 -*-
def filter_display(event):
  # 返回True 或 False,使用或丢弃
    return event["values"]["display"] == 1

启动

mom run -c test_mom/binlog_config.py
相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
6月前
|
JavaScript 前端开发 数据可视化
20.6K star!Excel级交互体验!这款开源Web表格神器绝了!
Handsontable 是一款功能强大的 JavaScript 数据表格组件,提供类 Excel 的交互体验。支持实时协作、数据绑定、公式计算等企业级功能,可轻松集成到 React/Vue/Angular 等主流框架。
734 11
|
11月前
|
存储 开发工具 数据安全/隐私保护
什么是Iaas,Paas,Saas?
IaaS(基础设施即服务)提供网络上的IT基础设施服务,按需计费;PaaS(平台即服务)则提供运算平台与解决方案服务,助力用户在云端基础设施上构建与部署应用;而SaaS(软件即服务)通过网络交付软件服务,让用户能够便捷地使用已部署好的应用程序,无需关心底层技术细节。以厨房为例,IaaS如同提供厨房用品,用户自行烹饪;PaaS则是提供预制菜,减少前期准备;SaaS则像点外卖,直接享用成品菜肴。
3549 3
|
分布式计算 负载均衡 API
微服务架构设计原则与模式
【8月更文第29天】随着云计算和分布式计算的发展,微服务架构已成为构建大型复杂应用的一种流行方式。这种架构模式将单个应用程序分解成一组小型、独立的服务,每个服务运行在其自己的进程中,并通过轻量级机制(通常是HTTP资源API)进行通信。本文将探讨微服务架构的基本设计原则、常用模式以及如何有效地划分服务边界。
829 4
|
11月前
|
关系型数据库 MySQL 数据库
深入浅出MySQL索引优化:提升数据库性能的关键
在这个数据驱动的时代,数据库性能的优劣直接关系到应用的响应速度和用户体验。MySQL作为广泛使用的数据库之一,其索引优化是提升查询性能的关键。本文将带你一探MySQL索引的内部机制,分析索引的类型及其适用场景,并通过实际案例演示如何诊断和优化索引,以实现数据库性能的飞跃。
|
10月前
|
存储 安全 API
利用环境变量管理配置:最佳实践与技巧
本文介绍了如何利用环境变量管理应用程序配置,涵盖安全性、灵活性和简化部署等方面的优势。详细探讨了最佳实践,包括避免敏感信息泄露、使用`.env`文件、环境特定配置、环境变量注入与验证,以及使用第三方服务。同时分享了一些实用技巧,如分层管理、环境变量加密和版本控制。旨在帮助开发者更高效、安全地管理应用配置。
|
11月前
|
存储 安全 API
利用环境变量管理敏感信息
【10月更文挑战第16天】在软件开发中,环境变量是管理敏感信息如API密钥、数据库密码等的安全方式,避免了将这些信息硬编码在源代码中。本文介绍了环境变量的概念、优势及如何在应用中实施,包括本地开发、CI/CD流程和云服务中的应用,以及实战技巧和最佳实践。
|
机器学习/深度学习 人工智能 自然语言处理
利用AI实现情感分析的实践与探索
本文主要介绍了利用AI技术进行情感分析的实践过程。通过阿里云自然语言处理服务(NLP)提供的情感分析API,结合Python编程语言和Jupyter Notebook开发环境,实现对社交媒体上产品评论的情感分析。具体步骤包括数据收集、预处理和调用API进行分析。示例代码展示了如何使用Python SDK调用API并获取情感分析结果。通过情感分析,企业能快速了解用户反馈,优化产品策略。未来,情感分析在客户服务、市场调研等领域将有更广泛应用,而阿里云平台为实现情感分析提供了便捷高效的工具和服务。
1627 2
|
自然语言处理 Java 关系型数据库
ElasticSearch 实现分词全文检索 - SpringBoot 完整实现 Demo 附源码【完结篇】
ElasticSearch 实现分词全文检索 - SpringBoot 完整实现 Demo 附源码【完结篇】
256 0
|
算法 编译器 API
【CMake的文件操作】深入CMake的文件操作:精通遍历及高级应用
【CMake的文件操作】深入CMake的文件操作:精通遍历及高级应用
474 1
|
Web App开发 数据采集 数据挖掘
还有这种骚操作:使用Golang实现无头浏览器浏览和截图
还有这种骚操作:使用Golang实现无头浏览器浏览和截图
808 0