阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 阿里云ElasticSearch索引元数据迁移-基于Python3原生类库

1. 注解

参考:

  1. https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch?spm=a2c4g.11186623.0.0.674a8cd6q9Z5rG

修改:

  1. 基于python3开发,使用python3默认的原生类库,无需安装额外package
  2. 增加了限制迁移、忽略索引的部分
  3. 增加了目标端Es集群已有非系统索引删除的功能,用以保证迁移索引的原子性
  4. 增加了副本数统一设置或应用默认的逻辑判断部分
  5. 增加了ES集群,DELETE方法的函数封装

2. 样例代码

#!/usr/bin/python# -*- coding: UTF-8 -*-importsysimportjsonimportrequests# 源集群host。oldClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 源集群用户名,可为空。oldClusterUserName="elastic"# 源集群密码,可为空。oldClusterPassword="xx"# 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。newClusterHost="http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"# 目标集群用户名。newClusterUser="elastic"# 目标集群密码。newClusterPassword="xx"# 是否清理目标集群用户索引,已写入数据慎用,脚本执行异常保证索引从0导入CLEAN_TARGET_INDICES=True# 统一副本数,不填即为源集群默认副本数DEFAULT_REPLICAS=None# 需要迁移的索引,不为空则仅迁移列表中的索引# transferIndex = ['insert_random','product_info_for_logstash']transferIndex= []
# 需要忽略的索引ignoreIndex= ['a0717']
# 是否忽略迁移系统索引ignoreSysIndex=TruedefhttpRequest(method, host, endpoint, params="", username="", password=""):
try:
ifmethodin ('GET', 'DELETE'):
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password))
else:
res=requests.request(method=method,
url=host+endpoint,
auth=(username, password), json=json.loads(params))
assertres.status_code//100==2, f'status 异常\n{res.text}'exceptExceptionase:
print(f'es集群请求异常,参数: \nmethod:{method}\nhosts: {host}\nendpoint:{endpoint}\nparams:{params}\n')
print(e)
sys.exit(1)
returnres.textdefhttpGet(host, endpoint, username="", password=""):
returnhttpRequest("GET", host, endpoint, "", username, password)
defhttpPost(host, endpoint, params, username="", password=""):
returnhttpRequest("POST", host, endpoint, params, username, password)
defhttpPut(host, endpoint, params, username="", password=""):
returnhttpRequest("PUT", host, endpoint, params, username, password)
defhttpDelete(host, endpoint, username, password):
returnhttpRequest("DELETE", host, endpoint, "", username, password)
defgetIndices(host, username="", password="", filterOpen=True):
endpoint="/_cat/indices"indicesResult=httpGet(host, endpoint, username, password)
indicesList=indicesResult.strip().split("\n")
indexList= []
forindicesinindicesList:
infoList=indices.split()
iffilterOpen:
ifnotinfoList[1] =='open':
continueindexList.append(infoList[2])
returnindexListdefgetSettings(index, host, username="", password=""):
endpoint="/"+index+"/_settings"indexSettings=httpGet(host, endpoint, username, password)
print(index+" 原始settings如下:\n"+indexSettings)
settingsDict=json.loads(indexSettings)
## 分片数默认和源集群索引保持一致。number_of_shards=settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数number_of_replicas=DEFAULT_REPLICASifDEFAULT_REPLICAS \
elsesettingsDict[index]["settings"]["index"]["number_of_replicas"]
newSetting="\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}"% (
number_of_shards, number_of_replicas)
returnnewSettingdefgetMapping(index, host, username="", password=""):
endpoint="/"+index+"/_mapping"indexMapping=httpGet(host, endpoint, username, password)
print(index+" 原始mapping如下:\n"+indexMapping)
mappingDict=json.loads(indexMapping)
mappings=json.dumps(mappingDict[index]["mappings"])
newMapping="\"mappings\" : "+mappingsreturnnewMappingdefcreateIndexStatement(oldIndexName):
settingStr=getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr=getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement="{\n"+str(settingStr) +",\n"+str(mappingStr) +"\n}"returncreatestatementdefcreateIndex(oldIndexName, newIndexName=""):
if (newIndexName==""):
newIndexName=oldIndexNamecreatestatement=createIndexStatement(oldIndexName)
print("新索引 "+newIndexName+" 的setting和mapping如下:\n"+createstatement)
endpoint="/"+newIndexNamecreateResult=httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print("新索引 "+newIndexName+" 创建结果:"+createResult)
defcleanTargetIndices():
targetIndexList=getIndices(newClusterHost, newClusterUser, newClusterPassword)
totalIndexNum=len(targetIndexList)
userIndexNum=0for_intargetIndexList:
ifnotstr(_).startswith("."):
userIndexNum+=1httpDelete(newClusterHost, f'/{_}', newClusterUser, newClusterPassword)
print(f'DELETE index : {_} .... Done')
print(f'DELETE {userIndexNum} user index of total index {totalIndexNum}')
if__name__=='__main__':
ifCLEAN_TARGET_INDICES:
print(f'\n-------------delete target cluster indices')
cleanTargetIndices()
print('-------------delete done')
else:
print(
f'\n-------------skip clean target cluster indices because of param CLEAN_TARGET_INDICES={CLEAN_TARGET_INDICES}')
indexList=getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
targetIndex= []
passIndex= []
iftransferIndex:
targetIndex=transferIndexpassIndex= [_for_intargetIndexif_notinindexList]
else:
forindexinindexList:
ifignoreSysIndexand (index.startswith(".")):
passIndex.append(index)
elifindexinignoreIndex:
passIndex.append(index)
else:
targetIndex.append(index)
iftargetIndex:
forindexintargetIndex:
print(f'\n-------------creating index: {index}')
createIndex(index, index)
ifpassIndex:
forindexinpassIndex:
print(index+" 为系统索引或被定义为忽略迁移的索引(ignoreIndex),未创建,如有需要,请单独处理~")
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
30天前
|
存储 自然语言处理 关系型数据库
ElasticSearch索引 和MySQL索引那个更高效实用那个更合适
ElasticSearch索引 和MySQL索引那个更高效实用那个更合适
38 0
|
2月前
|
存储 算法 NoSQL
Elasticsearch拆分索引知多少
Elasticsearch拆分索引知多少
33 0
|
2月前
|
存储 容灾 安全
在阿里云RDS(Relational Database Service)迁移前准备目标区域选择
在阿里云RDS(Relational Database Service)迁移前准备目标区域选择
25 3
|
1月前
|
JSON 监控 数据管理
【Elasticsearch专栏 12】深入探索:Elasticsearch使用索引生命周期管理(ILM)自动化删除旧数据
Elasticsearch的ILM功能允许用户定义策略,自动管理索引从创建到删除的生命周期。用户可以设置策略,根据索引年龄或大小自动删除旧数据,节省存储空间。通过应用ILM策略于索引模板,新索引将遵循预定义的生命周期。用户还可以监控ILM状态,确保策略按预期执行。使用ILM,用户可以高效地管理数据,确保旧数据及时删除,同时保持数据完整性和安全性。
|
2月前
|
存储 自然语言处理 搜索推荐
【Elasticsearch专栏 01】深入探索:Elasticsearch的正向索引和倒排索引是什么?
正向索引根据文档ID直接查找文档内容,适用于精确匹配场景;而倒排索引则基于文档内容构建,通过关键词快速定位相关文档,适用于全文搜索,显著提高查询效率,是搜索引擎的核心技术。
|
2天前
|
存储 SQL 缓存
阿里云大学考试python中级题目及解析-python中级
阿里云大学考试python中级题目及解析-python中级
|
2天前
|
安全 API 数据安全/隐私保护
Elasticsearch 通过索引阻塞实现数据保护深入解析
Elasticsearch 通过索引阻塞实现数据保护深入解析
|
18天前
|
消息中间件 人工智能 监控
|
25天前
|
机器学习/深度学习 分布式计算 数据挖掘
阿里云 MaxCompute MaxFrame 开启免费邀测,统一 Python 开发生态
阿里云 MaxCompute MaxFrame 正式开启邀测,统一 Python 开发生态,打破大数据及 AI 开发使用边界。
309 1
|
1月前
|
SQL 数据可视化 Apache
阿里云数据库内核 Apache Doris 兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移
阿里云数据库 SelectDB 内核 Doris 的 SQL 方言转换工具, Doris SQL Convertor 致力于提供高效、稳定的 SQL 迁移解决方案,满足用户多样化的业务需求。兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移。
阿里云数据库内核 Apache Doris 兼容 Presto、Trino、ClickHouse、Hive 等近十种 SQL 方言,助力业务平滑迁移

相关产品

  • 检索分析服务 Elasticsearch版