自建Elasticsearch通过Logstash全量、增量迁移阿里云

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 利用Logstash将自建Elasticsearch数据全量、增量迁移到阿里云Elasticsearch。部署Logstash在阿里云ECS上,Logstash ECS选择和阿里云Elasticsearch集群相同的VPC,同时Logstash需要支持同时访问源ES集群、目标ES集群。 数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移,后续写入数据选择增量迁移,增量迁移需要索引有时间戳字段。环

利用Logstash将自建Elasticsearch数据全量、增量迁移到阿里云Elasticsearch。部署Logstash在阿里云ECS上,Logstash ECS选择和阿里云Elasticsearch集群相同的VPC,同时Logstash需要支持同时访问源ES集群、目标ES集群。 数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移,后续写入数据选择增量迁移,增量迁移需要索引有时间戳字段。

环境

生态

公有云

地域

新加坡

产品名称

Elasticsearch

产品版本

阿里云 Elasticsearch版本:7.10.0 Elasticsearch集群规格:3个数据节点,单节点4核CPU/16G内存/100GB  ESSD云盘。

自建Elasticsearch

自建Elasticsearch版本:7.6.2,部署在ECS虚拟机上,保持跟阿里云 Elasticsearch在相同VPC网络环境,操作系统 CentOS 7.9 64位, 4核CPU/16G内存/100GB ESSD云盘。

Logstash

Logstash版本:7.10.0,部署在ECS虚拟机上,保持跟阿里云 Elasticsearch在相同VPC网络环境

JDK

JDK 11版本(Elasticsearch、Logstash自带JDK)

场景描述

自建Elasticsearch数据迁移到阿里云Elasticsearch。

数据架构

在ECS服务器部署自建Elasticsearch,在阿里云开通Elasticsearch服务。在ECS服务器运行Python脚本迁移索引元数据,部署Logstash执行全量、增量同步任务。

数据

在Kibana控制台添加电商订单、航班、日志示例数据。

执行步骤

一、准备环境实例

1、创建阿里云Elasticsearch实例

Elasticsearch版本选择7.10,3个数据节点,节点规格为4核CPU/16G内存/100GB ESSD云盘。

参考:Elasticsearch 快速入门 https://help.aliyun.com/document_detail/134807.html

登录阿里云控制台,在产品与服务中找到 Elasticsearch 产品。

创建 Elasticsearch 实例

选择通用商业版,Elasticsearch 7.10版本。

场景初始化配置,选择通用场景。

选择亚太新加坡区域,可用区数量选择三个可用区。

配置数据节点、Kibana 节点、专有主节点、协调节点。其中3个数据节点,节点规格为4核CPU/16G内存/100GB ESSD云盘。

配置专有网络、虚拟交换机,配置实例名称、elastic 用户登录密码。

确认订单,购买 Elasticsearch 产品。

2、创建云服务器 ECS 实例

在新加坡地域创建 ECS 实例,用于部署自建Elasticsearch、Kibana、Logstash。

登录阿里云控制台,在产品与服务中找到云服务器 ECS 产品。

创建 ECS 实例。

付费模式选择按量付费,地域及可用区选择新加坡可用区C。

实例规格选择 4 vCPU 16GiB。

操作系统镜像选择公共镜像 CentOS 7.9 64位,存储配置系统盘 ESSD云盘 100GiB。

网络配置使用跟阿里云 Elasticsearch 相同的专有网络 VPC。勾选分配公网 IPv4 地址,按使用流量计费,带宽峰值配置 100 Mbps。

新建安全组,入方向手动添加 5601 端口,授权对象配置开发人员办公IP段(配置为0.0.0.0/0 则允许所有用户访问,生产环境尽量避免) ,描述说明Kibana端口。

选择安全组。

配置root用户登录密码。

确认订单,创建 ECS 实例。

3、部署自建 Elasticsearch 

在 ECS 服务器部署自建 Elasticsearch,版本使用7.6.2,1个数据节点。

在 ECS 控制台,访问“远程连接”链接。

通过“Workbench远程连接”登录 ECS 服务器。

创建elastic用户

useradd elastic
passwd elastic

root用户切换为elastic用户

su elastic

下载Elasticsearch软件安装包

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz

使用非root用户启用Elasticsearch

./bin/elasticsearch -d

验证Elasticsearch服务是否正常运行。正常情况会显示Elasticsearch版本号和“You Know, for Search”。

curl localhost:9200

4、部署自建 Kibana 

Kibana版本为7.6.2,1个数据节点,跟自建Elasticsearch保持在相同 ECS 服务器。

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz

修改Kibana配置文件config/kibana.yml,增加 server.host: "0.0.0.0",允许通过公网IP访问Kibana。

vi kibana.yml

使用非root用户启用Kibana

nohup ./bin/kibana &

通过公网IP登录Kibana控制台,添加示例数据。

添加示例数据入口

添加电商订单、航班、日志示例数据。

添加示例数据

5、在 ECS 服务器部署 Logstash 

Logstash版本为7.10.0,1个节点,所在 ECS 服务器跟阿里云 Elasticsearch 实例保持在相同网络环境(VPC网络)。

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz

修改堆内存使用,Logstash默认的堆内存是1G,根据 ECS 规格配置合适的内存大小,可以加快集群数据的迁移效率。

vi config/jvm.options
-Xms8g
-Xmx8g

修改Logstash批量写入记录条数,每批量写入5-15MB数据,可以加快集群数据的迁移效率。例如,每批量写入记录条数 pipeline.batch.size 从 125 改为 5000 。

vi config/pipelines.yml

验证Logstash功能

# 通过控制台输入输出收集数据
bin/logstash -e 'input { stdin { } } output { stdout {} }'
# 在控制台中输入 "Hello world!",然后会看到控制台输出"Hello world!"

二、迁移索引元数据(设置和映射)

Logstash会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。

以下提供创建索引的python脚本,用户可以使用该脚本创建需要的索引。

拷贝以下代码保存为 indiceCreate.py (注意修改集群连接地址、用户名、密码)

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 旧集群host
oldClusterHost = "localhost:9200"
## 旧集群用户名,可为空
oldClusterUserName = "xxxxxx"
## 旧集群密码,可为空
oldClusterPassword = "xxxxxx"
## 新集群host
newClusterHost = "new-cluster-xxxxxx:9200"
## 新集群用户名
newClusterUser = "elastic"
## 新集群密码
newClusterPassword = "xxxxxx"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string;
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res
def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
    endpoint = "/_cat/indices"
    indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList
def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print (index + "  原始settings如下:\n" + indexSettings)
    settingsDict = json.loads(indexSettings)
    ## 分片数默认和旧集群索引保持一致
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## 副本数默认为0
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting
def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print (index + " 原始mapping如下:\n" + indexMapping)
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping
def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement
def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print ("新索引 " + newIndexName + " 创建结果:" + createResult)
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")

直接执行脚本即可完成索引同步。

python indiceCreate.py

索引同步完成可以取目标集群的kibana上查看索引迁移情况:

GET /_cat/indices?v

三、全量数据迁移

Logstash配置位于config目录下。

用户可以参考配置修改Logstash配置文件,为了保证迁移数据的准确性,一般建议建立多组Logstash,分批次迁移数据,每个Logstash迁移部分数据。

集群间迁移配置参考 es2es_all.conf 

input{
    elasticsearch{
        # 源端ES地址
        hosts =>  ["http://localhost:9200"]
        # 安全集群配置登录用户名密码
        user => "xxxxxx"
        password => "xxxxxx"
        # 需要迁移的索引列表,以逗号分隔
        index => "kibana_sample_data_*, -.kibana*"
        # 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
        docinfo=>true
        slices => 5
        size => 5000
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端ES地址
        hosts => ["http://new-cluster-xxxxxx:9200"]
        # 安全集群配置登录用户名密码
        user => "elastic"
        password => "xxxxxx"
        # 目的端索引名称,以下配置为和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置为和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
    
    # 调试信息,正式迁移去掉
    #stdout { codec => rubydebug { metadata => true }}
}

后台启动Logstash全量迁移任务

nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &

四、增量数据迁移

Logstash增量迁移需要数据增量标志,写出对应ES 的DLS query,可以查出查出增量数据。

开启Logstash定时任务即可触发增量迁移。

增量迁移配置参考 es2es_kibana_sample_data_logs.conf 

input{
    elasticsearch{
        # 源端ES地址
        hosts =>  ["http://localhost:9200"]
        # 安全集群配置登录用户名密码
        user => "xxxxxx"
        password => "xxxxxx"
        # 需要迁移的索引列表,以逗号分隔
        index => "kibana_sample_data_logs"
        # 按时间范围查询增量数据,示例查询最近5分钟数据
        query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
        # 定时任务,每分钟执行一次
        schedule => "* * * * *"
        scroll => "5m"
        docinfo=>true
        size => 5000
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}


output{
    elasticsearch{
        # 目的端ES地址
        hosts => ["http://new-cluster-xxxxxx:9200"]
        # 安全集群配置登录用户名密码
        user => "elastic"
        password => "xxxxxx"
        # 目的端索引名称,以下配置为和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置为和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
    
    # 调试信息,正式迁移去掉
    #stdout { codec => rubydebug { metadata => true }}
}

后台启动Logstash增量迁移任务

nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &

在Kibana中查询最近更新的记录,验证增量数据是否同步。

示例查询条件中索引名称 kibana_sample_data_logs、时间戳字段@timestamp、最近时间范围5分钟可以根据实际情况修改。

GET kibana_sample_data_logs/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-5m",
        "lte": "now/m"
      }
    }
  },
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}

效果

支持通过Logstash全量、增量同步自建Elasticsearch数据到阿里云Elasticsearch。

自建Elasticsearch集群索引、数据量信息

全量迁移前,阿里云Elasticsearch集群索引、数据量信息(记录条数为0)

全量迁移后,阿里云Elasticsearch集群索引、数据量信息(记录条数跟自建集群一致)

自建Elasticsearch集群最近更新记录

增量迁移后,阿里云Elasticsearch集群最近更新记录(保持跟自建集群一致)

备注

参考文档

Elasticsearch迁移方案选取指南

https://help.aliyun.com/document_detail/170095.html

通过阿里云Logstash将自建Elasticsearch数据迁移至阿里云

https://help.aliyun.com/document_detail/145925.html

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
6月前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
63 0
|
6月前
|
监控 安全 Linux
【Elasticsearch专栏 14】深入探索:Elasticsearch使用Logstash的日期过滤器删除旧数据
使用Logstash的日期过滤器可以有效删除Elasticsearch中的旧数据,释放存储空间并提高集群性能。通过配置Logstash,可以指定索引模式、筛选时间戳早于特定阈值的文档,并在输出阶段删除这些旧数据。执行配置时,需确保Logstash与Elasticsearch连接正常,并监控日志以确保操作安全。定期执行此操作可确保旧数据不会过多积累。总之,Logstash的日期过滤器提供了一种简单而高效的方法,帮助管理和优化Elasticsearch中的数据。
106 0
|
23天前
|
存储 人工智能 自然语言处理
Elasticsearch Inference API增加对阿里云AI的支持
本文将介绍如何在 Elasticsearch 中设置和使用阿里云的文本生成、重排序、稀疏向量和稠密向量服务,提升搜索相关性。
65 14
Elasticsearch Inference API增加对阿里云AI的支持
|
15天前
|
存储 监控 安全
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
3月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
202 3
|
4月前
|
存储 缓存 数据处理
ELK中 Elasticsearch和Logstash内存大小设置的考虑
ELK中 Elasticsearch和Logstash内存大小设置的考虑
267 0
|
6月前
|
监控 应用服务中间件 nginx
使用 Docker Compose V2 快速搭建日志分析平台 ELK (Elasticsearch、Logstash 和 Kibana)
ELK的架构有多种,本篇分享使用的架构如图所示: Beats(Filebeat) -> -> Elasticsearch -> Kibana,目前生产环境一天几千万的日志,内存占用大概 10G
382 4
|
6月前
|
自然语言处理 测试技术 网络安全
ElasticSearch7最新实战文档-附带logstash同步方案
ElasticSearch7最新实战文档-附带logstash同步方案
83 0
|
6月前
|
监控 API 索引
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
47 0