【最佳实践】ECS自建的Elasticsearch迁移至阿里云Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 本文档为您介绍将ECS自建的Elasticsearch迁移至阿里云Elasticsearch的方法,包括创建索引和数据迁移。

教程概述

本案例的整体步骤如下。

  1. 创建索引
  2. 数据迁移

同时本文档也为您介绍了一些操作过程中可能遇到的问题和解决方法,详情请参见常见问题

前提条件

参考本文档做迁移前必须先满足以下条件,如果不满足需要通过其他方案进行迁移,详情请参见Logstash部署

  • 自建Elasticsearch所在的ECS必须是VPC网络(不支持Classiclink方式打通的ECS),并且自建Elasticsearch必须与阿里云Elasticsearch在同一个VPC下。
  • 您可以通过中控机器(或者任意一台机器)执行文档中的脚本,前提是该中控机器可以同时访问新旧Elasticsearch集群的9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制IP白名单,并且需要开启9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制阿里云Elasticsearch实例的各节点IP(Kibana控制台可查看阿里云Elasticsearch实例各节点的IP)。
  • 自建Elasticsearch与阿里云Elasticsearch实例已经连通。可以在执行脚本的机器上使用curl -XGET http://<host>:9200进行验证。

创建索引

参考旧集群中需要迁移的索引配置,提前在新集群中创建索引。或者为新集群开启自动创建索引和动态映射(不建议)功能。
以Python为例,使用如下脚本在新集群中批量创建旧集群索引,默认新创建的索引副本数为0。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 老集群host(ip+port)
oldClusterHost = "old-cluster.com"
## 老集群用户名,可为空
oldClusterUserName = "old-username"
## 老集群密码,可为空
oldClusterPassword = "old-password"
## 新集群host(ip+port)
newClusterHost = "new-cluster.com"
## 新集群用户名,可为空
newClusterUser = "new-username"
## 新集群密码,可为空
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + "  原始settings如下:\n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## 分片数默认和老集群索引保持一致
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本数默认为0
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + " 原始mapping如下:\n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "新索引 " + newIndexName + " 创建结果:" + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + "  或许是系统索引,不会重新创建,如有需要,请单独处理~"

数据迁移

以下提供了三种数据迁移的方式供您参考。您可以根据迁移的数据量大小以及具体的操作情况,选择合适的方式进行数据迁移。

注意

  • 为保证数据迁移前后的一致性,需要上游业务停止旧集群的写操作,读服务才可以正常进行。迁移完毕后,直接切换到新集群进行读写。如果不停止写操作可能会存在迁移前后数据不一致的情况。
  • 使用下述方案迁移时,如果是通过IP + Port的方式访问旧集群,则必须在新集群的yml文件中配置reindex白名单(加入旧集群的IP地址),例如reindex.remote.whitelist: 1.1.1.1:9200,1.2.*.*:*
  • 如果使用域名访问,则不允许通过http://host:port/path这种带path的形式访问。
  • 数据量小。
    使用reindex.sh脚本。
#!/bin/bash
# file:reindex.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 数据量大、无删除操作、有更新时间。
    数据量较大且无删除操作时,可以使用滚动迁移的方式,减少停止写服务的时间。滚动迁移需要有一个类似于更新时间的字段代表新数据的写时序。可以在数据迁移完成后,再停止写服务,快速更新一次。即可切换到新集群,恢复读写。
#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# 这是通过reindex操作远程重建索引的脚本,要求:
# 1. 新集群已经创建完索引,或者支持自动创建和动态映射。
# 2. 新集群必须在yml里配置IP白名单 reindex.remote.whitelist: 172.16.123.*:9200
# 3. host必须是[scheme]://[host]:[port]
USAGE="Usage: sh circleReindex.sh <count>
       count: 执行次数,多次(负数为循环)增量执行或者单次执行
Example:
        sh circleReindex.sh 1
        sh circleReindex.sh 5
        sh circleReindex.sh -1"
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
## http://myescluster.com
newClusterHost="新集群host"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="更新时间字段"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
    reindexTimes=$[${reindexTimes} + 1]
    curTimestamp=`date +%s`
    ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "range" : {
                    "'${timeField}'" : {
                        "gte" : '${lastTimestamp}',
                        "lt" : '${curTimestamp}'
                    }
                }
            }
        },
        "dest": {
            "index": "'${indexName}'"
        }
    }'`
    lastTimestamp=${curTimestamp}
    echo "第${reindexTimes}次reIndex,本次更新截止时间 ${lastTimestamp} 结果:${ret}"
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "本次执行异常,中断后续执行操作~~,请检查"
    fi
}
function start() {
    ## 负数就不停循环执行
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            let ++k
        done
    fi
}
## main 
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "开始执行索引 ${indexName} 的 ReIndex操作"
start $1
echo "总共执行 ${reindexTimes} 次 reIndex 操作"
  • 数据量大、无删除操作、无更新时间。
    当数据量较大,且索引的mapping中没有定义更新时间字段时,需要由上游业务修改代码添加更新时间字段。添加完成后可以先将历史数据迁移完,然后再使用上述的第二种方案。
#!/bin/bash
# file:miss.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'${timeField}'"
                    }
                }
            }
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 不停止服务。
    敬请期待

说明

您也可以使用Logstash进行数据迁移,详情请参见 Logstash迁移Elasticsearch数据方法解读。

常见问题

  • 问题:执行curl命令时提示{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}。
    解决方法:可以在curl命令中添加-H "Content-Type: application/json"参数重试。

// 获取老集群中所有索引信息,如果没有权限可去掉"-u user:pass"参数,oldClusterHost为老集群的host,注意替换。
  curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
  // 参考上面返回的索引列表,获取需要迁移的指定用户索引的setting和mapping,注意替换indexName为要查询的用户索引名。
  curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
  // 参考上面获取到的对应索引的_settings,_mapping信息,在新集群中创建对应索引,索引副本数可以先设置为0,用于加快数据同步速度,数据迁移完成后再重置副本数为1。
  //其中newClusterHost是新集群的host,testindex是已经创建的索引名,testtype是对应索引的type。
  curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
    "testindex" : {
        "settings" : {
            "number_of_shards" : "5", //假设老集群中对应索引的shard数是5个
            "number_of_replicas" : "0" //设置索引副本为0
          }
        },
        "mappings" : { //假设老集群中对应索引的mappings配置如下
            "testtype" : {
                "properties" : {
                    "uid" : {
                        "type" : "long"
                    },
                    "name" : {
                        "type" : "text"
                    },
                    "create_time" : {
                      "type" : "long"
                    }
                }
           }
       }
   }
}'
  • 问题:数据同步速度太慢。
    解决方法: 如果单索引数据量比较大,可以在迁移前将目的索引的副本数设置为 0,刷新时间为 -1。待数据迁移完成后,再更改回来,这样可以加快数据同步速度。
// 迁移索引数据前可以先将索引副本数设为0,不刷新,用于加快数据迁移速度。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 0,
        "refresh_interval" : "-1"
}'
// 索引数据迁移完成后,可以重置索引副本数为1,刷新时间1s(1s是默认值)。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 1,
        "refresh_interval" : "1s"
}'

说明

本文部分内容参考了官网文档

加入社群

如有其它技术问题,可通过钉钉扫一扫下面的二维码

开发者社区二维码.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
7月前
|
存储 监控 搜索推荐
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——安装篇(一)
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——安装篇(一)
|
7天前
|
机器学习/深度学习 人工智能 运维
阿里云技术公开课直播预告:基于阿里云 Elasticsearch 构建 AI 搜索和可观测 Chatbot
阿里云技术公开课预告:Elastic和阿里云搜索技术专家将深入解读阿里云Elasticsearch Enterprise版的AI功能及其在实际应用。
阿里云技术公开课直播预告:基于阿里云 Elasticsearch 构建 AI 搜索和可观测 Chatbot
|
10天前
|
存储 人工智能 API
(Elasticsearch)使用阿里云 infererence API 及 semantic text 进行向量搜索
本文展示了如何使用阿里云 infererence API 及 semantic text 进行向量搜索。
|
2月前
|
存储 人工智能 自然语言处理
Elasticsearch Inference API增加对阿里云AI的支持
本文将介绍如何在 Elasticsearch 中设置和使用阿里云的文本生成、重排序、稀疏向量和稠密向量服务,提升搜索相关性。
94 14
Elasticsearch Inference API增加对阿里云AI的支持
|
2月前
|
存储 缓存 监控
深入解析:Elasticsearch集群性能调优策略与最佳实践
【10月更文挑战第8天】Elasticsearch 是一个分布式的、基于 RESTful 风格的搜索和数据分析引擎,它能够快速地存储、搜索和分析大量数据。随着企业对实时数据处理需求的增长,Elasticsearch 被广泛应用于日志分析、全文搜索、安全信息和事件管理(SIEM)等领域。然而,为了确保 Elasticsearch 集群能够高效运行并满足业务需求,需要进行一系列的性能调优工作。
171 3
|
5月前
|
存储 人工智能 自然语言处理
阿里云Elasticsearch AI场景语义搜索最佳实践
本文介绍了如何使用阿里云Elasticsearch结合搜索开发工作台搭建AI语义搜索。
17321 68
|
7月前
|
存储 监控 Java
视频 | Elasticsearch 8.X 企业内训之最佳实践10 讲
视频 | Elasticsearch 8.X 企业内训之最佳实践10 讲
77 0
|
4月前
|
机器学习/深度学习 数据采集 缓存
Elasticsearch与机器学习集成的最佳实践
【8月更文第28天】Elasticsearch 提供了强大的搜索和分析能力,而机器学习则能够通过识别模式和预测趋势来增强这些能力。将两者结合可以实现更智能的搜索体验、异常检测等功能。
129 0
|
5月前
|
存储 监控 Java
使用Elasticsearch实现全文搜索的最佳实践
使用Elasticsearch实现全文搜索的最佳实践
|
5月前
|
存储 监控 Java
使用Elasticsearch实现全文搜索的最佳实践
使用Elasticsearch实现全文搜索的最佳实践

相关产品

  • 检索分析服务 Elasticsearch版