【最佳实践】ECS自建的Elasticsearch迁移至阿里云Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 本文档为您介绍将ECS自建的Elasticsearch迁移至阿里云Elasticsearch的方法,包括创建索引和数据迁移。

教程概述

本案例的整体步骤如下。

  1. 创建索引
  2. 数据迁移

同时本文档也为您介绍了一些操作过程中可能遇到的问题和解决方法,详情请参见常见问题

前提条件

参考本文档做迁移前必须先满足以下条件,如果不满足需要通过其他方案进行迁移,详情请参见Logstash部署

  • 自建Elasticsearch所在的ECS必须是VPC网络(不支持Classiclink方式打通的ECS),并且自建Elasticsearch必须与阿里云Elasticsearch在同一个VPC下。
  • 您可以通过中控机器(或者任意一台机器)执行文档中的脚本,前提是该中控机器可以同时访问新旧Elasticsearch集群的9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制IP白名单,并且需要开启9200端口。
  • 自建Elasticsearch所在的ECS的VPC安全组不能限制阿里云Elasticsearch实例的各节点IP(Kibana控制台可查看阿里云Elasticsearch实例各节点的IP)。
  • 自建Elasticsearch与阿里云Elasticsearch实例已经连通。可以在执行脚本的机器上使用curl -XGET http://<host>:9200进行验证。

创建索引

参考旧集群中需要迁移的索引配置,提前在新集群中创建索引。或者为新集群开启自动创建索引和动态映射(不建议)功能。
以Python为例,使用如下脚本在新集群中批量创建旧集群索引,默认新创建的索引副本数为0。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 老集群host(ip+port)
oldClusterHost = "old-cluster.com"
## 老集群用户名,可为空
oldClusterUserName = "old-username"
## 老集群密码,可为空
oldClusterPassword = "old-password"
## 新集群host(ip+port)
newClusterHost = "new-cluster.com"
## 新集群用户名,可为空
newClusterUser = "new-username"
## 新集群密码,可为空
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + "  原始settings如下:\n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## 分片数默认和老集群索引保持一致
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本数默认为0
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + " 原始mapping如下:\n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "新索引 " + newIndexName + " 创建结果:" + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + "  或许是系统索引,不会重新创建,如有需要,请单独处理~"

数据迁移

以下提供了三种数据迁移的方式供您参考。您可以根据迁移的数据量大小以及具体的操作情况,选择合适的方式进行数据迁移。

注意

  • 为保证数据迁移前后的一致性,需要上游业务停止旧集群的写操作,读服务才可以正常进行。迁移完毕后,直接切换到新集群进行读写。如果不停止写操作可能会存在迁移前后数据不一致的情况。
  • 使用下述方案迁移时,如果是通过IP + Port的方式访问旧集群,则必须在新集群的yml文件中配置reindex白名单(加入旧集群的IP地址),例如reindex.remote.whitelist: 1.1.1.1:9200,1.2.*.*:*
  • 如果使用域名访问,则不允许通过http://host:port/path这种带path的形式访问。
  • 数据量小。
    使用reindex.sh脚本。
#!/bin/bash
# file:reindex.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 数据量大、无删除操作、有更新时间。
    数据量较大且无删除操作时,可以使用滚动迁移的方式,减少停止写服务的时间。滚动迁移需要有一个类似于更新时间的字段代表新数据的写时序。可以在数据迁移完成后,再停止写服务,快速更新一次。即可切换到新集群,恢复读写。
#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# 这是通过reindex操作远程重建索引的脚本,要求:
# 1. 新集群已经创建完索引,或者支持自动创建和动态映射。
# 2. 新集群必须在yml里配置IP白名单 reindex.remote.whitelist: 172.16.123.*:9200
# 3. host必须是[scheme]://[host]:[port]
USAGE="Usage: sh circleReindex.sh <count>
       count: 执行次数,多次(负数为循环)增量执行或者单次执行
Example:
        sh circleReindex.sh 1
        sh circleReindex.sh 5
        sh circleReindex.sh -1"
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
## http://myescluster.com
newClusterHost="新集群host"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="更新时间字段"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
    reindexTimes=$[${reindexTimes} + 1]
    curTimestamp=`date +%s`
    ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "range" : {
                    "'${timeField}'" : {
                        "gte" : '${lastTimestamp}',
                        "lt" : '${curTimestamp}'
                    }
                }
            }
        },
        "dest": {
            "index": "'${indexName}'"
        }
    }'`
    lastTimestamp=${curTimestamp}
    echo "第${reindexTimes}次reIndex,本次更新截止时间 ${lastTimestamp} 结果:${ret}"
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "本次执行异常,中断后续执行操作~~,请检查"
    fi
}
function start() {
    ## 负数就不停循环执行
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            let ++k
        done
    fi
}
## main 
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "开始执行索引 ${indexName} 的 ReIndex操作"
start $1
echo "总共执行 ${reindexTimes} 次 reIndex 操作"
  • 数据量大、无删除操作、无更新时间。
    当数据量较大,且索引的mapping中没有定义更新时间字段时,需要由上游业务修改代码添加更新时间字段。添加完成后可以先将历史数据迁移完,然后再使用上述的第二种方案。
#!/bin/bash
# file:miss.sh
indexName="你的索引名"
newClusterUser="新集群用户名"
newClusterPass="新集群密码"
newClusterHost="新集群host"
oldClusterUser="老集群用户名"
oldClusterPass="老集群密码"
# 老集群host必须是[scheme]://[host]:[port],例如http://10.37.1.1:9200
oldClusterHost="老集群host"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'${timeField}'"
                    }
                }
            }
        }
    },
    "dest": {
       "index": "'${indexName}'"
    }
}'
  • 不停止服务。
    敬请期待

说明

您也可以使用Logstash进行数据迁移,详情请参见 Logstash迁移Elasticsearch数据方法解读。

常见问题

  • 问题:执行curl命令时提示{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}。
    解决方法:可以在curl命令中添加-H "Content-Type: application/json"参数重试。

// 获取老集群中所有索引信息,如果没有权限可去掉"-u user:pass"参数,oldClusterHost为老集群的host,注意替换。
  curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
  // 参考上面返回的索引列表,获取需要迁移的指定用户索引的setting和mapping,注意替换indexName为要查询的用户索引名。
  curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
  // 参考上面获取到的对应索引的_settings,_mapping信息,在新集群中创建对应索引,索引副本数可以先设置为0,用于加快数据同步速度,数据迁移完成后再重置副本数为1。
  //其中newClusterHost是新集群的host,testindex是已经创建的索引名,testtype是对应索引的type。
  curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{
    "testindex" : {
        "settings" : {
            "number_of_shards" : "5", //假设老集群中对应索引的shard数是5个
            "number_of_replicas" : "0" //设置索引副本为0
          }
        },
        "mappings" : { //假设老集群中对应索引的mappings配置如下
            "testtype" : {
                "properties" : {
                    "uid" : {
                        "type" : "long"
                    },
                    "name" : {
                        "type" : "text"
                    },
                    "create_time" : {
                      "type" : "long"
                    }
                }
           }
       }
   }
}'
  • 问题:数据同步速度太慢。
    解决方法: 如果单索引数据量比较大,可以在迁移前将目的索引的副本数设置为 0,刷新时间为 -1。待数据迁移完成后,再更改回来,这样可以加快数据同步速度。
// 迁移索引数据前可以先将索引副本数设为0,不刷新,用于加快数据迁移速度。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 0,
        "refresh_interval" : "-1"
}'
// 索引数据迁移完成后,可以重置索引副本数为1,刷新时间1s(1s是默认值)。
curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' {
        "number_of_replicas" : 1,
        "refresh_interval" : "1s"
}'

说明

本文部分内容参考了官网文档

加入社群

如有其它技术问题,可通过钉钉扫一扫下面的二维码

开发者社区二维码.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
9天前
|
机器学习/深度学习 人工智能 PyTorch
阿里云GPU云服务器怎么样?产品优势、应用场景介绍与最新活动价格参考
阿里云GPU云服务器怎么样?阿里云GPU结合了GPU计算力与CPU计算力,主要应用于于深度学习、科学计算、图形可视化、视频处理多种应用场景,本文为您详细介绍阿里云GPU云服务器产品优势、应用场景以及最新活动价格。
阿里云GPU云服务器怎么样?产品优势、应用场景介绍与最新活动价格参考
|
8天前
|
存储 运维 安全
阿里云弹性裸金属服务器是什么?产品规格及适用场景介绍
阿里云服务器ECS包括众多产品,其中弹性裸金属服务器(ECS Bare Metal Server)是一种可弹性伸缩的高性能计算服务,计算性能与传统物理机无差别,具有安全物理隔离的特点。分钟级的交付周期将提供给您实时的业务响应能力,助力您的核心业务飞速成长。本文为大家详细介绍弹性裸金属服务器的特点、优势以及与云服务器的对比等内容。
|
16天前
|
人工智能 JSON Linux
利用阿里云GPU加速服务器实现pdf转换为markdown格式
随着AI模型的发展,GPU需求日益增长,尤其是个人学习和研究。直接购置硬件成本高且更新快,建议选择阿里云等提供的GPU加速型服务器。
利用阿里云GPU加速服务器实现pdf转换为markdown格式
|
3天前
|
机器学习/深度学习 弹性计算 缓存
简单聊聊,阿里云2核2G3M带宽云服务器与轻量应用服务器区别及选择参考
2核2G3M带宽云服务器与轻量应用服务器是目前阿里云的活动中,入门级走量型云服务器,轻量云服务器2核2G3M带宽68元一年,经济型e实例云服务器2核2G3M带宽99元1年。同样的配置,对于有的新手用户来说,有必要了解一下他们之间的区别,以及各自的购买和续费相关政策,从而选择更适合自己需求的云服务器。本文为大家简单分析一下我们应该选择哪一款。
|
3天前
|
监控 安全 数据库
阿里云国际站:如何使用阿里云国际站服务器
阿里云国际站服务器是一种强大的云计算服务,可以帮助用户轻松搭建和管理自己的网站、应用程序和数据库。本文将介绍如何使用阿里云国际站服务器,包括注册账户、选择服务器配置、安装操作系统、配置网络和安全设置等方面。
|
6天前
|
弹性计算 安全 搜索推荐
阿里云国际站注册教程:阿里云服务器安全设置
阿里云国际站注册教程:阿里云服务器安全设置 在云计算领域,阿里云是一个备受推崇的品牌,因其强大的技术支持和优质的服务而受到众多用户的青睐。本文将为您介绍阿里云国际站的注册过程,并重点讲解如何进行阿里云服务器的安全设置。
|
6天前
|
人工智能 监控 测试技术
阿里云磐久服务器稳定性实践之路
阿里云服务器质量智能管理体系聚焦自研服务器硬件层面的极致优化,应对高并发交付、短稳定性周期、早问题发现和快修复四大挑战。通过“三个重构”(质量标准、开发流程、交付模式)、“六个归一”(架构、硬件、软件、测试、部件、制造)策略,实现芯片、整机和云同步发布,确保快速稳定上量。此外,全场景测试体系与智能预警、分析、修复系统协同工作,保障服务器在萌芽阶段发现问题并及时解决,提升整体质量水平。未来,阿里云将继续深化大数据驱动的质量管理,推动服务器行业硬件质量的持续进步。
|
15天前
|
开发框架 缓存 .NET
阿里云轻量应用服务器、经济型e、通用算力型u1实例怎么选?区别及选择参考
在阿里云目前的活动中,价格比较优惠的云服务器有轻量应用服务器2核2G3M带宽68元1年,经济型e实例2核2G3M带宽99元1年,通用算力型u1实例2核4G5M带宽199元1年,这几个云服务器是用户关注度最高的。有的新手用户由于是初次使用阿里云服务器,对于轻量应用服务器、经济型e、通用算力型u1实例的相关性能并不是很清楚,本文为大家做个简单的介绍和对比,以供参考。
|
21天前
|
机器学习/深度学习 人工智能 编解码
阿里云GPU云服务器优惠收费标准,GPU服务器优缺点与适用场景详解
随着人工智能、大数据分析和高性能计算的发展,对计算资源的需求不断增加。GPU凭借强大的并行计算能力和高效的浮点运算性能,逐渐成为处理复杂计算任务的首选工具。阿里云提供了从入门级到旗舰级的多种GPU服务器,涵盖GN5、GN6、GN7、GN8和GN9系列,分别适用于图形渲染、视频编码、深度学习推理、训练和高性能计算等场景。本文详细介绍各系列的规格、价格和适用场景,帮助用户根据实际需求选择最合适的GPU实例。
|
23天前
|
弹性计算 Linux 数据安全/隐私保护
阿里云上快速搭建幻兽帕鲁游戏联机服务器指南
对于热爱幻兽帕鲁游戏的玩家来说,搭建一台专属的联机服务器无疑能够大大提升游戏体验。阿里云作为领先的云计算服务商,为玩家提供了便捷、高效的服务器搭建方案。本文将为您详细介绍如何在阿里云上快速搭建幻兽帕鲁游戏联机服务器,让您轻松享受多人游戏的乐趣。

热门文章

最新文章

相关产品

  • 检索分析服务 Elasticsearch版
  • 下一篇
    开通oss服务