1. 说明
本脚本仅将源ES集群中索引的结构(settings与mapping)迁移到目标集群,不迁移文档数据。
参考:
修改:
- 基于Python 3(3.6及以上,代码使用了f-string)开发;除第三方库requests外(需先执行 pip install requests)仅使用Python 3标准库
- 增加了仅迁移指定索引(transferIndex)以及忽略指定索引(ignoreIndex)、忽略系统索引(ignoreSysIndex)的功能
- 增加了删除目标端ES集群中已有非系统索引的功能(CLEAN_TARGET_INDICES),保证每次迁移都重新创建索引,脚本中途异常后可直接重新执行
- 增加了副本数设置逻辑:配置了DEFAULT_REPLICAS时所有新索引统一使用该副本数,否则沿用源索引的副本数
- 增加了对ES集群DELETE请求的函数封装(httpDelete)
2. 样例代码
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Recreate the indices of a source Elasticsearch cluster on a target cluster.

Only the index *structure* is copied: number_of_shards, number_of_replicas,
the ``analysis`` settings (when the source index defines any) and the
mappings. Documents are not copied. Edit the configuration constants below,
then run this file directly.
"""
import sys
import json

try:
    import requests
except ImportError:
    # Reported with an install hint on the first HTTP call instead of at import
    # time, so the pure helpers below can be imported without requests.
    requests = None

# Source cluster URL.
oldClusterHost = "http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"
# Source cluster user name; may be left empty (no credentials are then sent).
oldClusterUserName = "elastic"
# Source cluster password; may be left empty.
oldClusterPassword = "xx"
# Target cluster URL (shown on the basic-information page of the Alibaba Cloud Elasticsearch instance).
newClusterHost = "http://es-cn-xx.public.elasticsearch.aliyuncs.com:9200"
# Target cluster user name.
newClusterUser = "elastic"
# Target cluster password.
newClusterPassword = "xx"
# Delete every user (non-"."-prefixed) index on the target before migrating.
# Use with care if the target already holds data; it lets a failed run be
# restarted from scratch.
CLEAN_TARGET_INDICES = True
# Replica count for every new index. None keeps each source index's own value;
# 0 is honoured.
DEFAULT_REPLICAS = None
# Indices to migrate. When non-empty, ONLY these are migrated and
# ignoreIndex / ignoreSysIndex are not consulted.
# transferIndex = ['insert_random','product_info_for_logstash']
transferIndex = []
# Indices to skip.
ignoreIndex = ['a0717']
# Skip system indices (names starting with ".").
ignoreSysIndex = True


def httpRequest(method, host, endpoint, params="", username="", password=""):
    """Send one HTTP request to an ES cluster and return the response body text.

    :param method: HTTP verb, e.g. "GET", "POST", "PUT", "DELETE".
    :param host: cluster base URL, e.g. "http://host:9200".
    :param endpoint: path (and query string) appended to ``host``.
    :param params: JSON request body as a string. It is ignored for GET/DELETE,
        and no body is sent when it is empty.
    :param username: basic-auth user; when empty, no credentials are sent.
    :param password: basic-auth password.
    :return: the response body as ``str``.

    On any transport error, JSON error in ``params`` or non-2xx status, the
    request details are printed and the script exits with status 1.
    """
    try:
        if requests is None:
            raise RuntimeError('requests 未安装, 请先执行: pip install requests')
        auth = (username, password) if username else None
        body = None if method in ('GET', 'DELETE') or not params else json.loads(params)
        res = requests.request(method=method, url=host + endpoint, auth=auth, json=body)
        # Explicit check rather than `assert`, which `python -O` strips out.
        if res.status_code // 100 != 2:
            raise RuntimeError(f'status 异常\n{res.text}')
    except Exception as e:
        # Script-level boundary: report the failing request and abort the run.
        print(f'es集群请求异常,参数: \nmethod:{method}\nhosts: {host}\nendpoint:{endpoint}\nparams:{params}\n')
        print(e)
        sys.exit(1)
    return res.text


def httpGet(host, endpoint, username="", password=""):
    """GET ``host + endpoint`` and return the body text (see httpRequest)."""
    return httpRequest("GET", host, endpoint, "", username, password)


def httpPost(host, endpoint, params, username="", password=""):
    """POST the JSON string ``params`` and return the body text (see httpRequest)."""
    return httpRequest("POST", host, endpoint, params, username, password)


def httpPut(host, endpoint, params, username="", password=""):
    """PUT the JSON string ``params`` and return the body text (see httpRequest)."""
    return httpRequest("PUT", host, endpoint, params, username, password)


def httpDelete(host, endpoint, username="", password=""):
    """DELETE ``host + endpoint`` and return the body text (see httpRequest)."""
    return httpRequest("DELETE", host, endpoint, "", username, password)


def parseCatIndices(catText, filterOpen=True):
    """Parse ``_cat/indices?h=status,index`` text into a list of index names.

    :param catText: response body, one "<status> <index>" pair per line.
    :param filterOpen: when True, keep only indices whose status is "open".
    :return: index names in response order; [] for an empty response.
    """
    names = []
    for line in catText.splitlines():
        fields = line.split()
        if len(fields) < 2:  # empty body (cluster without indices) or blank line
            continue
        status, name = fields[0], fields[1]
        if filterOpen and status != 'open':
            continue
        names.append(name)
    return names


def getIndices(host, username="", password="", filterOpen=True):
    """Return the index names on ``host``; only open ones when ``filterOpen``."""
    # Ask for exactly two columns so the whitespace-split layout is fixed. The
    # default layout starts with a health column that may be blank for closed
    # indices, which would shift every following field.
    catText = httpGet(host, "/_cat/indices?h=status,index", username, password)
    return parseCatIndices(catText, filterOpen)


def buildSettingsFragment(indexSettings, replicas=None):
    """Build the ``"settings": {...}`` fragment of a create-index body.

    :param indexSettings: the ``settings.index`` object of a GET _settings response.
    :param replicas: replica count to force; None keeps the source value.
    :return: text of the form '"settings": {"number_of_shards": N, ...}'.
    """
    newSettings = {
        # Shard count always follows the source index.
        "number_of_shards": int(indexSettings["number_of_shards"]),
        "number_of_replicas": int(replicas if replicas is not None
                                  else indexSettings["number_of_replicas"]),
    }
    # Custom analyzers referenced by the mapping must exist before the mapping
    # is accepted, so carry the analysis section over when present.
    if "analysis" in indexSettings:
        newSettings["analysis"] = indexSettings["analysis"]
    return '"settings": ' + json.dumps(newSettings)


def getSettings(index, host, username="", password=""):
    """Fetch ``index``'s settings from ``host`` and return the create-body fragment."""
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print(index + " 原始settings如下:\n" + indexSettings)
    settingsDict = json.loads(indexSettings)
    return buildSettingsFragment(settingsDict[index]["settings"]["index"], DEFAULT_REPLICAS)


def getMapping(index, host, username="", password=""):
    """Fetch ``index``'s mapping from ``host`` and return '"mappings" : {...}'."""
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print(index + " 原始mapping如下:\n" + indexMapping)
    mappingDict = json.loads(indexMapping)
    # NOTE(review): mappings are copied verbatim; a typed mapping from an older
    # source may be rejected by a newer target, so confirm the versions are compatible.
    return "\"mappings\" : " + json.dumps(mappingDict[index]["mappings"])


def createIndexStatement(oldIndexName):
    """Return the JSON create-index body (settings + mappings) for a source index."""
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    return "{\n" + settingStr + ",\n" + mappingStr + "\n}"


def createIndex(oldIndexName, newIndexName=""):
    """Create ``newIndexName`` (default: same name) on the target from a source index."""
    if newIndexName == "":
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print("新索引 " + newIndexName + " 创建结果:" + createResult)


def cleanTargetIndices():
    """Delete every index on the target whose name does not start with "."."""
    # Closed indices are included: a closed leftover with the same name would
    # make the later create-index PUT fail.
    targetIndexList = getIndices(newClusterHost, newClusterUser, newClusterPassword, filterOpen=False)
    totalIndexNum = len(targetIndexList)
    userIndexNum = 0
    for indexName in targetIndexList:
        if indexName.startswith("."):
            continue
        userIndexNum += 1
        httpDelete(newClusterHost, f'/{indexName}', newClusterUser, newClusterPassword)
        print(f'DELETE index : {indexName} .... Done')
    print(f'DELETE {userIndexNum} user index of total index {totalIndexNum}')


def planMigration(sourceIndices, transfer=None, ignore=None, ignoreSys=True):
    """Split the indices to handle into (toMigrate, skipped, missing).

    :param sourceIndices: index names that exist (open) on the source.
    :param transfer: explicit list to migrate; when non-empty, ``ignore`` and
        ``ignoreSys`` are not applied, and names absent from the source go
        to ``missing``.
    :param ignore: names to skip when ``transfer`` is empty.
    :param ignoreSys: skip "."-prefixed names when ``transfer`` is empty.
    """
    if transfer:
        existing = set(sourceIndices)
        toMigrate = [name for name in transfer if name in existing]
        missing = [name for name in transfer if name not in existing]
        return toMigrate, [], missing
    ignoreSet = set(ignore or ())
    toMigrate, skipped = [], []
    for name in sourceIndices:
        if (ignoreSys and name.startswith(".")) or name in ignoreSet:
            skipped.append(name)
        else:
            toMigrate.append(name)
    return toMigrate, skipped, []


if __name__ == '__main__':
    if CLEAN_TARGET_INDICES:
        # With identical URLs the clean-up would delete the very indices that are
        # about to be read from the source (the sample config uses equal hosts).
        if oldClusterHost.rstrip('/').lower() == newClusterHost.rstrip('/').lower():
            print('源集群与目标集群地址相同, 为避免误删源索引, 已终止执行, 请检查 oldClusterHost / newClusterHost 配置')
            sys.exit(1)
        print('\n-------------delete target cluster indices')
        cleanTargetIndices()
        print('-------------delete done')
    else:
        print(
            f'\n-------------skip clean target cluster indices because of param CLEAN_TARGET_INDICES={CLEAN_TARGET_INDICES}')
    indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
    targetIndex, passIndex, missingIndex = planMigration(indexList, transferIndex, ignoreIndex, ignoreSysIndex)
    for index in targetIndex:
        print(f'\n-------------creating index: {index}')
        createIndex(index, index)
    for index in passIndex:
        print(index + " 为系统索引或被定义为忽略迁移的索引(ignoreIndex),未创建,如有需要,请单独处理~")
    for index in missingIndex:
        print(index + " 在源集群中不存在或未处于open状态,未创建,请检查transferIndex配置~")