1. 基本信息
源端集群:ElasticSearch版本:6.4.3
目标集群:ElasticSearch版本:6.7.0
2. 模拟数据生成
2.1. 索引1:java写入
2.1.1. 创建索引
PUT /prod_info { "settings": { "number_of_shards": 5, "number_of_replicas": 1 }, "mappings": { "prod": { "properties": { "cid": { "type": "text", "analyzer": "ik_smart" }, "prod_id": { "type": "text", "analyzer": "ik_smart" } } } } }
2.1.2. java客户端写入es模拟
2.1.2.1. 参考文档
high level rest client 6.7.x:
https://help.aliyun.com/document_detail/131255.html?spm=a2c4g.120716.0.0.639b40bf4z4bol
2.1.2.2. 代码实现
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.7.0</version> </dependency>
import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.*; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Random; public class WriteESTest { private static final RequestOptions COMMON_OPTIONS; static { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); // 默认缓存限制为100MB,此处修改为30MB。 builder.setHttpAsyncResponseConsumerFactory( new HttpAsyncResponseConsumerFactory .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024)); COMMON_OPTIONS = builder.build(); } public static @NotNull String generateRandomString(int length) { String characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; StringBuilder sb = new StringBuilder(length); Random random = new Random(); for (int i = 0; i < length; i++) { sb.append(characters.charAt(random.nextInt(characters.length()))); } return sb.toString(); } public static void main(String[] args) throws IOException { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long l1 = System.currentTimeMillis(); String startTime = simpleDateFormat.format(new Date(l1)); System.out.println("开始时间: " + l1 + "\n" + startTime); RestClientSourceES67 restClientSourceES67 = new RestClientSourceES67(); RestHighLevelClient client = restClientSourceES67.getClient(); Random random = new Random(); try { int cnt = 1; while (true) { if (cnt >= 1000) { break; } if (cnt % 100 ==0){ System.out.println(cnt); } Map<String, Object> jsonMap = new HashMap<>(); // 
cid和prod_id为字段名,value_01、value_02为对应的值。 jsonMap.put("cid", generateRandomString(15)); int i1 = random.nextInt(); int prod_id = i1 >= 0 ? i1 : -i1; jsonMap.put("prod_id", prod_id); cnt += 1; //index_name为索引名称;type_name为类型名称;doc_id为文档的id。 IndexRequest indexRequest = new IndexRequest("prod_info", "prod").source(jsonMap); // 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。 IndexResponse indexResponse = client.index(indexRequest, COMMON_OPTIONS); long version = indexResponse.getVersion(); // System.out.println("Index document successfully! version:" + version + " data:" + jsonMap); } } catch (Exception e) { System.out.println(e); } finally { client.close(); } long l2 = System.currentTimeMillis(); String endTime = simpleDateFormat.format(new Date(l2)); System.out.println("结束时间: " + l2 + "\n" + endTime); } } class RestClientSourceES67 { private String endpoint = "es-cn-uqm3cclew000edxpm.public.elasticsearch.aliyuncs.com"; private Integer port = 9200; private String username = "elastic"; private String password = "密码"; public RestHighLevelClient getClient() { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.username, this.password)); // 通过builder创建rest client,配置http client的HttpClientConfigCallback。 // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址 RestClientBuilder builder = RestClient.builder(new HttpHost(this.endpoint, this.port)) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }); // RestHighLevelClient实例通过REST low-level client builder进行构造。 RestHighLevelClient highClient = new RestHighLevelClient(builder); return highClient; } }
2.1.2.3. 写入效果
场景:针对某些JAVA应用导入ES集群文档的场景进行了模拟
效果:未评估。可按需增加索引文档写入的时间间隔(降低)、增加线程并发等(提高)。
2.2. 索引2:flink写入
2.2.1. 创建索引
PUT /random_info { "settings": { "number_of_shards": 5, # 不填默认索引是1个分片 "number_of_replicas": 1 # 默认1个副本,最大为(节点-1) }, "mappings": { "random": { "properties": { "productName": { "type": "text", "analyzer": "ik_smart" }, "annual_rate":{ "type":"keyword" # text和keyword都是文本类型,keyword默认不分词 }, "describe": { "type": "text", "analyzer": "ik_smart" } } } } }
2.2.2. Flink实时写入es模拟
2.2.2.1. 参考文档
datagen source: https://help.aliyun.com/zh/flink/developer-reference/datagen-connector?spm=a2c4g.11186623.0.i1
ElasticSearch sink: https://help.aliyun.com/zh/flink/developer-reference/elasticsearch-connector?spm=a2c4g.11174283.0.i1
2.2.2.2. 代码实现
CREATE TEMPORARY TABLE datagen_source ( productName STRING, annual_rate STRING, `describe` string ) WITH ( 'connector' = 'datagen' -- ,'rows-per-second' = '2' ,'number-of-rows' = '5000000' ,'fields.productName.length' = '20' ,'fields.annual_rate.length' = '20' ,'fields.describe.length' = '20' ); CREATE TEMPORARY TABLE es_sink ( productName STRING, annual_rate STRING, `describe` string -- ,PRIMARY KEY (user_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = 'es-cn-xx.elasticsearch.aliyuncs.com:9200', 'index' = 'random_info', 'document-type' = 'random', 'username' ='elastic', 'password' ='密码' ); insert into es_sink select * from datagen_source
2.2.2.3. 写入效果
- Flink
- 资源配置:JobManager(1c1g)TaskManager(1 * 1c2g)
- 数据量:2500W,数据内容大小长度见datagen算子配置
- 耗时:30MIN
- ElasticSearch
- 集群配置:数据节点2c4g * 2
- QPS:20K
- CPU:75%-80%
3. 全量数据快照迁移
3.1. 索引情况汇总
索引大小:约9G
索引个数:2个(见上文场景化写入)
3.2. 源端进行全量快照
3.2.1. 创建仓库
# 创建快照仓库 PUT /_snapshot/{仓库名} { "type": "oss", "settings": { "endpoint": "endpoint", "access_key_id": "ak", "secret_access_key": "sk", "bucket": "bucket名称", "compress": true } }
3.2.2. 验证仓库
# 查看集群快照细节 GET /_snapshot # 结果 { "es_snapshot_2_es0802" : { "type" : "oss", "settings" : { "bucket" : "bucket名称", "compress" : "true", "endpoint" : "endpoint" } } } # 查看快照仓库列表 GET /_cat/repositories?v # 结果 # id type # es_snapshot_2_es0802 oss
3.2.3. 创建快照
# 在仓库下创建快照,快照的名称可以根据日期或业务逻辑命名 # indices 指定了所有索引(不包括.开头的系统索引) PUT /_snapshot/{仓库名}/{快照名} { "indices":"*,-.*" } # 结果 { "accepted" : true }
3.2.4. 验证快照
# 包括整体的和不同的索引,时间,shard情况,文件数 GET /_snapshot/{仓库名}/{快照名}/_status # 可以看到仓库下所有快照的整体信息 # 可以看到正则匹配到了两个索引,indices列表中 # 开始和结束时间是标准时间戳 # state 为成功即为完成快照的创建 GET /_snapshot/{仓库名}/* # { # "snapshots" : [ # { # "snapshot" : "snapshot_1", # "uuid" : "-k0IOgiaSvGIYu5ouWcj2Q", # "version_id" : 6070099, # "version" : "6.7.0", # "indices" : [ # "product_info", # "rds_source" # ], # "include_global_state" : true, # "state" : "SUCCESS", # "start_time" : "2023-08-02T08:07:16.715Z", # "start_time_in_millis" : 1690963636715, # "end_time" : "2023-08-02T08:07:20.843Z", # "end_time_in_millis" : 1690963640843, # "duration_in_millis" : 4128, # "failures" : [ ], # "shards" : { # "total" : 10, # "failed" : 0, # "successful" : 10 # } # } # ] # }
3.3. 快照文件迁移
参考阿里云OSS数据迁移功能:https://help.aliyun.com/zh/oss/use-cases/overview-58?spm=a2c4g.11186623.0.0.694371bfE6nnRR
3.4. 目标端从快照恢复
3.4.1. 全量快照恢复及文档数验证
# 创建同名仓库 PUT /_snapshot/{仓库名} { "type": "oss", "settings": { "endpoint": "endpoint", "access_key_id": "ak", "secret_access_key": "sk", "bucket": "bucket", "compress": true } } # 进行恢复 POST _snapshot/{仓库名}/{快照名}/_restore { "indices": "*,-.*", "ignore_unavailable": "true" } # 查看两个索引数量 # docs.count可以对的上 GET /_cat/indices?v
3.4.2. 恢复时间统计
参考:https://developer.aliyun.com/article/1301212?spm=a2c6h.13148508.setting.15.fbf04f0eBFV3k7
4. 增量数据写入
4.1. 增量数据手动变更写入
DELETE /prod_info/prod/Y2w63okBKCBmuMF9Zqyo PUT /prod_info/prod/ZGw63okBKCBmuMF9Zqy7 { "prod_id":373586, "cid":"666" } GET /prod_info/prod/ZGw63okBKCBmuMF9Zqy7
4.2. FLINK增量写入
逻辑同2.2
写入500W到索引 random_info
耗时共9分钟
5. 增量数据快照迁移
5.1. 源端增量快照
# 创建增量快照 # 增量 PUT /_snapshot/{仓库名}/{快照名2} { "indices":"*,-.*" }
5.2. OSS快照仓库全、增量文件对比
5.2.1. 全量
5.2.2. 增量
5.2.3. 对比
oss文件数增长 300+
oss文件大小增长 1G+
源文件记录多个快照,可以看到index-0以及index-1
5.3. 目标端增量快照恢复
# 允许集群通配符 PUT _cluster/settings { "transient": { "action.destructive_requires_name": "false" } } # 关闭所有非系统索引,保证恢复期间没有数据变更,保证数据一致性 POST *,-.*/_close POST _snapshot/{仓库名}/{增量快照名}/_restore { "indices": "*,-.*", "ignore_unavailable": "true" }
5.4. 数据验证
5.4.1. 整体数据量验证
5.4.2. 删除数据验证
5.4.3. 更新数据验证
5.5. 增量快照恢复时间统计