【最佳实践】ECS自建的Elasticsearch迁移至阿里云Elasticsearch

简介: 本文档为您介绍将ECS自建的Elasticsearch迁移至阿里云Elasticsearch的方法,包括创建索引和数据迁移。

教程概述

本案例的整体步骤如下。

  1. 创建索引
  2. 数据迁移

同时本文档也为您介绍了一些操作过程中可能遇到的问题和解决方法,详情请参见常见问题

前提条件

参考本文档做迁移前必须先满足以下条件,如果不满足需要通过其他方案进行迁移,详情请参见Logstash部署

  • 自建Elasticsearch所在的ECS必须是VPC网络(不支持Classiclink方式打通的ECS),并且自建Elasticsearch必须与阿里云Elasticsearch在同一个VPC下。
  • 您可以通过中控机器(或者任意一台机器)执行文档中的脚本,前提是该中控机器可以同时访问新旧Elasticsearch集群的9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制IP白名单,并且需要开启9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制阿里云Elasticsearch实例的各节点IP(Kibana控制台可查看阿里云Elasticsearch实例各节点的IP)。
  • 自建Elasticsearch与阿里云Elasticsearch实例已经连通。可以在执行脚本的机器上使用curl -XGET http://<host>:9200进行验证。

创建索引

参考旧集群中需要迁移的索引配置,提前在新集群中创建索引。或者为新集群开启自动创建索引和动态映射(不建议)功能。
以Python为例,使用如下脚本在新集群中批量创建旧集群索引,默认新创建的索引副本数为0。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 老集群host(ip+port)
oldClusterHost = "old-cluster.com"
## 老集群用户名,可为空
oldClusterUserName = "old-username"
## 老集群密码,可为空
oldClusterPassword = "old-password"
## 新集群host(ip+port)
newClusterHost = "new-cluster.com"
## 新集群用户名,可为空
newClusterUser = "new-username"
## 新集群密码,可为空
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + "  原始settings如下:\n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## 分片数默认和老集群索引保持一致
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本数默认为0
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + " 原始mapping如下:\n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "新索引 " + newIndexName + " 创建结果:" + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + "  或许是系统索引,不会重新创建,如有需要,请单独处理~"

数据迁移

以下提供了三种数据迁移的方式供您参考。您可以根据迁移的数据量大小以及具体的操作情况,选择合适的方式进行数据迁移。

注意

  • 为保证数据迁移前后的一致性,需要上游业务停止旧集群的写操作,读服务才可以正常进行。迁移完毕后,直接切换到新集群进行读写。如果不停止写操作可能会存在迁移前后数据不一致的情况。
  • 使用下述方案迁移时,如果是通过IP + Port的方式访问旧集群,则必须在新集群的yml文件中配置reindex白名单(加入旧集群的IP地址),例如reindex.remote.whitelist: 1.1.1.1:9200,1.2.*.*:*
  • 如果使用域名访问,则不允许通过http://host:port/path这种带path的形式访问。
  • 数据量小。
    使用reindex.sh脚本。
#!/bin/bash
# file:reindex.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 数据量大、无删除操作、有更新时间。
    数据量较大且无删除操作时,可以使用滚动迁移的方式,减少停止写服务的时间。滚动迁移需要有一个类似于更新时间的字段代表新数据的写时序。可以在数据迁移完成后,再停止写服务,快速更新一次。即可切换到新集群,恢复读写。
#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# 这是通过reindex操作远程重建索引的脚本,要求:
# 1. 新集群已经创建完索引,或者支持自动创建和动态映射。
# 2. 新集群必须在yml里配置IP白名单 reindex.remote.whitelist: 172.16.123.*:9200
# 3. host必须是[scheme]://[host]:[port]
USAGE="Usage: sh circleReindex.sh <count>
       count: 执行次数,多次(负数为循环)增量执行或者单次执行
Example:
        sh circleReindex.sh 1
        sh circleReindex.sh 5
        sh circleReindex.sh -1"
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
## http://myescluster.com
newClusterHost="新集群host"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="更新时间字段"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
    reindexTimes=$[${reindexTimes} + 1]
    curTimestamp=`date +%s`
    ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "range" : {
                    "'${timeField}'" : {
                        "gte" : '${lastTimestamp}',
                        "lt" : '${curTimestamp}'
                    }
                }
            }
        },
        "dest": {
            "index": "'${indexName}'"
        }
    }'`
    lastTimestamp=${curTimestamp}
    echo "第${reindexTimes}次reIndex,本次更新截止时间 ${lastTimestamp} 结果:${ret}"
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "本次执行异常,中断后续执行操作~~,请检查"
    fi
}
function start() {
    ## 负数就不停循环执行
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            let ++k
        done
    fi
}
## main 
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "开始执行索引 ${indexName} 的 ReIndex操作"
start $1
echo "总共执行 ${reindexTimes} 次 reIndex 操作"
  • 数据量大、无删除操作、无更新时间。
    当数据量较大,且索引的mapping中没有定义更新时间字段时,需要由上游业务修改代码添加更新时间字段。添加完成后可以先将历史数据迁移完,然后再使用上述的第二种方案。
#!/bin/bash
# file:miss.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'${timeField}'"
                    }
                }
            }
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 不停止服务。
    敬请期待

说明

您也可以使用Logstash进行数据迁移,详情请参见 Logstash迁移Elasticsearch数据方法解读。

常见问题

  • 问题:执行curl命令时提示{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}。
    解决方法:可以在curl命令中添加-H "Content-Type: application/json"参数重试。

// 获取老集群中所有索引信息,如果没有权限可去掉"-u user:pass"参数,oldClusterHost为老集群的host,注意替换。
  curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
  // 参考上面返回的索引列表,获取需要迁移的指定用户索引的setting和mapping,注意替换indexName为要查询的用户索引名。
  curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
  // 参考上面获取到的对应索引的_settings,_mapping信息,在新集群中创建对应索引,索引副本数可以先设置为0,用于加快数据同步速度,数据迁移完成后再重置副本数为1。
  //其中newClusterHost是新集群的host,testindex是已经创建的索引名,testtype是对应索引的type。
  curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
    "testindex" : {
        "settings" : {
            "number_of_shards" : "5", //假设老集群中对应索引的shard数是5个
            "number_of_replicas" : "0" //设置索引副本为0
          }
        },
        "mappings" : { //假设老集群中对应索引的mappings配置如下
            "testtype" : {
                "properties" : {
                    "uid" : {
                        "type" : "long"
                    },
                    "name" : {
                        "type" : "text"
                    },
                    "create_time" : {
                      "type" : "long"
                    }
                }
           }
       }
   }
}'
  • 问题:数据同步速度太慢。
    解决方法: 如果单索引数据量比较大,可以在迁移前将目的索引的副本数设置为 0,刷新时间为 -1。待数据迁移完成后,再更改回来,这样可以加快数据同步速度。
// 迁移索引数据前可以先将索引副本数设为0,不刷新,用于加快数据迁移速度。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 0,
        "refresh_interval" : "-1"
}'
// 索引数据迁移完成后,可以重置索引副本数为1,刷新时间1s(1s是默认值)。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 1,
        "refresh_interval" : "1s"
}'

说明

本文部分内容参考了官网文档

加入社群

如有其它技术问题,可通过钉钉扫一扫下面的二维码

开发者社区二维码.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
14天前
|
存储 机器学习/深度学习 人工智能
2025年阿里云GPU服务器租用价格、选型策略与应用场景详解
随着AI与高性能计算需求的增长,阿里云提供了多种GPU实例,如NVIDIA V100、A10、T4等,适配不同场景。2025年重点实例中,V100实例GN6v单月3830元起,适合大规模训练;A10实例GN7i单月3213.99元起,适用于混合负载。计费模式有按量付费和包年包月,后者成本更低。针对AI训练、图形渲染及轻量级推理等场景,推荐不同配置以优化成本和性能。阿里云还提供抢占式实例、ESSD云盘等资源优化策略,支持eRDMA网络加速和倚天ARM架构,助力企业在2025年实现智能计算的效率与成本最优平衡。 (该简介为原文内容的高度概括,符合要求的字符限制。)
|
5天前
|
人工智能 负载均衡 数据可视化
阿里云出手了,DeepSeek服务器拒绝繁忙,免费部署DeepSeek模型671B满血版
阿里云推出免费部署DeepSeek模型671B满血版服务,通过百炼大模型平台,用户无需编码,最快5分钟、最低0元即可完成部署。平台提供100万免费Token,支持DeepSeek-R1和DeepSeek-V3等多款模型调用,有效解决服务器繁忙问题。新手零基础也能轻松上手,享受高效稳定的API调用和自动弹性扩展功能。教程涵盖开通服务、获取API-KEY及配置Chatbox客户端等步骤,详细指引助您快速实现DeepSeek自由。
72 18
|
1天前
|
Kubernetes 监控 Serverless
基于阿里云Serverless Kubernetes(ASK)的无服务器架构设计与实践
无服务器架构(Serverless Architecture)在云原生技术中备受关注,开发者只需专注于业务逻辑,无需管理服务器。阿里云Serverless Kubernetes(ASK)是基于Kubernetes的托管服务,提供极致弹性和按需付费能力。本文深入探讨如何使用ASK设计和实现无服务器架构,涵盖事件驱动、自动扩展、无状态设计、监控与日志及成本优化等方面,并通过图片处理服务案例展示具体实践,帮助构建高效可靠的无服务器应用。
|
12天前
|
机器学习/深度学习 人工智能 程序员
阿里云出手DeepSeek拒绝服务器繁忙,程序员直呼:真香!
阿里云PAI平台支持一键部署DeepSeek-V3和DeepSeek-R1大模型,用户无需编写代码即可完成从训练到部署的全过程。通过PAI Model Gallery,开发者可轻松选择并部署所需模型版本,享受高效、便捷的AI开发体验。教程详细介绍了开通PAI、选择模型及一键部署的具体步骤,帮助用户快速上手。
|
1天前
|
安全 JavaScript 测试技术
阿里云轻量应用服务器38元1年性能、适用场景简单测评
在阿里云目前的活动中,轻量云服务器2核2G200M峰值带宽每天10点和15点抢购价只要38元一年,e实例云服务器2核2G3M带宽99元1年,u1实例2核4G5M带宽199元一年。其中,阿里云轻量应用服务器38元1年的抢购价,让不少用户心动不已。那么,这款特价轻量应用服务器到底怎么样?是否值得购买呢?本文将从配置、性能、适用场景、价格优势等多个方面对这款轻量应用服务器做个介绍,以供参考。
阿里云轻量应用服务器38元1年性能、适用场景简单测评
|
7天前
|
人工智能 负载均衡 数据可视化
阿里云出手了,基于百炼一键部署DeepSeek满血版,告别服务器繁忙1
阿里云百炼平台推出一键部署DeepSeek-R1满血版671B模型,提供100万免费Token,无需编码,新手5分钟内即可完成部署。通过Chatbox客户端配置API,轻松实现模型调用,解决服务器繁忙问题,支持自动弹性扩展,降低硬件成本。详情及教程见阿里云百炼官网。
168 5
|
14天前
|
机器学习/深度学习 存储 人工智能
2025年阿里云GPU服务器的租赁价格与选型指南
随着AI、深度学习等领域的发展,GPU服务器成为企业及科研机构的核心算力选择。阿里云提供多种GPU实例类型(如NVIDIA V100、A100等),涵盖计算型、共享型和弹性裸金属等,满足不同场景需求。本文详解2025年阿里云GPU服务器的核心配置、价格策略及适用场景,帮助用户优化选型与成本控制,实现高效智能计算。
|
7天前
|
存储 安全 网络安全
阿里云国际站:阿里云服务器端口配置
悟空云@CloudWuKong阿里云是全球领先的云计算服务提供商,为用户提供弹性计算、数据库、存储、网络安全等一系列云计算服务。在使用阿里云服务器时,合理配置端口非常重要,可以提高服务器安全性和稳定性。
|
13天前
|
机器学习/深度学习 存储 弹性计算
阿里云gpu云服务器租用价格:最新收费标准及活动价格参考
阿里云gpu云服务器多少钱?A10卡GN7i GPU云服务器32核188G3213.99/1个月起,V100卡GN6v GPU云服务器8核32G3830.00/1个月起,阿里云GPU云服务器是基于GPU应用的计算服务,多适用于视频解码,图形渲染,深度学习,科学计算等应用场景,该产品具有超强计算能力、网络性能出色、购买方式灵活、高性能实例存储( GA1和GN5特有)等特点。下面小编来介绍下阿里云gpu云服务器最新的收费标准及活动价格。
|
2天前
|
存储 弹性计算 人工智能
阿里云轻量应用服务器全球上新!
阿里云轻量应用服务器全球上新!
27 0

热门文章

最新文章

相关产品

  • 检索分析服务 Elasticsearch版