阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

1. 注解

参考:

  1. https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch?spm=a2c4g.11186623.0.0.674a8cd6q9Z5rG

修改:

  1. 基于python3开发,使用python3默认的原生类库,无需安装额外package
  2. 增加了限制迁移、忽略索引的部分
  3. 增加了目标端Es集群已有非系统索引删除的功能,用以保证迁移索引的原子性
  4. 增加了副本数统一设置或应用默认的逻辑判断部分
  5. 增加了ES集群,DELETE方法的函数封装

2. 样例代码

#!/usr/bin/python# -*- coding: UTF-8 -*-importsysimportjsonimportrequests# 源集群host。oldClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 源集群用户名,可为空。oldClusterUserName="elastic"# 源集群密码,可为空。oldClusterPassword="xx"# 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。newClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 目标集群用户名。newClusterUser="elastic"# 目标集群密码。newClusterPassword="xx"# 是否清理目标集群用户索引,已写入数据慎用,脚本执行异常保证索引从0导入CLEAN_TARGET_INDICES=True# 统一副本数,不填即为源集群默认副本数DEFAULT_REPLICAS=None# 需要迁移的索引,不为空则仅迁移列表中的索引# transferIndex = ['insert_random','product_info_for_logstash']transferIndex= []
# 需要忽略的索引ignoreIndex= ['a0717']
# 是否忽略迁移系统索引ignoreSysIndex=TruedefhttpRequest(method, host, endpoint, params="", username="", password=""):
try:
ifmethodin ('GET', 'DELETE'):
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password))
else:
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password), json=json.loads(params))
assertres.status_code//100==2, f'status 异常\n{res.text}'exceptExceptionase:
print(f'es集群请求异常,参数: \nmethod:{method}\nhosts: {host}\nendpoint:{endpoint}\nparams:{params}\n')
print(e)
sys.exit(1)
returnres.textdefhttpGet(host, endpoint, username="", password=""):
returnhttpRequest("GET", host, endpoint, "", username, password)
defhttpPost(host, endpoint, params, username="", password=""):
returnhttpRequest("POST", host, endpoint, params, username, password)
defhttpPut(host, endpoint, params, username="", password=""):
returnhttpRequest("PUT", host, endpoint, params, username, password)
defhttpDelete(host, endpoint, username, password):
returnhttpRequest("DELETE", host, endpoint, "", username, password)
defgetIndices(host, username="", password="", filterOpen=True):
endpoint="/_cat/indices"indicesResult=httpGet(host, endpoint, username, password)
indicesList=indicesResult.strip().split("\n")
indexList= []
forindicesinindicesList:
infoList=indices.split()
iffilterOpen:
ifnotinfoList[1] =='open':
continueindexList.append(infoList[2])
returnindexListdefgetSettings(index, host, username="", password=""):
endpoint="/"+index+"/_settings"indexSettings=httpGet(host, endpoint, username, password)
print(index+" 原始settings如下:\n"+indexSettings)
settingsDict=json.loads(indexSettings)
## 分片数默认和源集群索引保持一致。number_of_shards=settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数number_of_replicas=DEFAULT_REPLICASifDEFAULT_REPLICAS \
elsesettingsDict[index]["settings"]["index"]["number_of_replicas"]
newSetting="\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}"% (
number_of_shards, number_of_replicas)
returnnewSettingdefgetMapping(index, host, username="", password=""):
endpoint="/"+index+"/_mapping"indexMapping=httpGet(host, endpoint, username, password)
print(index+" 原始mapping如下:\n"+indexMapping)
mappingDict=json.loads(indexMapping)
mappings=json.dumps(mappingDict[index]["mappings"])
newMapping="\"mappings\" : "+mappingsreturnnewMappingdefcreateIndexStatement(oldIndexName):
settingStr=getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr=getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement="{\n"+str(settingStr) +",\n"+str(mappingStr) +"\n}"returncreatestatementdefcreateIndex(oldIndexName, newIndexName=""):
if (newIndexName==""):
newIndexName=oldIndexNamecreatestatement=createIndexStatement(oldIndexName)
print("新索引 "+newIndexName+" 的setting和mapping如下:\n"+createstatement)
endpoint="/"+newIndexNamecreateResult=httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print("新索引 "+newIndexName+" 创建结果:"+createResult)
defcleanTargetIndices():
targetIndexList=getIndices(newClusterHost, newClusterUser, newClusterPassword)
totalIndexNum=len(targetIndexList)
userIndexNum=0for_intargetIndexList:
ifnotstr(_).startswith("."):
userIndexNum+=1httpDelete(newClusterHost, f'/{_}', newClusterUser, newClusterPassword)
print(f'DELETE index : {_} .... Done')
print(f'DELETE {userIndexNum} user index of total index {totalIndexNum}')
if__name__=='__main__':
ifCLEAN_TARGET_INDICES:
print(f'\n-------------delete target cluster indices')
cleanTargetIndices()
print('-------------delete done')
else:
print(
f'\n-------------skip clean target cluster indices because of param CLEAN_TARGET_INDICES={CLEAN_TARGET_INDICES}')
indexList=getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
targetIndex= []
passIndex= []
iftransferIndex:
targetIndex=transferIndexpassIndex= [_for_intargetIndexif_notinindexList]
else:
forindexinindexList:
ifignoreSysIndexand (index.startswith(".")):
passIndex.append(index)
elifindexinignoreIndex:
passIndex.append(index)
else:
targetIndex.append(index)
iftargetIndex:
forindexintargetIndex:
print(f'\n-------------creating index: {index}')
createIndex(index, index)
ifpassIndex:
forindexinpassIndex:
print(index+" 为系统索引或被定义为忽略迁移的索引(ignoreIndex),未创建,如有需要,请单独处理~")
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
72 5
|
2月前
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
66 3
|
4月前
|
存储 API 数据库
检索服务elasticsearch索引(Index)
【8月更文挑战第23天】
70 6
|
3天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
17 2
|
9天前
|
弹性计算 安全 开发工具
灵码评测-阿里云提供的ECS python3 sdk做安全组管理
批量变更阿里云ECS安全组策略(批量变更)
|
1月前
|
存储 缓存 监控
优化Elasticsearch 索引设计
优化Elasticsearch 索引设计
22 5
|
1月前
|
存储 JSON 关系型数据库
Elasticsearch 索引
【11月更文挑战第3天】
42 4
|
1月前
|
机器学习/深度学习 自然语言处理 API
如何使用阿里云的语音合成服务(TTS)将文本转换为语音?本文详细介绍了从注册账号、获取密钥到编写Python代码调用TTS服务的全过程
如何使用阿里云的语音合成服务(TTS)将文本转换为语音?本文详细介绍了从注册账号、获取密钥到编写Python代码调用TTS服务的全过程。通过简单的代码示例,展示如何将文本转换为自然流畅的语音,适用于有声阅读、智能客服等场景。
225 3
|
1月前
|
测试技术 API 开发工具
ElasticSearch7.6.x 模板及滚动索引创建及注意事项
ElasticSearch7.6.x 模板及滚动索引创建及注意事项
49 8
|
2月前
|
存储 安全 网络安全
Python编程--使用PyPDF解析PDF文件中的元数据
Python编程--使用PyPDF解析PDF文件中的元数据
67 1

相关产品

  • 检索分析服务 Elasticsearch版
  • 下一篇
    DataWorks