带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (4)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (4)

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.6.Datastream (3) https://developer.aliyun.com/article/1228581


使用 data stream

 

此处对数据流的操作主要以命令为主,Kibana 界面支持较少。

 

新增数据

 

data stream 在新增数据时是只追加的模式,因此在固定 id 和 bulk 的模式下,op_type 是指定 create 的。

 

如下面命令:


POST my-data-stream/_create/1
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

或者

PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

如果并不指定,文档的 id,则可以使用默认的 _doc ,如下:

 

POST my-data-stream/_doc/
{"@timestamp":"2020-12-07T11:06:07.000Z","test":1}

获取 data stream 状态

 

使用 data stream stats API 查看 data stream 的状态。

GET /_data_stream/my-data-stream/_stats?human=true

Response:

{
  "_shards" : {
"total" : 4,
  "successful" : 2,
    "failed" : 0
  },
  "data_stream_count" : 1,
  "backing_indices" : 1,
  "total_store_size" : "5kb",
  "total_store_size_bytes" : 5151,
  "data_streams" : [
    {
      "data_stream" : "my-data-stream",
      "backing_indices" : 1,
      "store_size" : "5kb",
      "store_size_bytes" : 5151,
      "maximum_timestamp" : 1607252645000
    }
  ]
}


可见 my-data-stream 的大下和后备索引数量。

 

同时需要用 _ilm/explain 获取 data stream 后备索引所在的 ILM 策略状态。

 

GET my-data-stream/_ilm/explain

 

结果:

 

{
 "indices" : {
    ".ds-my-data-stream-000001" : {
      "index" : ".ds-my-data-stream-000001",
      "managed" : true,
      "policy" : "my-data-stream-policy",
       "lifecycle_date_millis" : 1620907943375,
      "age" : "7.95s",
      "phase" : "hot",
      "phase_time_millis" : 1620907943567,
      "action" : "rollover",
      "action_time_millis" : 1620907943661,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1620907943661,
      "phase_execution" : {
        "policy" : "my-data-stream-policy",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "25gb"
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1620907939978
      }
    }
  }
}

上图可见,这个数据的 000001 索引主要处于 hot 阶段,策略名称是 logs 等信息。具体参数可见于 ILM 的相关定义。

 

手动 rollover data stream

 

使用 rollover API,手动 rollover data stream。


POST my-data-stream/_rollover

结果:

{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : ".ds-my-data-stream-000001",
  "new_index" : ".ds-my-data-stream-000002",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : { }
}

再 GET 相关 data stream 状态,后备索引增加。

GET /_data_stream/my-data-stream/

结果:

{
  "data_streams" : [
    {
      "name" : "my-data-stream",
      "timestamp_field" : {
        "name" : "@timestamp"
      },
      "indices" : [
        {
          "index_name" : ".ds-my-data-stream-000001",
          "index_uuid" : "AJBi0g3fRyG8-1tiH2UD2Q"
        },
        {
          "index_name" : ".ds-my-data-stream-000002",
          "index_uuid" : "AgOLGMSBSYWb4X-ID8uwtg"
        }
      ],
      "generation" : 2,
      "status" : "GREEN",
      "template" : "my-data-stream-template",
      "ilm_policy" : "my-data-stream-policy",
      "hidden" : false
    }
  ]
}


Reindex data stream

 

使用 reindex API 去复制数据到一个 data stream。由于 data stream 的只追加特性,在

op_type 中要选择为 create 。

POST /_reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

结果:

 

{
  "took" : 80,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 1,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

Delete/Update by query

 

针对 data stream 只能 delete/update by query。

 

相关命令:


POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
}
  },
  "script": {
 "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

Delete update 后备索引数据

 

在后备索引删除或者修改,需要注意下面三个要素:

 

l 文档 id。

l 文档所在的后备索引。

l 如果是修改文档,则需要其 _seq_no 和 _primary_term 两个参数。

 

主要操作如下:

 

先获取文档所需的要素信息,设置 seq_no_primary_term 为 true。

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "message": "Login attempt failed"
        }
  }
}

 获得结果:

{
  "took" : 621,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.8630463,
    "hits" : [
      {
        "_index" : ".ds-my-data-stream-000001",
        "_type" : "_doc",
        "_id" : "9ZakZXkBkhA9X9yUZo2P",
        "_seq_no" : 0,
        "_primary_term" : 1,
        "_score" : 0.8630463,
        "_source" : {
          "@timestamp" : "2020-12-06T11:04:05.000Z",
          "user" : {
            "id" : "vlb44hny"
          },
          "message" : "Login attempt failed"
             }
      }
    ]
  }
}

然后修改命令:

PUT /.ds-my-data-stream-000001/_doc/9ZakZXkBkhA9X9yUZo2P?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "test": 4
}

结果:

{
  "_index" : ".ds-my-data-stream-000001",
  "_type" : "_doc",
  "_id" : "9ZakZXkBkhA9X9yUZo2P",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

或者删除命令:

DELETE  /.ds-logs-1-1-000002/_doc/3

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.6.Datastream (5) https://developer.aliyun.com/article/1228578

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
存储 索引
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (5)
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (5)
|
监控 索引
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (2)
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (2)
105 0
|
索引
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (3)
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (3)
|
存储 数据处理 索引
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (1)
带你读《Elastic Stack 实战手册》之47:——3.5.6.Datastream (1)
|
JSON Java 数据格式
带你读《Elastic Stack 实战手册》之33:——3.4.2.17.2.Schemaless(下)
带你读《Elastic Stack 实战手册》之33:——3.4.2.17.2.Schemaless(下)
100 0
|
SQL 自然语言处理 监控
带你读《Elastic Stack 实战手册》之2:——二、导读(上)
带你读《Elastic Stack 实战手册》之2:——二、导读(上)
333 0
|
存储 运维 监控
带你读《Elastic Stack 实战手册》之2:——二、导读(下)
带你读《Elastic Stack 实战手册》之2:——二、导读(下)
259 0
|
自然语言处理 索引
带你读《Elastic Stack 实战手册》之33:——3.4.2.17.2.Schemaless(上)
带你读《Elastic Stack 实战手册》之33:——3.4.2.17.2.Schemaless(上)
106 0
|
存储 资源调度 NoSQL
带你读《Elastic Stack 实战手册》之45:——3.5.4.Graph (上)
带你读《Elastic Stack 实战手册》之45:——3.5.4.Graph (上)
124 0
|
数据可视化 NoSQL API
带你读《Elastic Stack 实战手册》之45:——3.5.4.Graph (下)
带你读《Elastic Stack 实战手册》之45:——3.5.4.Graph (下)
138 0

热门文章

最新文章