"Elastic Stack 实战手册" (Elastic Stack in Action Handbook), Part 3: Product Capabilities, 3.5 Advanced, 3.5.6 Datastream (3) https://developer.aliyun.com/article/1228581
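Using a data stream
The operations in this section are performed mainly through API requests, because the Kibana UI offers only limited support for data streams.
Adding data
A data stream is append-only. When you index a document with a fixed ID, or use the bulk API, the op_type must therefore be create.
For example: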
POST my-data-stream/_create/1
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}
or
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
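The bulk body above is newline-delimited JSON: one action line, then one document line, with a trailing newline at the end. The following is a minimal Python sketch of how such a body could be assembled. The function name build_create_bulk_body is hypothetical, and sending the body to the cluster is left to whatever HTTP client you use.

```python
import json


def build_create_bulk_body(docs):
    """Return an NDJSON bulk body that uses a `create` action for every document.

    Hypothetical helper for illustration; it only builds the request body.
    """
    lines = []
    for doc in docs:
        # Every document written to a data stream must carry an @timestamp field.
        if "@timestamp" not in doc:
            raise ValueError("data stream documents need an @timestamp field")
        lines.append(json.dumps({"create": {}}))  # action line
        lines.append(json.dumps(doc))             # document line
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


docs = [
    {"@timestamp": "2020-12-08T11:04:05.000Z", "user": {"id": "vlb44hny"},
     "message": "Login attempt failed"},
    {"@timestamp": "2020-12-08T11:06:07.000Z", "user": {"id": "8a4f500d"},
     "message": "Login successful"},
]
body = build_create_bulk_body(docs)
print(body)
```

Each JSON object is serialized on a single line, because a pretty-printed document would break the line-oriented bulk format.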
If you do not specify a document ID, use the default _doc endpoint and let Elasticsearch generate the ID:
POST my-data-stream/_doc/
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}
Getting data stream status
Use the data stream stats API to check the status of a data stream.
GET /_data_stream/my-data-stream/_stats?human=true
Response:
{
  "_shards" : {
    "total" : 4,
    "successful" : 2,
    "failed" : 0
  },
  "data_stream_count" : 1,
  "backing_indices" : 1,
  "total_store_size" : "5kb",
  "total_store_size_bytes" : 5151,
  "data_streams" : [
    {
      "data_stream" : "my-data-stream",
      "backing_indices" : 1,
      "store_size" : "5kb",
      "store_size_bytes" : 5151,
      "maximum_timestamp" : 1607252645000
    }
  ]
}
The response shows the store size of my-data-stream and its number of backing indices.
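The maximum_timestamp field in the stats response is the highest @timestamp in the stream, expressed in milliseconds since the Unix epoch. As an illustration, the sketch below turns a parsed stats response into readable rows. The helper name summarize_stats is hypothetical, and the sample dictionary is a trimmed copy of the response shown above.

```python
from datetime import datetime, timezone


def summarize_stats(stats):
    """Return (name, backing index count, size in bytes, newest timestamp) per stream."""
    rows = []
    for ds in stats["data_streams"]:
        # maximum_timestamp is in epoch milliseconds; convert it to an aware UTC datetime.
        newest = datetime.fromtimestamp(ds["maximum_timestamp"] / 1000, tz=timezone.utc)
        rows.append((ds["data_stream"], ds["backing_indices"],
                     ds["store_size_bytes"], newest))
    return rows


stats = {
    "data_streams": [
        {
            "data_stream": "my-data-stream",
            "backing_indices": 1,
            "store_size_bytes": 5151,
            "maximum_timestamp": 1607252645000,
        }
    ]
}
for name, count, size, newest in summarize_stats(stats):
    print(f"{name}: {count} backing index(es), {size} bytes, newest doc at {newest.isoformat()}")
```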
In addition, use _ilm/explain to get the ILM status of the data stream's backing indices.
GET my-data-stream/_ilm/explain
Result:
{
  "indices" : {
    ".ds-my-data-stream-000001" : {
      "index" : ".ds-my-data-stream-000001",
      "managed" : true,
      "policy" : "my-data-stream-policy",
      "lifecycle_date_millis" : 1620907943375,
      "age" : "7.95s",
      "phase" : "hot",
      "phase_time_millis" : 1620907943567,
      "action" : "rollover",
      "action_time_millis" : 1620907943661,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1620907943661,
      "phase_execution" : {
        "policy" : "my-data-stream-policy",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "25gb"
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1620907939978
      }
    }
  }
}
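As the output above shows, the 000001 backing index of this data stream is in the hot phase, is managed by the policy my-data-stream-policy, and is currently at the check-rollover-ready step of the rollover action. See the ILM definitions for the meaning of each field.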
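When a data stream has many backing indices, the explain output becomes long. The sketch below condenses it to one line per index. The helper name summarize_ilm is hypothetical, and the sample input is a trimmed copy of the response above.

```python
def summarize_ilm(explain):
    """Map each backing index to a short 'policy: phase/action/step' string."""
    summary = {}
    for name, info in explain["indices"].items():
        # Indices that ILM does not manage have no phase, action, or step to report.
        if not info.get("managed"):
            summary[name] = "unmanaged"
            continue
        summary[name] = f'{info["policy"]}: {info["phase"]}/{info["action"]}/{info["step"]}'
    return summary


explain = {
    "indices": {
        ".ds-my-data-stream-000001": {
            "index": ".ds-my-data-stream-000001",
            "managed": True,
            "policy": "my-data-stream-policy",
            "phase": "hot",
            "action": "rollover",
            "step": "check-rollover-ready",
        }
    }
}
for index, status in summarize_ilm(explain).items():
    print(index, "->", status)
```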
Manually rolling over a data stream
Use the rollover API to roll over a data stream manually.
POST my-data-stream/_rollover
Result:
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : ".ds-my-data-stream-000001",
  "new_index" : ".ds-my-data-stream-000002",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : { }
}
Get the data stream again to confirm that it now has one more backing index.
GET /_data_stream/my-data-stream/
Result:
{
  "data_streams" : [
    {
      "name" : "my-data-stream",
      "timestamp_field" : {
        "name" : "@timestamp"
      },
      "indices" : [
        {
          "index_name" : ".ds-my-data-stream-000001",
          "index_uuid" : "AJBi0g3fRyG8-1tiH2UD2Q"
        },
        {
          "index_name" : ".ds-my-data-stream-000002",
          "index_uuid" : "AgOLGMSBSYWb4X-ID8uwtg"
        }
      ],
      "generation" : 2,
      "status" : "GREEN",
      "template" : "my-data-stream-template",
      "ilm_policy" : "my-data-stream-policy",
      "hidden" : false
    }
  ]
}
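In this response the last entry of indices is the stream's current write index, which is the index that a rollover has just created. A minimal sketch for extracting it from a parsed response follows. The helper name write_index is hypothetical, and the sample input is a trimmed copy of the response above.

```python
def write_index(data_stream):
    """Return the name of the current write index of one data stream entry."""
    indices = data_stream["indices"]
    if not indices:
        raise ValueError("data stream has no backing indices")
    # The backing indices are listed oldest first; the last one receives new writes.
    return indices[-1]["index_name"]


response = {
    "data_streams": [
        {
            "name": "my-data-stream",
            "indices": [
                {"index_name": ".ds-my-data-stream-000001"},
                {"index_name": ".ds-my-data-stream-000002"},
            ],
            "generation": 2,
        }
    ]
}
for ds in response["data_streams"]:
    print(ds["name"], "writes to", write_index(ds))
```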
Reindex data stream
Use the reindex API to copy documents into a data stream. Because a data stream is append-only, set op_type to create in the dest section.
POST /_reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}
Result:
{
  "took" : 80,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 1,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}
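A script that runs the reindex can check the response before continuing. The sketch below is one possible check, not an official rule: the helper name reindex_succeeded and the choice to treat timeouts, failures, and version conflicts as errors are assumptions made for illustration.

```python
def reindex_succeeded(result):
    """Return True when a reindex response reports no timeout, failure, or conflict."""
    return (
        not result["timed_out"]
        and not result["failures"]
        and result["version_conflicts"] == 0
    )


result = {
    "took": 80,
    "timed_out": False,
    "total": 1,
    "created": 1,
    "version_conflicts": 0,
    "failures": [],
}
print("reindex ok:", reindex_succeeded(result), "created:", result["created"])
```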
Delete/Update by query
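When a request targets the data stream itself, existing documents can only be updated or deleted through the update by query and delete by query APIs.
Example requests: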
POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
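Updating or deleting documents in a backing index
To update or delete a document directly in a backing index, you need three things:
- The document ID.
- The name of the backing index that contains the document.
- If you are updating the document, its _seq_no and _primary_term values.
The procedure is as follows:
First, retrieve this information with a search request that sets seq_no_primary_term to true.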
GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "message": "Login attempt failed"
    }
  }
}
Result:
{
  "took" : 621,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.8630463,
    "hits" : [
      {
        "_index" : ".ds-my-data-stream-000001",
        "_type" : "_doc",
        "_id" : "9ZakZXkBkhA9X9yUZo2P",
        "_seq_no" : 0,
        "_primary_term" : 1,
        "_score" : 0.8630463,
        "_source" : {
          "@timestamp" : "2020-12-06T11:04:05.000Z",
          "user" : {
            "id" : "vlb44hny"
          },
          "message" : "Login attempt failed"
        }
      }
    ]
  }
}
Then send the update request to the backing index, passing the _seq_no and _primary_term values from the hit:
PUT /.ds-my-data-stream-000001/_doc/9ZakZXkBkhA9X9yUZo2P?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "test": 4
}
Result:
{
  "_index" : ".ds-my-data-stream-000001",
  "_type" : "_doc",
  "_id" : "9ZakZXkBkhA9X9yUZo2P",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}
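The update request takes its path and query parameters directly from the search hit: _index names the backing index, _id names the document, and _seq_no and _primary_term become if_seq_no and if_primary_term. The sketch below derives the request path from a hit. The helper name conditional_update_path is hypothetical, and the sample hit is a trimmed copy of the search result above.

```python
from urllib.parse import quote, urlencode


def conditional_update_path(hit):
    """Build the PUT path for an optimistic-concurrency update of one search hit."""
    query = urlencode({
        "if_seq_no": hit["_seq_no"],
        "if_primary_term": hit["_primary_term"],
    })
    # Percent-encode the index name and ID so unusual characters cannot break the path.
    return f'/{quote(hit["_index"], safe="")}/_doc/{quote(hit["_id"], safe="")}?{query}'


hit = {
    "_index": ".ds-my-data-stream-000001",
    "_id": "9ZakZXkBkhA9X9yUZo2P",
    "_seq_no": 0,
    "_primary_term": 1,
}
print("PUT", conditional_update_path(hit))
```

If another writer changes the document between the search and the update, its _seq_no no longer matches and Elasticsearch rejects the request with a version conflict, so the search and update have to be repeated.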
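Or delete a document by sending a DELETE request to the backing index that contains it (this example uses a different backing index and document ID):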
DELETE /.ds-logs-1-1-000002/_doc/3