Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

mysqlsmom 文档: https://mysqlsmom.readthedocs.io/en/latest/hello.html

github: https://github.com/m358807551/mysqlsmom


环境要求:

1、python2.7
2、redis
3、Mysql 配置 binlog-format=row

安装

 pip install mysqlsmom

全量同步

# 创建全量同步配置文件
$ mom new test_mom/init_config.py -t init --force
# 编辑配置文件
$ vim ./test_mom/init_config.py  # 按注释提示修改配置
# 开始同步
$ mom run -c ./test_mom/init_config.py

增量同步

配置三个文件

test_mom
├── binlog_config.py   # 配置文件
├── my_filters.py         # 过滤器 配置于 watched 
└── my_handlers.py    # 处理器 配置于 pipeline

新建配置

mom new test_mom/binlog_config.py -t binlog --force

1、binlog_config.py

# coding=utf-8
STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
BINLOG_CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '123456'
}
# redis存储上次同步位置等信息
REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
    # "password": "password",  # 不需要密码则注释或删掉该行
}
NODES = [{"host": "127.0.0.1", "port": 9200}]
TASKS = [
    {
        "stream": {
            "database": "demo",
            "table": "student"
        },
        "jobs": [{
            "actions": ["insert", "update"],
            "watched": {
                "filter_display": {}
            },
            "pipeline": [
                {"only_fields": {"fields": ["id", "name", "age"]}},
                {"change_name": {"key": "name", "prefix": "hot-"}},
                {"set_id": {"field": "id"}}
            ],
            "dest": {
                "es": {
                    "action": "upsert",
                    "index": "demo",
                    "type": "student",
                    "nodes": NODES
                }
            }
        },
            {
                "actions": ["delete"],
                "pipeline": [
                    # {"only_fields": {"fields": ["id", "name", "age"]}},
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "delete",
                        "index": "demo",
                        "type": "student",
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]
CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"

自定义处理器 my_handlers.py

# -*- coding: utf-8 -*-
import copy
def change_name(row, key, prefix):
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    # 返回数据字典,下一工序继续处理
    return new_row

自定义过滤器 my_filters.py

# -*- coding: utf-8 -*-
def filter_display(event):
  # 返回True 或 False,使用或丢弃
    return event["values"]["display"] == 1

启动

mom run -c test_mom/binlog_config.py 


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
13天前
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
13天前
|
程序员 开发者 Python
Python网络编程基础(Socket编程) 错误处理和异常处理的最佳实践
【4月更文挑战第11天】在网络编程中,错误处理和异常管理不仅是为了程序的健壮性,也是为了提供清晰的用户反馈以及优雅的故障恢复。在前面的章节中,我们讨论了如何使用`try-except`语句来处理网络错误。现在,我们将深入探讨错误处理和异常处理的最佳实践。
|
17天前
|
缓存 监控 Python
解密Python中的装饰器:优雅而强大的编程利器
Python中的装饰器是一种强大而又优雅的编程工具,它能够在不改变原有代码结构的情况下,为函数或类添加新的功能和行为。本文将深入解析Python装饰器的原理、用法和实际应用,帮助读者更好地理解和利用这一技术,提升代码的可维护性和可扩展性。
|
1月前
|
编译器 测试技术 C++
【Python 基础教程 01 全面介绍】 Python编程基础全攻略:一文掌握Python语法精髓,从C/C++ 角度学习Python的差异
【Python 基础教程 01 全面介绍】 Python编程基础全攻略:一文掌握Python语法精髓,从C/C++ 角度学习Python的差异
165 0
|
6天前
|
安全 数据处理 开发者
《Python 简易速速上手小册》第7章:高级 Python 编程(2024 最新版)
《Python 简易速速上手小册》第7章:高级 Python 编程(2024 最新版)
19 1
|
6天前
|
人工智能 数据挖掘 程序员
《Python 简易速速上手小册》第1章:Python 编程入门(2024 最新版)
《Python 简易速速上手小册》第1章:Python 编程入门(2024 最新版)
35 0
|
7天前
|
API Python
Python模块化编程:面试题深度解析
【4月更文挑战第14天】了解Python模块化编程对于构建大型项目至关重要,它涉及代码组织、复用和维护。本文深入探讨了模块、包、导入机制、命名空间和作用域等基础概念,并列举了面试中常见的模块导入混乱、不适当星号导入等问题,强调了避免循环依赖、合理使用`__init__.py`以及理解模块作用域的重要性。掌握这些知识将有助于在面试中自信应对模块化编程的相关挑战。
20 0
|
8天前
|
Python
Python金融应用编程:衍生品定价和套期保值的随机过程
Python金融应用编程:衍生品定价和套期保值的随机过程
22 0
|
8天前
|
Python
python面型对象编程进阶(继承、多态、私有化、异常捕获、类属性和类方法)(上)
python面型对象编程进阶(继承、多态、私有化、异常捕获、类属性和类方法)(上)
50 0
|
9天前
|
机器学习/深度学习 算法 定位技术
python中使用马尔可夫决策过程(MDP)动态编程来解决最短路径强化学习问题
python中使用马尔可夫决策过程(MDP)动态编程来解决最短路径强化学习问题
23 1

热门文章

最新文章