阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

1. 注解

参考:

  1. https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch?spm=a2c4g.11186623.0.0.674a8cd6q9Z5rG

修改:

  1. 基于python3开发,使用python3默认的原生类库,无需安装额外package
  2. 增加了限制迁移、忽略索引的部分
  3. 增加了目标端Es集群已有非系统索引删除的功能,用以保证迁移索引的原子性
  4. 增加了副本数统一设置或应用默认的逻辑判断部分
  5. 增加了ES集群,DELETE方法的函数封装

2. 样例代码

#!/usr/bin/python# -*- coding: UTF-8 -*-importsysimportjsonimportrequests# 源集群host。oldClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 源集群用户名,可为空。oldClusterUserName="elastic"# 源集群密码,可为空。oldClusterPassword="xx"# 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。newClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 目标集群用户名。newClusterUser="elastic"# 目标集群密码。newClusterPassword="xx"# 是否清理目标集群用户索引,已写入数据慎用,脚本执行异常保证索引从0导入CLEAN_TARGET_INDICES=True# 统一副本数,不填即为源集群默认副本数DEFAULT_REPLICAS=None# 需要迁移的索引,不为空则仅迁移列表中的索引# transferIndex = ['insert_random','product_info_for_logstash']transferIndex= []
# 需要忽略的索引ignoreIndex= ['a0717']
# 是否忽略迁移系统索引ignoreSysIndex=TruedefhttpRequest(method, host, endpoint, params="", username="", password=""):
try:
ifmethodin ('GET', 'DELETE'):
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password))
else:
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password), json=json.loads(params))
assertres.status_code//100==2, f'status 异常\n{res.text}'exceptExceptionase:
print(f'es集群请求异常,参数: \nmethod:{method}\nhosts: {host}\nendpoint:{endpoint}\nparams:{params}\n')
print(e)
sys.exit(1)
returnres.textdefhttpGet(host, endpoint, username="", password=""):
returnhttpRequest("GET", host, endpoint, "", username, password)
defhttpPost(host, endpoint, params, username="", password=""):
returnhttpRequest("POST", host, endpoint, params, username, password)
defhttpPut(host, endpoint, params, username="", password=""):
returnhttpRequest("PUT", host, endpoint, params, username, password)
defhttpDelete(host, endpoint, username, password):
returnhttpRequest("DELETE", host, endpoint, "", username, password)
defgetIndices(host, username="", password="", filterOpen=True):
endpoint="/_cat/indices"indicesResult=httpGet(host, endpoint, username, password)
indicesList=indicesResult.strip().split("\n")
indexList= []
forindicesinindicesList:
infoList=indices.split()
iffilterOpen:
ifnotinfoList[1] =='open':
continueindexList.append(infoList[2])
returnindexListdefgetSettings(index, host, username="", password=""):
endpoint="/"+index+"/_settings"indexSettings=httpGet(host, endpoint, username, password)
print(index+" 原始settings如下:\n"+indexSettings)
settingsDict=json.loads(indexSettings)
## 分片数默认和源集群索引保持一致。number_of_shards=settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数number_of_replicas=DEFAULT_REPLICASifDEFAULT_REPLICAS \
elsesettingsDict[index]["settings"]["index"]["number_of_replicas"]
newSetting="\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}"% (
number_of_shards, number_of_replicas)
returnnewSettingdefgetMapping(index, host, username="", password=""):
endpoint="/"+index+"/_mapping"indexMapping=httpGet(host, endpoint, username, password)
print(index+" 原始mapping如下:\n"+indexMapping)
mappingDict=json.loads(indexMapping)
mappings=json.dumps(mappingDict[index]["mappings"])
newMapping="\"mappings\" : "+mappingsreturnnewMappingdefcreateIndexStatement(oldIndexName):
settingStr=getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr=getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement="{\n"+str(settingStr) +",\n"+str(mappingStr) +"\n}"returncreatestatementdefcreateIndex(oldIndexName, newIndexName=""):
if (newIndexName==""):
newIndexName=oldIndexNamecreatestatement=createIndexStatement(oldIndexName)
print("新索引 "+newIndexName+" 的setting和mapping如下:\n"+createstatement)
endpoint="/"+newIndexNamecreateResult=httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print("新索引 "+newIndexName+" 创建结果:"+createResult)
defcleanTargetIndices():
targetIndexList=getIndices(newClusterHost, newClusterUser, newClusterPassword)
totalIndexNum=len(targetIndexList)
userIndexNum=0for_intargetIndexList:
ifnotstr(_).startswith("."):
userIndexNum+=1httpDelete(newClusterHost, f'/{_}', newClusterUser, newClusterPassword)
print(f'DELETE index : {_} .... Done')
print(f'DELETE {userIndexNum} user index of total index {totalIndexNum}')
if__name__=='__main__':
ifCLEAN_TARGET_INDICES:
print(f'\n-------------delete target cluster indices')
cleanTargetIndices()
print('-------------delete done')
else:
print(
f'\n-------------skip clean target cluster indices because of param CLEAN_TARGET_INDICES={CLEAN_TARGET_INDICES}')
indexList=getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
targetIndex= []
passIndex= []
iftransferIndex:
targetIndex=transferIndexpassIndex= [_for_intargetIndexif_notinindexList]
else:
forindexinindexList:
ifignoreSysIndexand (index.startswith(".")):
passIndex.append(index)
elifindexinignoreIndex:
passIndex.append(index)
else:
targetIndex.append(index)
iftargetIndex:
forindexintargetIndex:
print(f'\n-------------creating index: {index}')
createIndex(index, index)
ifpassIndex:
forindexinpassIndex:
print(index+" 为系统索引或被定义为忽略迁移的索引(ignoreIndex),未创建,如有需要,请单独处理~")
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
打赏
0
0
0
0
57
分享
相关文章
CUDA重大更新:原生Python可直接编写高性能GPU程序
NVIDIA在2025年GTC大会上宣布CUDA并行计算平台正式支持原生Python编程,消除了Python开发者进入GPU加速领域的技术壁垒。这一突破通过重新设计CUDA开发模型,引入CUDA Core、cuPyNumeric、NVMath Python等核心组件,实现了Python与GPU加速的深度集成。开发者可直接用Python语法进行高性能并行计算,显著降低门槛,扩展CUDA生态,推动人工智能、科学计算等领域创新。此更新标志着CUDA向更包容的语言生态系统转型,未来还将支持Rust、Julia等语言。
177 3
CUDA重大更新:原生Python可直接编写高性能GPU程序
Python 原生爬虫教程:网络爬虫的基本概念和认知
网络爬虫是一种自动抓取互联网信息的程序,广泛应用于搜索引擎、数据采集、新闻聚合和价格监控等领域。其工作流程包括 URL 调度、HTTP 请求、页面下载、解析、数据存储及新 URL 发现。Python 因其丰富的库(如 requests、BeautifulSoup、Scrapy)和简洁语法成为爬虫开发的首选语言。然而,在使用爬虫时需注意法律与道德问题,例如遵守 robots.txt 规则、控制请求频率以及合法使用数据,以确保爬虫技术健康有序发展。
304 31
Python 原生爬虫教程:京东商品列表页面数据API
京东商品列表API是电商大数据分析的重要工具,支持开发者、商家和研究人员获取京东平台商品数据。通过关键词搜索、分类筛选、价格区间等条件,可返回多维度商品信息(如名称、价格、销量等),适用于市场调研与推荐系统开发。本文介绍其功能并提供Python请求示例。接口采用HTTP GET/POST方式,支持分页、排序等功能,满足多样化数据需求。
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
183 5
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
143 3
Python 原生爬虫教程:京东商品详情页面数据API
本文介绍京东商品详情API在电商领域的应用价值及功能。该API通过商品ID获取详细信息,如基本信息、价格、库存、描述和用户评价等,支持HTTP请求(GET/POST),返回JSON或XML格式数据。对于商家优化策略、开发者构建应用(如比价网站)以及消费者快速了解商品均有重要意义。研究此API有助于推动电商业务创新与发展。
云数据库实战:基于阿里云RDS的Python应用开发与优化
在互联网时代,数据驱动的应用已成为企业竞争力的核心。阿里云RDS为开发者提供稳定高效的数据库托管服务,支持多种数据库引擎,具备自动化管理、高可用性和弹性扩展等优势。本文通过Python应用案例,从零开始搭建基于阿里云RDS的数据库应用,详细演示连接、CRUD操作及性能优化与安全管理实践,帮助读者快速上手并提升应用性能。
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
230 35
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
255 2

热门文章

最新文章

相关产品

  • 检索分析服务 Elasticsearch版
  • 推荐镜像

    更多
    AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等

    登录插画

    登录以查看您的控制台资源

    管理云资源
    状态一览
    快捷访问