阿里云ElasticSearch迁移-基于OSS全增量快照

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 阿里云ElasticSearch迁移-基于OSS全增量快照

1. 基本信息

源端集群:ElasticSearch版本:6.4.3

目标集群:ElasticSearch版本:6.7.0

2. 模拟数据生成

2.1. 索引1:java写入

2.1.1. 创建索引

PUT /prod_info
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "prod": {
      "properties": {
        "cid": {
          "type": "text",
          "analyzer": "ik_smart"
        },
        "prod_id": {
         "type": "text",
         "analyzer": "ik_smart"
        }
      }
    }
  }
}

2.1.2. java客户端写入es模拟

2.1.2.1. 参考文档

high level rest client 6.7.x:

https://help.aliyun.com/document_detail/131255.html?spm=a2c4g.120716.0.0.639b40bf4z4bol

2.1.2.2. 代码实现

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.7.0</version>
</dependency>
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.*;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class WriteESTest {
    private static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        // 默认缓存限制为100MB,此处修改为30MB。
        builder.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }
    public static @NotNull String generateRandomString(int length) {
        String characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        StringBuilder sb = new StringBuilder(length);
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            sb.append(characters.charAt(random.nextInt(characters.length())));
        }
        return sb.toString();
    }
    public static void main(String[] args) throws IOException {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long l1 = System.currentTimeMillis();
        String startTime = simpleDateFormat.format(new Date(l1));
        System.out.println("开始时间: " + l1 + "\n" + startTime);
        RestClientSourceES67 restClientSourceES67 = new RestClientSourceES67();
        RestHighLevelClient client = restClientSourceES67.getClient();
        Random random = new Random();
        try {
            int cnt = 1;
            while (true) {
                if (cnt >= 1000) {
                    break;
                }
                if (cnt % 100 ==0){
                    System.out.println(cnt);
                }
                Map<String, Object> jsonMap = new HashMap<>();
                // cid和prod_id为字段名,value_01、value_02为对应的值。
                jsonMap.put("cid", generateRandomString(15));
                int i1 = random.nextInt();
                int prod_id = i1 >= 0 ? i1 : -i1;
                jsonMap.put("prod_id", prod_id);
                cnt += 1;
                //index_name为索引名称;type_name为类型名称;doc_id为文档的id。
                IndexRequest indexRequest = new IndexRequest("prod_info", "prod").source(jsonMap);
                // 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。
                IndexResponse indexResponse = client.index(indexRequest, COMMON_OPTIONS);
                long version = indexResponse.getVersion();
//                System.out.println("Index document successfully!  version:" + version + " data:" + jsonMap);
            }
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            client.close();
        }
        long l2 = System.currentTimeMillis();
        String endTime = simpleDateFormat.format(new Date(l2));
        System.out.println("结束时间: " + l2 + "\n" + endTime);
    }
}
class RestClientSourceES67 {
    private String endpoint = "es-cn-uqm3cclew000edxpm.public.elasticsearch.aliyuncs.com";
    private Integer port = 9200;
    private String username = "elastic";
    private String password = "密码";
    public RestHighLevelClient getClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.username, this.password));
        // 通过builder创建rest client,配置http client的HttpClientConfigCallback。
        // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址
        RestClientBuilder builder = RestClient.builder(new HttpHost(this.endpoint, this.port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
        // RestHighLevelClient实例通过REST low-level client builder进行构造。
        RestHighLevelClient highClient = new RestHighLevelClient(builder);
        return highClient;
    }
}

2.1.2.3. 写入效果

场景:针对某些JAVA应用导入ES集群文档的场景进行了模拟

效果:未评估。可按需增加索引文档写入的时间间隔(降低)、增加线程并发等(提高)。

image.png

2.2. 索引2:flink写入

2.2.1. 创建索引

PUT /random_info
{
  "settings": {
    "number_of_shards": 5, # 不填默认索引是1个分片
    "number_of_replicas": 1 # 默认1个副本,最大为(节点-1)
  },
  "mappings": {
    "random": {
      "properties": {
        "productName": {
          "type": "text",
          "analyzer": "ik_smart"
        },
        "annual_rate":{
          "type":"keyword" # text和keyword都是文本类型,keyword默认不分词
        },
        "describe": {
         "type": "text",
         "analyzer": "ik_smart"
        }
      }
    }
  }
}

2.2.2. Flink实时写入es模拟

2.2.2.1. 参考文档

datagen source: https://help.aliyun.com/zh/flink/developer-reference/datagen-connector?spm=a2c4g.11186623.0.i1

ElasticSearch sink: https://help.aliyun.com/zh/flink/developer-reference/elasticsearch-connector?spm=a2c4g.11174283.0.i1

2.2.2.2. 代码实现

CREATE TEMPORARY TABLE datagen_source (
  productName STRING, 
  annual_rate STRING,
  `describe` string
) WITH (
  'connector' = 'datagen'
  -- ,'rows-per-second' = '2'
  ,'number-of-rows' = '5000000'
  ,'fields.productName.length' = '20'
  ,'fields.annual_rate.length' = '20'
  ,'fields.describe.length' = '20'
);
CREATE TEMPORARY TABLE es_sink (
  productName STRING,
  annual_rate STRING,
  `describe` string
  --   ,PRIMARY KEY (user_id) NOT ENFORCED 
  -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'es-cn-xx.elasticsearch.aliyuncs.com:9200',
  'index' = 'random_info',
  'document-type' = 'random',
  'username' ='elastic',
  'password' ='密码'
);
insert into es_sink
select * from datagen_source

2.2.2.3. 写入效果

  1. Flink
  1. 资源配置:JobManager(1c1g)TaskManager(1 * 1c2g)
  2. 数据量:2500W,数据内容大小长度见datagen算子配置
  3. 耗时:30MIN
  1. ElasticSearch
  1. 集群配置:数据节点2c4g * 2
  2. QPS:20K
  3. CPU:75%-80%

3. 全量数据快照迁移

3.1. 索引情况汇总

索引大小:约9G

索引个数:2个(见上文场景化写入)

image.png

3.2. 源端进行全量快照

3.2.1. 创建仓库

# 创建快照仓库
put /_snapshot/{仓库名}
{
    "type": "oss",
    "settings": {
        "endpoint": "endpoint",
        "access_key_id": "ak",
        "secret_access_key": "sk",
        "bucket": "bucket名称",
        "compress": true
    }
}

3.2.2. 验证仓库

# 查看集群快照细节
GET /_snapshot
# 结果
{
  "es_snapshot_2_es0802" : {
    "type" : "oss",
    "settings" : {
      "bucket" : "bucket名称",
      "compress" : "true",
      "endpoint" : "endpoint"
    }
  }
}
# 查看快照仓库列表
GET /_cat/repositories?v
# 结果
# id                   type
# es_snapshot_2_es0802  oss

3.2.3. 创建快照

# 在仓库下创建快照,快照的名称可以根据日期或业务逻辑命名
# indices 指定了所有索引(不包括.开头的系统索引)
PUT /_snapshot/{仓库名}/{快照明}
{
  "indices":"*,-.*"
}
# 结果
{
  "accepted" : true
}

3.2.4. 验证快照

# 包括整体的和不同的索引,时间,shard情况,文件数
GET /_snapshot/{仓库名}/{快照名}/_status
# 可以看到仓库下所有快照的整体信息
# 可以看到正则匹配到了两个索引,indices列表中
# 开始和结束时间是标准时间戳
# state 为成功即为完成快照的创建
GET /_snapshot/{仓库名}/*
# {
#   "snapshots" : [
#     {
#       "snapshot" : "snapshot_1",
#       "uuid" : "-k0IOgiaSvGIYu5ouWcj2Q",
#       "version_id" : 6070099,
#       "version" : "6.7.0",
#       "indices" : [
#         "product_info",
#         "rds_source"
#       ],
#       "include_global_state" : true,
#       "state" : "SUCCESS",
#       "start_time" : "2023-08-02T08:07:16.715Z",
#       "start_time_in_millis" : 1690963636715,
#       "end_time" : "2023-08-02T08:07:20.843Z",
#       "end_time_in_millis" : 1690963640843,
#       "duration_in_millis" : 4128,
#       "failures" : [ ],
#       "shards" : {
#         "total" : 10,
#         "failed" : 0,
#         "successful" : 10
#       }
#     }
#   ]
# }

3.3. 快照文件迁移

参考阿里云OSS数据迁移功能:https://help.aliyun.com/zh/oss/use-cases/overview-58?spm=a2c4g.11186623.0.0.694371bfE6nnRR

3.4. 目标端从快照恢复

3.4.1. 全量快照恢复及文档数验证

# 创建同名仓库
put /_snapshot/{仓库名}
{
    "type": "oss",
    "settings": {
        "endpoint": "endpoint",
        "access_key_id": "ak",
        "secret_access_key": "sk",
        "bucket": "bucket",
        "compress": true
    }
}
# 进行恢复
POST _snapshot/{仓库名}/{快照名}/_restore
{
    "indices": "*,-.*",
    "ignore_unavailable": "true"
}
# 查看两个索引数量
# docs.count可以对的上
GET /_cat/indices?v

image.png

3.4.2. 恢复时间统计

参考:https://developer.aliyun.com/article/1301212?spm=a2c6h.13148508.setting.15.fbf04f0eBFV3k7

image.png

4. 增量数据写入

4.1. 增量数据手动变更写入

DELETE /prod_info/prod/Y2w63okBKCBmuMF9Zqyo
PUT /prod_info/prod/ZGw63okBKCBmuMF9Zqy7
{
  "prod_id":373586,
  "cid":"666"
}
GET /prod_info/prod/ZGw63okBKCBmuMF9Zqy7

image.pngimage.png

4.2. FLINK增量写入

逻辑同2.2

写入500W到索引 random_info

耗时共9分钟

image.pngimage.pngimage.png

5. 增量数据快照迁移

5.1. 源端增量快照

# 创建增量快照
# 增量
PUT /_snapshot/{仓库名}/{快照名2}
{
  "indices":"*,-.*"
}

image.png

5.2. OSS快照仓库全、增量文件对比

5.2.1. 全量

image.pngimage.png

5.2.2. 增量

image.png

image.png

5.2.3. 对比

oss文件数增长 300+

oss文件大小增长 1G+

源文件记录多个快照,可以看到index-0以及index-1

5.3. 目标端增量快照恢复

# 允许集群通配符
PUT _cluster/settings
{
  "transient": {
    "action.destructive_requires_name": "false"
  }
}
# 关闭所有非系统索引,保证恢复期间没有数据变更,保证数据一致性
POST *,-.*/_close
POST _snapshot/{仓库名}/{增量索引名}/_restore
{
    "indices": "*,-.*",
    "ignore_unavailable": "true"
}

5.4. 数据验证

5.4.1. 整体数据量验证

image.png

5.4.2. 删除数据验证

image.png

5.4.3. 更新数据验证

image.png

5.5. 增量快照恢复时间统计

image.png

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
相关文章
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
131 0
|
存储 Linux
ElasticSearch集群快照
ElasticSearch集群快照
533 2
|
JavaScript
ElasticSearch快照脚本
ElasticSearch快照脚本
153 0
|
存储 API
Elasticsearch快照备份与恢复 - 蓝易云
以上步骤可以帮助你在Elasticsearch中实现快照备份和恢复。注意,这些操作可能需要特定的权限和配置,所以在进行操作前,确保你具备足够的权限并已正确配置Elasticsearch。
284 0
|
监控 API 索引
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
|
存储 弹性计算 API
ECS快照问题之绑定oss失败如何解决
阿里云ECS用户可以创建的一个虚拟机实例或硬盘的数据备份,用于数据恢复和克隆新实例;本合集将指导用户如何有效地创建和管理ECS快照,以及解决快照过程中可能遇到的问题,确保数据的安全性和可靠性。
|
分布式计算 大数据 API
完美避坑!记一次Elasticsearch集群迁移架构实战
Elastic自身设计了集群分片的负载平衡机制,当有新数据节点加入集群或者离开集群,集群会自动平衡分片的负载分布。
|
索引 Python
阿里云ElasticSearch索引元数据迁移-基于Python3原生类库
阿里云ElasticSearch索引元数据迁移-基于Python3原生类库
|
测试技术 对象存储 索引
阿里云ElasticSearch从OSS快照恢复总消耗时长统计方法
阿里云ElasticSearch从OSS快照恢复包含其他云厂商ES实例或本地IDC的ES实例中所有的索引,GET /_recovery包含独立索引从快照恢复的开始及结束时长,本方法统计ES实例级别的所有索引快照恢复时间,包含手动输入GET /_recovery及http请求两种信息获取方式。
|
6月前
|
JSON 安全 数据可视化
Elasticsearch(es)在Windows系统上的安装与部署(含Kibana)
Kibana 是 Elastic Stack(原 ELK Stack)中的核心数据可视化工具,主要与 Elasticsearch 配合使用,提供强大的数据探索、分析和展示功能。elasticsearch安装在windows上一般是zip文件,解压到对应目录。文件,elasticsearch8.x以上版本是自动开启安全认证的。kibana安装在windows上一般是zip文件,解压到对应目录。elasticsearch的默认端口是9200,访问。默认用户是elastic,密码需要重置。
2938 0

热门文章

最新文章

相关产品

  • 检索分析服务 Elasticsearch版