阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

1. 注解

参考:

  1. https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch?spm=a2c4g.11186623.0.0.674a8cd6q9Z5rG

修改:

  1. 基于python3开发,使用python3默认的原生类库,无需安装额外package
  2. 增加了限制迁移、忽略索引的部分
  3. 增加了目标端Es集群已有非系统索引删除的功能,用以保证迁移索引的原子性
  4. 增加了副本数统一设置或应用默认的逻辑判断部分
  5. 增加了ES集群,DELETE方法的函数封装

2. 样例代码

#!/usr/bin/python# -*- coding: UTF-8 -*-importsysimportjsonimportrequests# 源集群host。oldClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 源集群用户名,可为空。oldClusterUserName="elastic"# 源集群密码,可为空。oldClusterPassword="xx"# 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。newClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 目标集群用户名。newClusterUser="elastic"# 目标集群密码。newClusterPassword="xx"# 是否清理目标集群用户索引,已写入数据慎用,脚本执行异常保证索引从0导入CLEAN_TARGET_INDICES=True# 统一副本数,不填即为源集群默认副本数DEFAULT_REPLICAS=None# 需要迁移的索引,不为空则仅迁移列表中的索引# transferIndex = ['insert_random','product_info_for_logstash']transferIndex= []
# 需要忽略的索引ignoreIndex= ['a0717']
# 是否忽略迁移系统索引ignoreSysIndex=TruedefhttpRequest(method, host, endpoint, params="", username="", password=""):
try:
ifmethodin ('GET', 'DELETE'):
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password))
else:
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password), json=json.loads(params))
assertres.status_code//100==2, f'status 异常\n{res.text}'exceptExceptionase:
print(f'es集群请求异常,参数: \nmethod:{method}\nhosts: {host}\nendpoint:{endpoint}\nparams:{params}\n')
print(e)
sys.exit(1)
returnres.textdefhttpGet(host, endpoint, username="", password=""):
returnhttpRequest("GET", host, endpoint, "", username, password)
defhttpPost(host, endpoint, params, username="", password=""):
returnhttpRequest("POST", host, endpoint, params, username, password)
defhttpPut(host, endpoint, params, username="", password=""):
returnhttpRequest("PUT", host, endpoint, params, username, password)
defhttpDelete(host, endpoint, username, password):
returnhttpRequest("DELETE", host, endpoint, "", username, password)
defgetIndices(host, username="", password="", filterOpen=True):
endpoint="/_cat/indices"indicesResult=httpGet(host, endpoint, username, password)
indicesList=indicesResult.strip().split("\n")
indexList= []
forindicesinindicesList:
infoList=indices.split()
iffilterOpen:
ifnotinfoList[1] =='open':
continueindexList.append(infoList[2])
returnindexListdefgetSettings(index, host, username="", password=""):
endpoint="/"+index+"/_settings"indexSettings=httpGet(host, endpoint, username, password)
print(index+" 原始settings如下:\n"+indexSettings)
settingsDict=json.loads(indexSettings)
## 分片数默认和源集群索引保持一致。number_of_shards=settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数number_of_replicas=DEFAULT_REPLICASifDEFAULT_REPLICAS \
elsesettingsDict[index]["settings"]["index"]["number_of_replicas"]
newSetting="\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}"% (
number_of_shards, number_of_replicas)
returnnewSettingdefgetMapping(index, host, username="", password=""):
endpoint="/"+index+"/_mapping"indexMapping=httpGet(host, endpoint, username, password)
print(index+" 原始mapping如下:\n"+indexMapping)
mappingDict=json.loads(indexMapping)
mappings=json.dumps(mappingDict[index]["mappings"])
newMapping="\"mappings\" : "+mappingsreturnnewMappingdefcreateIndexStatement(oldIndexName):
settingStr=getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr=getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement="{\n"+str(settingStr) +",\n"+str(mappingStr) +"\n}"returncreatestatementdefcreateIndex(oldIndexName, newIndexName=""):
if (newIndexName==""):
newIndexName=oldIndexNamecreatestatement=createIndexStatement(oldIndexName)
print("新索引 "+newIndexName+" 的setting和mapping如下:\n"+createstatement)
endpoint="/"+newIndexNamecreateResult=httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print("新索引 "+newIndexName+" 创建结果:"+createResult)
defcleanTargetIndices():
targetIndexList=getIndices(newClusterHost, newClusterUser, newClusterPassword)
totalIndexNum=len(targetIndexList)
userIndexNum=0for_intargetIndexList:
ifnotstr(_).startswith("."):
userIndexNum+=1httpDelete(newClusterHost, f'/{_}', newClusterUser, newClusterPassword)
print(f'DELETE index : {_} .... Done')
print(f'DELETE {userIndexNum} user index of total index {totalIndexNum}')
if__name__=='__main__':
ifCLEAN_TARGET_INDICES:
print(f'\n-------------delete target cluster indices')
cleanTargetIndices()
print('-------------delete done')
else:
print(
f'\n-------------skip clean target cluster indices because of param CLEAN_TARGET_INDICES={CLEAN_TARGET_INDICES}')
indexList=getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
targetIndex= []
passIndex= []
iftransferIndex:
targetIndex=transferIndexpassIndex= [_for_intargetIndexif_notinindexList]
else:
forindexinindexList:
ifignoreSysIndexand (index.startswith(".")):
passIndex.append(index)
elifindexinignoreIndex:
passIndex.append(index)
else:
targetIndex.append(index)
iftargetIndex:
forindexintargetIndex:
print(f'\n-------------creating index: {index}')
createIndex(index, index)
ifpassIndex:
forindexinpassIndex:
print(index+" 为系统索引或被定义为忽略迁移的索引(ignoreIndex),未创建,如有需要,请单独处理~")
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
1月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
61 5
|
1月前
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
57 3
|
3月前
|
存储 API 数据库
检索服务elasticsearch索引(Index)
【8月更文挑战第23天】
66 6
|
8天前
|
存储 JSON 关系型数据库
Elasticsearch 索引
【11月更文挑战第3天】
25 4
|
11天前
|
机器学习/深度学习 自然语言处理 API
如何使用阿里云的语音合成服务(TTS)将文本转换为语音?本文详细介绍了从注册账号、获取密钥到编写Python代码调用TTS服务的全过程
如何使用阿里云的语音合成服务(TTS)将文本转换为语音?本文详细介绍了从注册账号、获取密钥到编写Python代码调用TTS服务的全过程。通过简单的代码示例,展示如何将文本转换为自然流畅的语音,适用于有声阅读、智能客服等场景。
54 3
|
19天前
|
测试技术 API 开发工具
ElasticSearch7.6.x 模板及滚动索引创建及注意事项
ElasticSearch7.6.x 模板及滚动索引创建及注意事项
35 8
|
1月前
|
存储 安全 网络安全
Python编程--使用PyPDF解析PDF文件中的元数据
Python编程--使用PyPDF解析PDF文件中的元数据
|
2月前
|
JSON 自然语言处理 数据库
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
概念、ik分词器、倒排索引、索引和文档的增删改查、RestClient对索引和文档的增删改查
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
|
2月前
|
存储 搜索推荐 数据建模
Elasticsearch 的数据建模与索引设计
【9月更文第3天】Elasticsearch 是一个基于 Lucene 的搜索引擎,广泛应用于全文检索、数据分析等领域。为了确保 Elasticsearch 的高效运行,合理的数据建模和索引设计至关重要。本文将探讨如何为不同的应用场景设计高效的索引结构,并分享一些数据建模的最佳实践。
112 2
|
3月前
|
SQL 关系型数据库 API
Python 开发环境的准备以及一些常用类库模块的安装
在学习和开发Python的时候,第一步的工作就是先准备好开发环境,包括相关常用的插件,以及一些辅助工具,这样我们在后续的开发工作中,才能做到事半功倍。下面介绍一些Python 开发环境的准备以及一些常用类库模块的安装和使用的经验总结,供大家参考了解。

相关产品

  • 检索分析服务 Elasticsearch版