利用Logstash将自建Elasticsearch数据全量、增量迁移到阿里云Elasticsearch。部署Logstash在阿里云ECS上,Logstash ECS选择和阿里云Elasticsearch集群相同的VPC,同时Logstash需要支持同时访问源ES集群、目标ES集群。 数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移,后续写入数据选择增量迁移,增量迁移需要索引有时间戳字段。
环境
场景描述
自建Elasticsearch数据迁移到阿里云Elasticsearch。
数据架构
在ECS服务器部署自建Elasticsearch,在阿里云开通Elasticsearch服务。在ECS服务器运行Python脚本迁移索引元数据,部署Logstash执行全量、增量同步任务。
数据
在Kibana控制台添加电商订单、航班、日志示例数据。
执行步骤
一、准备环境实例
1、创建阿里云Elasticsearch实例
Elasticsearch版本选择7.10,3个数据节点,节点规格为4核CPU/16G内存/100GB ESSD云盘。
参考:Elasticsearch 快速入门 https://help.aliyun.com/document_detail/134807.html
登录阿里云控制台,在产品与服务中找到 Elasticsearch 产品。
创建 Elasticsearch 实例
选择通用商业版,Elasticsearch 7.10版本。
场景初始化配置,选择通用场景。
选择亚太新加坡区域,可用区数量选择三个可用区。
配置数据节点、Kibana 节点、专有主节点、协调节点。其中3个数据节点,节点规格为4核CPU/16G内存/100GB ESSD云盘。
配置专有网络、虚拟交换机,配置实例名称、elastic 用户登录密码。
确认订单,购买 Elasticsearch 产品。
2、创建云服务器 ECS 实例
在新加坡地域创建 ECS 实例,用于部署自建Elasticsearch、Kibana、Logstash。
登录阿里云控制台,在产品与服务中找到云服务器 ECS 产品。
创建 ECS 实例。
付费模式选择按量付费,地域及可用区选择新加坡可用区C。
实例规格选择 4 vCPU 16GiB。
操作系统镜像选择公共镜像 CentOS 7.9 64位,存储配置系统盘 ESSD云盘 100GiB。
网络配置使用跟阿里云 Elasticsearch 相同的专有网络 VPC。勾选分配公网 IPv4 地址,按使用流量计费,带宽峰值配置 100 Mbps。
新建安全组,入方向手动添加 5601 端口,授权对象配置开发人员办公IP段(配置为0.0.0.0/0 则允许所有用户访问,生产环境尽量避免) ,描述说明Kibana端口。
选择安全组。
配置root用户登录密码。
确认订单,创建 ECS 实例。
3、部署自建 Elasticsearch
在 ECS 服务器部署自建 Elasticsearch,版本使用7.6.2,1个数据节点。
在 ECS 控制台,访问“远程连接”链接。
通过“Workbench远程连接”登录 ECS 服务器。
创建elastic用户
useradd elastic
passwd elastic
root用户切换为elastic用户
su elastic
下载Elasticsearch软件安装包
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
使用非root用户启用Elasticsearch
./bin/elasticsearch -d
验证Elasticsearch服务是否正常运行。正常情况会显示Elasticsearch版本号和“You Know, for Search”。
curl localhost:9200
4、部署自建 Kibana
Kibana版本为7.6.2,1个数据节点,跟自建Elasticsearch保持在相同 ECS 服务器。
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
修改Kibana配置文件config/kibana.yml,增加 server.host: "0.0.0.0",允许通过公网IP访问Kibana。
vi kibana.yml
使用非root用户启用Kibana
nohup ./bin/kibana &
通过公网IP登录Kibana控制台,添加示例数据。
添加示例数据入口
添加电商订单、航班、日志示例数据。
添加示例数据
5、在 ECS 服务器部署 Logstash
Logstash版本为7.10.0,1个节点,所在 ECS 服务器跟阿里云 Elasticsearch 实例保持在相同网络环境(VPC网络)。
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
修改堆内存使用,Logstash默认的堆内存是1G,根据 ECS 规格配置合适的内存大小,可以加快集群数据的迁移效率。
vi config/jvm.options
-Xms8g
-Xmx8g
修改Logstash批量写入记录条数,每批量写入5-15MB数据,可以加快集群数据的迁移效率。例如,每批量写入记录条数 pipeline.batch.size 从 125 改为 5000 。
vi config/pipelines.yml
验证Logstash功能
# 通过控制台输入输出收集数据
bin/logstash -e 'input { stdin { } } output { stdout {} }'
# 在控制台中输入 "Hello world!",然后会看到控制台输出"Hello world!"
二、迁移索引元数据(设置和映射)
Logstash会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。
以下提供创建索引的python脚本,用户可以使用该脚本创建需要的索引。
拷贝以下代码保存为 indiceCreate.py (注意修改集群连接地址、用户名、密码):
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 旧集群host
oldClusterHost = "localhost:9200"
## 旧集群用户名,可为空
oldClusterUserName = "xxxxxx"
## 旧集群密码,可为空
oldClusterPassword = "xxxxxx"
## 新集群host
newClusterHost = "new-cluster-xxxxxx:9200"
## 新集群用户名
newClusterUser = "elastic"
## 新集群密码
newClusterPassword = "xxxxxx"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
conn = httplib.HTTPConnection(host)
headers = {}
if (username != "") :
'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
headers["Authorization"] = "Basic %s" % base64string;
if "GET" == method:
headers["Content-Type"] = "application/x-www-form-urlencoded"
conn.request(method=method, url=endpoint, headers=headers)
else :
headers["Content-Type"] = "application/json"
conn.request(method=method, url=endpoint, body=params, headers=headers)
response = conn.getresponse()
res = response.read()
return res
def httpGet(host, endpoint, username="", password=""):
return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
endpoint = "/_cat/indices"
indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
indicesList = indicesResult.split("\n")
indexList = []
for indices in indicesList:
if (indices.find("open") > 0):
indexList.append(indices.split()[2])
return indexList
def getSettings(index, host, username="", password=""):
endpoint = "/" + index + "/_settings"
indexSettings = httpGet(host, endpoint, username, password)
print (index + " 原始settings如下:\n" + indexSettings)
settingsDict = json.loads(indexSettings)
## 分片数默认和旧集群索引保持一致
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
## 副本数默认为0
number_of_replicas = DEFAULT_REPLICAS
newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
return newSetting
def getMapping(index, host, username="", password=""):
endpoint = "/" + index + "/_mapping"
indexMapping = httpGet(host, endpoint, username, password)
print (index + " 原始mapping如下:\n" + indexMapping)
mappingDict = json.loads(indexMapping)
mappings = json.dumps(mappingDict[index]["mappings"])
newMapping = "\"mappings\" : " + mappings
return newMapping
def createIndexStatement(oldIndexName):
settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
return createstatement
def createIndex(oldIndexName, newIndexName=""):
if (newIndexName == "") :
newIndexName = oldIndexName
createstatement = createIndexStatement(oldIndexName)
print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
endpoint = "/" + newIndexName
createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print ("新索引 " + newIndexName + " 创建结果:" + createResult)
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
if (index.startswith(".")):
systemIndex.append(index)
else :
createIndex(index, index)
if (len(systemIndex) > 0) :
for index in systemIndex:
print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
直接执行脚本即可完成索引同步。
python indiceCreate.py
索引同步完成可以取目标集群的kibana上查看索引迁移情况:
GET /_cat/indices?v
三、全量数据迁移
Logstash配置位于config目录下。
用户可以参考配置修改Logstash配置文件,为了保证迁移数据的准确性,一般建议建立多组Logstash,分批次迁移数据,每个Logstash迁移部分数据。
集群间迁移配置参考 es2es_all.conf :
input{
elasticsearch{
# 源端ES地址
hosts => ["http://localhost:9200"]
# 安全集群配置登录用户名密码
user => "xxxxxx"
password => "xxxxxx"
# 需要迁移的索引列表,以逗号分隔
index => "kibana_sample_data_*, -.kibana*"
# 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
docinfo=>true
slices => 5
size => 5000
}
}
filter {
# 去掉一些logstash自己加的字段
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output{
elasticsearch{
# 目的端ES地址
hosts => ["http://new-cluster-xxxxxx:9200"]
# 安全集群配置登录用户名密码
user => "elastic"
password => "xxxxxx"
# 目的端索引名称,以下配置为和源端保持一致
index => "%{[@metadata][_index]}"
# 目的端索引type,以下配置为和源端保持一致
document_type => "%{[@metadata][_type]}"
# 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
# 调试信息,正式迁移去掉
#stdout { codec => rubydebug { metadata => true }}
}
后台启动Logstash全量迁移任务
nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &
四、增量数据迁移
Logstash增量迁移需要数据增量标志,写出对应ES 的DLS query,可以查出查出增量数据。
开启Logstash定时任务即可触发增量迁移。
增量迁移配置参考 es2es_kibana_sample_data_logs.conf :
input{
elasticsearch{
# 源端ES地址
hosts => ["http://localhost:9200"]
# 安全集群配置登录用户名密码
user => "xxxxxx"
password => "xxxxxx"
# 需要迁移的索引列表,以逗号分隔
index => "kibana_sample_data_logs"
# 按时间范围查询增量数据,示例查询最近5分钟数据
query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
# 定时任务,每分钟执行一次
schedule => "* * * * *"
scroll => "5m"
docinfo=>true
size => 5000
}
}
filter {
# 去掉一些logstash自己加的字段
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output{
elasticsearch{
# 目的端ES地址
hosts => ["http://new-cluster-xxxxxx:9200"]
# 安全集群配置登录用户名密码
user => "elastic"
password => "xxxxxx"
# 目的端索引名称,以下配置为和源端保持一致
index => "%{[@metadata][_index]}"
# 目的端索引type,以下配置为和源端保持一致
document_type => "%{[@metadata][_type]}"
# 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
# 调试信息,正式迁移去掉
#stdout { codec => rubydebug { metadata => true }}
}
后台启动Logstash增量迁移任务
nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
在Kibana中查询最近更新的记录,验证增量数据是否同步。
示例查询条件中索引名称 kibana_sample_data_logs、时间戳字段@timestamp、最近时间范围5分钟可以根据实际情况修改。
GET kibana_sample_data_logs/_search
{
"query": {
"range": {
"@timestamp": {
"gte": "now-5m",
"lte": "now/m"
}
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
效果
支持通过Logstash全量、增量同步自建Elasticsearch数据到阿里云Elasticsearch。
自建Elasticsearch集群索引、数据量信息
全量迁移前,阿里云Elasticsearch集群索引、数据量信息(记录条数为0)
全量迁移后,阿里云Elasticsearch集群索引、数据量信息(记录条数跟自建集群一致)
自建Elasticsearch集群最近更新记录
增量迁移后,阿里云Elasticsearch集群最近更新记录(保持跟自建集群一致)
备注
参考文档
Elasticsearch迁移方案选取指南
https://help.aliyun.com/document_detail/170095.html
通过阿里云Logstash将自建Elasticsearch数据迁移至阿里云
https://help.aliyun.com/document_detail/145925.html