阿里云ElasticSearch迁移-基于OSS全增量快照

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 阿里云ElasticSearch迁移-基于OSS全增量快照

1. 基本信息

源端集群:ElasticSearch版本:6.4.3

目标集群:ElasticSearch版本:6.7.0

2. 模拟数据生成

2.1. 索引1:java写入

2.1.1. 创建索引

PUT /prod_info
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "prod": {
      "properties": {
        "cid": {
          "type": "text",
          "analyzer": "ik_smart"
        },
        "prod_id": {
         "type": "text",
         "analyzer": "ik_smart"
        }
      }
    }
  }
}

2.1.2. java客户端写入es模拟

2.1.2.1. 参考文档

high level rest client 6.7.x:

https://help.aliyun.com/document_detail/131255.html?spm=a2c4g.120716.0.0.639b40bf4z4bol

2.1.2.2. 代码实现

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.7.0</version>
</dependency>
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.*;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class WriteESTest {
    private static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        // 默认缓存限制为100MB,此处修改为30MB。
        builder.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }
    public static @NotNull String generateRandomString(int length) {
        String characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        StringBuilder sb = new StringBuilder(length);
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            sb.append(characters.charAt(random.nextInt(characters.length())));
        }
        return sb.toString();
    }
    public static void main(String[] args) throws IOException {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long l1 = System.currentTimeMillis();
        String startTime = simpleDateFormat.format(new Date(l1));
        System.out.println("开始时间: " + l1 + "\n" + startTime);
        RestClientSourceES67 restClientSourceES67 = new RestClientSourceES67();
        RestHighLevelClient client = restClientSourceES67.getClient();
        Random random = new Random();
        try {
            int cnt = 1;
            while (true) {
                if (cnt >= 1000) {
                    break;
                }
                if (cnt % 100 ==0){
                    System.out.println(cnt);
                }
                Map<String, Object> jsonMap = new HashMap<>();
                // cid和prod_id为字段名,value_01、value_02为对应的值。
                jsonMap.put("cid", generateRandomString(15));
                int i1 = random.nextInt();
                int prod_id = i1 >= 0 ? i1 : -i1;
                jsonMap.put("prod_id", prod_id);
                cnt += 1;
                //index_name为索引名称;type_name为类型名称;doc_id为文档的id。
                IndexRequest indexRequest = new IndexRequest("prod_info", "prod").source(jsonMap);
                // 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。
                IndexResponse indexResponse = client.index(indexRequest, COMMON_OPTIONS);
                long version = indexResponse.getVersion();
//                System.out.println("Index document successfully!  version:" + version + " data:" + jsonMap);
            }
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            client.close();
        }
        long l2 = System.currentTimeMillis();
        String endTime = simpleDateFormat.format(new Date(l2));
        System.out.println("结束时间: " + l2 + "\n" + endTime);
    }
}
class RestClientSourceES67 {
    private String endpoint = "es-cn-uqm3cclew000edxpm.public.elasticsearch.aliyuncs.com";
    private Integer port = 9200;
    private String username = "elastic";
    private String password = "密码";
    public RestHighLevelClient getClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.username, this.password));
        // 通过builder创建rest client,配置http client的HttpClientConfigCallback。
        // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址
        RestClientBuilder builder = RestClient.builder(new HttpHost(this.endpoint, this.port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
        // RestHighLevelClient实例通过REST low-level client builder进行构造。
        RestHighLevelClient highClient = new RestHighLevelClient(builder);
        return highClient;
    }
}

2.1.2.3. 写入效果

场景:针对某些JAVA应用导入ES集群文档的场景进行了模拟

效果:未评估。可按需增加索引文档写入的时间间隔(降低)、增加线程并发等(提高)。

image.png

2.2. 索引2:flink写入

2.2.1. 创建索引

PUT /random_info
{
  "settings": {
    "number_of_shards": 5, # 不填默认索引是1个分片
    "number_of_replicas": 1 # 默认1个副本,最大为(节点-1)
  },
  "mappings": {
    "random": {
      "properties": {
        "productName": {
          "type": "text",
          "analyzer": "ik_smart"
        },
        "annual_rate":{
          "type":"keyword" # text和keyword都是文本类型,keyword默认不分词
        },
        "describe": {
         "type": "text",
         "analyzer": "ik_smart"
        }
      }
    }
  }
}

2.2.2. Flink实时写入es模拟

2.2.2.1. 参考文档

datagen source: https://help.aliyun.com/zh/flink/developer-reference/datagen-connector?spm=a2c4g.11186623.0.i1

ElasticSearch sink: https://help.aliyun.com/zh/flink/developer-reference/elasticsearch-connector?spm=a2c4g.11174283.0.i1

2.2.2.2. 代码实现

CREATE TEMPORARY TABLE datagen_source (
  productName STRING, 
  annual_rate STRING,
  `describe` string
) WITH (
  'connector' = 'datagen'
  -- ,'rows-per-second' = '2'
  ,'number-of-rows' = '5000000'
  ,'fields.productName.length' = '20'
  ,'fields.annual_rate.length' = '20'
  ,'fields.describe.length' = '20'
);
CREATE TEMPORARY TABLE es_sink (
  productName STRING,
  annual_rate STRING,
  `describe` string
  --   ,PRIMARY KEY (user_id) NOT ENFORCED 
  -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'es-cn-xx.elasticsearch.aliyuncs.com:9200',
  'index' = 'random_info',
  'document-type' = 'random',
  'username' ='elastic',
  'password' ='密码'
);
insert into es_sink
select * from datagen_source

2.2.2.3. 写入效果

  1. Flink
  1. 资源配置:JobManager(1c1g)TaskManager(1 * 1c2g)
  2. 数据量:2500W,数据内容大小长度见datagen算子配置
  3. 耗时:30MIN
  1. ElasticSearch
  1. 集群配置:数据节点2c4g * 2
  2. QPS:20K
  3. CPU:75%-80%

3. 全量数据快照迁移

3.1. 索引情况汇总

索引大小:约9G

索引个数:2个(见上文场景化写入)

image.png

3.2. 源端进行全量快照

3.2.1. 创建仓库

# 创建快照仓库
put /_snapshot/{仓库名}
{
    "type": "oss",
    "settings": {
        "endpoint": "endpoint",
        "access_key_id": "ak",
        "secret_access_key": "sk",
        "bucket": "bucket名称",
        "compress": true
    }
}

3.2.2. 验证仓库

# 查看集群快照细节
GET /_snapshot
# 结果
{
  "es_snapshot_2_es0802" : {
    "type" : "oss",
    "settings" : {
      "bucket" : "bucket名称",
      "compress" : "true",
      "endpoint" : "endpoint"
    }
  }
}
# 查看快照仓库列表
GET /_cat/repositories?v
# 结果
# id                   type
# es_snapshot_2_es0802  oss

3.2.3. 创建快照

# 在仓库下创建快照,快照的名称可以根据日期或业务逻辑命名
# indices 指定了所有索引(不包括.开头的系统索引)
PUT /_snapshot/{仓库名}/{快照明}
{
  "indices":"*,-.*"
}
# 结果
{
  "accepted" : true
}

3.2.4. 验证快照

# 包括整体的和不同的索引,时间,shard情况,文件数
GET /_snapshot/{仓库名}/{快照名}/_status
# 可以看到仓库下所有快照的整体信息
# 可以看到正则匹配到了两个索引,indices列表中
# 开始和结束时间是标准时间戳
# state 为成功即为完成快照的创建
GET /_snapshot/{仓库名}/*
# {
#   "snapshots" : [
#     {
#       "snapshot" : "snapshot_1",
#       "uuid" : "-k0IOgiaSvGIYu5ouWcj2Q",
#       "version_id" : 6070099,
#       "version" : "6.7.0",
#       "indices" : [
#         "product_info",
#         "rds_source"
#       ],
#       "include_global_state" : true,
#       "state" : "SUCCESS",
#       "start_time" : "2023-08-02T08:07:16.715Z",
#       "start_time_in_millis" : 1690963636715,
#       "end_time" : "2023-08-02T08:07:20.843Z",
#       "end_time_in_millis" : 1690963640843,
#       "duration_in_millis" : 4128,
#       "failures" : [ ],
#       "shards" : {
#         "total" : 10,
#         "failed" : 0,
#         "successful" : 10
#       }
#     }
#   ]
# }

3.3. 快照文件迁移

参考阿里云OSS数据迁移功能:https://help.aliyun.com/zh/oss/use-cases/overview-58?spm=a2c4g.11186623.0.0.694371bfE6nnRR

3.4. 目标端从快照恢复

3.4.1. 全量快照恢复及文档数验证

# 创建同名仓库
put /_snapshot/{仓库名}
{
    "type": "oss",
    "settings": {
        "endpoint": "endpoint",
        "access_key_id": "ak",
        "secret_access_key": "sk",
        "bucket": "bucket",
        "compress": true
    }
}
# 进行恢复
POST _snapshot/{仓库名}/{快照名}/_restore
{
    "indices": "*,-.*",
    "ignore_unavailable": "true"
}
# 查看两个索引数量
# docs.count可以对的上
GET /_cat/indices?v

image.png

3.4.2. 恢复时间统计

参考:https://developer.aliyun.com/article/1301212?spm=a2c6h.13148508.setting.15.fbf04f0eBFV3k7

image.png

4. 增量数据写入

4.1. 增量数据手动变更写入

DELETE /prod_info/prod/Y2w63okBKCBmuMF9Zqyo
PUT /prod_info/prod/ZGw63okBKCBmuMF9Zqy7
{
  "prod_id":373586,
  "cid":"666"
}
GET /prod_info/prod/ZGw63okBKCBmuMF9Zqy7

image.pngimage.png

4.2. FLINK增量写入

逻辑同2.2

写入500W到索引 random_info

耗时共9分钟

image.pngimage.pngimage.png

5. 增量数据快照迁移

5.1. 源端增量快照

# 创建增量快照
# 增量
PUT /_snapshot/{仓库名}/{快照名2}
{
  "indices":"*,-.*"
}

image.png

5.2. OSS快照仓库全、增量文件对比

5.2.1. 全量

image.pngimage.png

5.2.2. 增量

image.png

image.png

5.2.3. 对比

oss文件数增长 300+

oss文件大小增长 1G+

源文件记录多个快照,可以看到index-0以及index-1

5.3. 目标端增量快照恢复

# 允许集群通配符
PUT _cluster/settings
{
  "transient": {
    "action.destructive_requires_name": "false"
  }
}
# 关闭所有非系统索引,保证恢复期间没有数据变更,保证数据一致性
POST *,-.*/_close
POST _snapshot/{仓库名}/{增量索引名}/_restore
{
    "indices": "*,-.*",
    "ignore_unavailable": "true"
}

5.4. 数据验证

5.4.1. 整体数据量验证

image.png

5.4.2. 删除数据验证

image.png

5.4.3. 更新数据验证

image.png

5.5. 增量快照恢复时间统计

image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
4月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
16天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
4月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 运维 安全
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
189 2
|
4月前
|
存储 API 开发工具
阿里云OSS
【7月更文挑战第19天】阿里云OSS
187 1
|
4月前
|
存储 弹性计算 对象存储
预留空间是什么?阿里云OSS对象存储预留空间说明
阿里云OSS预留空间是预付费存储产品,提供折扣价以锁定特定容量,适用于抵扣有地域属性的Bucket标准存储费用及ECS快照费。通过购买预留空间,如500GB通用预留+100GB标准-本地冗余存储包,用户可优化成本。
203 4
|
4月前
|
人工智能 对象存储
【阿里云AI助理】自家产品提供错误答案。阿里云OSS 资源包类型: 下行流量 地域: 中国内地通用 下行流量包规格: 300 GB 套餐: 下行流量包(中国内地) ,包1年。那么这个是每月300GB,1年是3600GB的流量;还是1年只有300GB的流量?
自家产品提供错误答案。阿里云OSS 资源包类型: 下行流量 地域: 中国内地通用 下行流量包规格: 300 GB 套餐: 下行流量包(中国内地) ,包1年。那么这个是每月300GB,1年是3600GB的流量;还是1年只有300GB的流量?
128 1
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何将CSV文件从阿里云OSS同步到ODPS表,并且使用列作为表分区
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之如何将CSV文件从阿里云OSS同步到ODPS表,并且使用列作为表分区
|
6月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56595 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用

相关产品

  • 检索分析服务 Elasticsearch版