【最佳实践】ECS自建的Elasticsearch迁移至阿里云Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 本文档为您介绍将ECS自建的Elasticsearch迁移至阿里云Elasticsearch的方法,包括创建索引和数据迁移。

教程概述

本案例的整体步骤如下。

  1. 创建索引
  2. 数据迁移

同时本文档也为您介绍了一些操作过程中可能遇到的问题和解决方法,详情请参见常见问题

前提条件

参考本文档做迁移前必须先满足以下条件,如果不满足需要通过其他方案进行迁移,详情请参见Logstash部署

  • 自建Elasticsearch所在的ECS必须是VPC网络(不支持Classiclink方式打通的ECS),并且自建Elasticsearch必须与阿里云Elasticsearch在同一个VPC下。
  • 您可以通过中控机器(或者任意一台机器)执行文档中的脚本,前提是该中控机器可以同时访问新旧Elasticsearch集群的9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制IP白名单,并且需要开启9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制阿里云Elasticsearch实例的各节点IP(Kibana控制台可查看阿里云Elasticsearch实例各节点的IP)。
  • 自建Elasticsearch与阿里云Elasticsearch实例已经连通。可以在执行脚本的机器上使用curl -XGET http://<host>:9200进行验证。

创建索引

参考旧集群中需要迁移的索引配置,提前在新集群中创建索引。或者为新集群开启自动创建索引和动态映射(不建议)功能。
以Python为例,使用如下脚本在新集群中批量创建旧集群索引,默认新创建的索引副本数为0。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 老集群host(ip+port)
oldClusterHost = "old-cluster.com"
## 老集群用户名,可为空
oldClusterUserName = "old-username"
## 老集群密码,可为空
oldClusterPassword = "old-password"
## 新集群host(ip+port)
newClusterHost = "new-cluster.com"
## 新集群用户名,可为空
newClusterUser = "new-username"
## 新集群密码,可为空
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + "  原始settings如下:\n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## 分片数默认和老集群索引保持一致
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本数默认为0
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + " 原始mapping如下:\n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "新索引 " + newIndexName + " 创建结果:" + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + "  或许是系统索引,不会重新创建,如有需要,请单独处理~"

数据迁移

以下提供了三种数据迁移的方式供您参考。您可以根据迁移的数据量大小以及具体的操作情况,选择合适的方式进行数据迁移。

注意

  • 为保证数据迁移前后的一致性,需要上游业务停止旧集群的写操作,读服务才可以正常进行。迁移完毕后,直接切换到新集群进行读写。如果不停止写操作可能会存在迁移前后数据不一致的情况。
  • 使用下述方案迁移时,如果是通过IP + Port的方式访问旧集群,则必须在新集群的yml文件中配置reindex白名单(加入旧集群的IP地址),例如reindex.remote.whitelist: 1.1.1.1:9200,1.2.*.*:*
  • 如果使用域名访问,则不允许通过http://host:port/path这种带path的形式访问。
  • 数据量小。
    使用reindex.sh脚本。
#!/bin/bash
# file:reindex.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 数据量大、无删除操作、有更新时间。
    数据量较大且无删除操作时,可以使用滚动迁移的方式,减少停止写服务的时间。滚动迁移需要有一个类似于更新时间的字段代表新数据的写时序。可以在数据迁移完成后,再停止写服务,快速更新一次。即可切换到新集群,恢复读写。
#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# 这是通过reindex操作远程重建索引的脚本,要求:
# 1. 新集群已经创建完索引,或者支持自动创建和动态映射。
# 2. 新集群必须在yml里配置IP白名单 reindex.remote.whitelist: 172.16.123.*:9200
# 3. host必须是[scheme]://[host]:[port]
USAGE="Usage: sh circleReindex.sh <count>
       count: 执行次数,多次(负数为循环)增量执行或者单次执行
Example:
        sh circleReindex.sh 1
        sh circleReindex.sh 5
        sh circleReindex.sh -1"
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
## http://myescluster.com
newClusterHost="新集群host"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="更新时间字段"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
    reindexTimes=$[${reindexTimes} + 1]
    curTimestamp=`date +%s`
    ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "range" : {
                    "'${timeField}'" : {
                        "gte" : '${lastTimestamp}',
                        "lt" : '${curTimestamp}'
                    }
                }
            }
        },
        "dest": {
            "index": "'${indexName}'"
        }
    }'`
    lastTimestamp=${curTimestamp}
    echo "第${reindexTimes}次reIndex,本次更新截止时间 ${lastTimestamp} 结果:${ret}"
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "本次执行异常,中断后续执行操作~~,请检查"
    fi
}
function start() {
    ## 负数就不停循环执行
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            let ++k
        done
    fi
}
## main 
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "开始执行索引 ${indexName} 的 ReIndex操作"
start $1
echo "总共执行 ${reindexTimes} 次 reIndex 操作"
  • 数据量大、无删除操作、无更新时间。
    当数据量较大,且索引的mapping中没有定义更新时间字段时,需要由上游业务修改代码添加更新时间字段。添加完成后可以先将历史数据迁移完,然后再使用上述的第二种方案。
#!/bin/bash
# file:miss.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'${timeField}'"
                    }
                }
            }
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 不停止服务。
    敬请期待

说明

您也可以使用Logstash进行数据迁移,详情请参见 Logstash迁移Elasticsearch数据方法解读。

常见问题

  • 问题:执行curl命令时提示{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}。
    解决方法:可以在curl命令中添加-H "Content-Type: application/json"参数重试。

// 获取老集群中所有索引信息,如果没有权限可去掉"-u user:pass"参数,oldClusterHost为老集群的host,注意替换。
  curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
  // 参考上面返回的索引列表,获取需要迁移的指定用户索引的setting和mapping,注意替换indexName为要查询的用户索引名。
  curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
  // 参考上面获取到的对应索引的_settings,_mapping信息,在新集群中创建对应索引,索引副本数可以先设置为0,用于加快数据同步速度,数据迁移完成后再重置副本数为1。
  //其中newClusterHost是新集群的host,testindex是已经创建的索引名,testtype是对应索引的type。
  curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
    "testindex" : {
        "settings" : {
            "number_of_shards" : "5", //假设老集群中对应索引的shard数是5个
            "number_of_replicas" : "0" //设置索引副本为0
          }
        },
        "mappings" : { //假设老集群中对应索引的mappings配置如下
            "testtype" : {
                "properties" : {
                    "uid" : {
                        "type" : "long"
                    },
                    "name" : {
                        "type" : "text"
                    },
                    "create_time" : {
                      "type" : "long"
                    }
                }
           }
       }
   }
}'
  • 问题:数据同步速度太慢。
    解决方法: 如果单索引数据量比较大,可以在迁移前将目的索引的副本数设置为 0,刷新时间为 -1。待数据迁移完成后,再更改回来,这样可以加快数据同步速度。
// 迁移索引数据前可以先将索引副本数设为0,不刷新,用于加快数据迁移速度。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 0,
        "refresh_interval" : "-1"
}'
// 索引数据迁移完成后,可以重置索引副本数为1,刷新时间1s(1s是默认值)。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 1,
        "refresh_interval" : "1s"
}'

说明

本文部分内容参考了官网文档

加入社群

如有其它技术问题,可通过钉钉扫一扫下面的二维码

开发者社区二维码.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
存储 人工智能 自然语言处理
Elasticsearch Inference API增加对阿里云AI的支持
本文将介绍如何在 Elasticsearch 中设置和使用阿里云的文本生成、重排序、稀疏向量和稠密向量服务,提升搜索相关性。
68 14
Elasticsearch Inference API增加对阿里云AI的支持
|
10天前
|
人工智能 弹性计算 编解码
阿里云GPU云服务器性能、应用场景及收费标准和活动价格参考
GPU云服务器作为阿里云提供的一种高性能计算服务,通过结合GPU与CPU的计算能力,为用户在人工智能、高性能计算等领域提供了强大的支持。其具备覆盖范围广、超强计算能力、网络性能出色等优势,且计费方式灵活多样,能够满足不同用户的需求。目前用户购买阿里云gpu云服务器gn5 规格族(P100-16G)、gn6i 规格族(T4-16G)、gn6v 规格族(V100-16G)有优惠,本文为大家详细介绍阿里云gpu云服务器的相关性能及收费标准与最新活动价格情况,以供参考和选择。
|
16天前
|
机器学习/深度学习 人工智能 弹性计算
什么是阿里云GPU云服务器?GPU服务器优势、使用和租赁费用整理
阿里云GPU云服务器提供强大的GPU算力,适用于深度学习、科学计算、图形可视化和视频处理等多种场景。作为亚太领先的云服务提供商,阿里云的GPU云服务器具备灵活的资源配置、高安全性和易用性,支持多种计费模式,帮助企业高效应对计算密集型任务。
|
17天前
|
存储 分布式计算 固态存储
阿里云2核16G、4核32G、8核64G配置云服务器租用收费标准与活动价格参考
2核16G、8核64G、4核32G配置的云服务器处理器与内存比为1:8,这种配比的云服务器一般适用于数据分析与挖掘,Hadoop、Spark集群和数据库,缓存等内存密集型场景,因此,多为企业级用户选择。目前2核16G配置按量收费最低收费标准为0.54元/小时,按月租用标准收费标准为260.44元/1个月。4核32G配置的阿里云服务器按量收费标准最低为1.08元/小时,按月租用标准收费标准为520.88元/1个月。8核64G配置的阿里云服务器按量收费标准最低为2.17元/小时,按月租用标准收费标准为1041.77元/1个月。本文介绍这些配置的最新租用收费标准与活动价格情况,以供参考。
|
15天前
|
机器学习/深度学习 人工智能 弹性计算
阿里云GPU服务器全解析_GPU价格收费标准_GPU优势和使用说明
阿里云GPU云服务器提供强大的GPU算力,适用于深度学习、科学计算、图形可视化和视频处理等场景。作为亚太领先的云服务商,阿里云GPU云服务器具备高灵活性、易用性、容灾备份、安全性和成本效益,支持多种实例规格,满足不同业务需求。
|
23天前
|
弹性计算
阿里云2核16G服务器多少钱一年?亲测价格查询1个月和1小时收费标准
阿里云2核16G服务器提供多种ECS实例规格,内存型r8i实例1年6折优惠价为1901元,按月收费334.19元,按小时收费0.696221元。更多规格及详细报价请访问阿里云ECS页面。
62 9
|
20天前
|
监控 Ubuntu Linux
使用VSCode通过SSH远程登录阿里云Linux服务器异常崩溃
通过 VSCode 的 Remote - SSH 插件远程连接阿里云 Ubuntu 22 服务器时,会因高 CPU 使用率导致连接断开。经排查发现,VSCode 连接根目录 ".." 时会频繁调用"rg"(ripgrep)进行文件搜索,导致 CPU 负载过高。解决方法是将连接目录改为"root"(或其他具体的路径),避免不必要的文件检索,从而恢复正常连接。
|
23天前
|
弹性计算 异构计算
2024年阿里云GPU服务器多少钱1小时?亲测价格查询方法
2024年阿里云GPU服务器每小时收费因实例规格不同而异。可通过阿里云GPU服务器页面选择“按量付费”查看具体价格。例如,NVIDIA A100的gn7e实例为34.742元/小时,NVIDIA A10的gn7i实例为12.710156元/小时。更多详情请访问阿里云官网。
72 2
|
29天前
|
存储 弹性计算 NoSQL
"从入门到实践,全方位解析云服务器ECS的秘密——手把手教你轻松驾驭阿里云的强大计算力!"
【10月更文挑战第23天】云服务器ECS(Elastic Compute Service)是阿里云提供的基础云计算服务,允许用户在云端租用和管理虚拟服务器。ECS具有弹性伸缩、按需付费、简单易用等特点,适用于网站托管、数据库部署、大数据分析等多种场景。本文介绍ECS的基本概念、使用场景及快速上手指南。
71 3
|
2月前
|
存储 弹性计算 编解码
通过阿里云的活动租赁云服务器时如何选择实例规格?选择指南参考
新手用户通过阿里云的活动租赁云服务器的时候实例规格应该怎么选?目前在阿里云的活动中,可选的云服务器类型除了轻量应用服务器之外,云服务器的主要实例规格有经济型e、通用算力型u1和计算型c7与c8y、通用型g7与g8y、内存型r7与r8y等实例,但是对于新手来说,由于是初次购买,实例规格往往不知道怎么选择了。本文为大家展示阿里云目前活动中各云服务器实例规格性能、适用场景以及选择指南参考。

相关产品

  • 检索分析服务 Elasticsearch版