ELK技术栈 - Elasticsearch 学习笔记(二)(下)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: ELK技术栈 - Elasticsearch 学习笔记(二)(下)

使用外部系统的版本



还有一种常见的情况就是我们还是使用其他的数据库来存储数据,而Elasticsearch只是帮我

们检索数据。这也就意味着主数据库只要发生的变更,就需要将其拷贝到Elasticsearch中。

如果多个进程同时发生,就会产生上文提到的那些并发问题。

如果你的数据库已经存在了版本号码,或者也可以代表版本的 时间戳  。这是你就可以在

Elasticsearch的查询字符串后面添加 version_type=external  来使用这些号码。版本号码必须

要是大于零小于 9.2e+18  (Java中long的最大正值)的整数

Elasticsearch在处理外部版本号时会与对内部版本号的处理有些不同。它不再是检

查 _version  是否与请求中指定的数值相同,而是检查当前的 _version  是否比指定的数值小

如果请求成功,那么外部的版本号就会被存储到文档中的 _version  中。


例如,创建一篇使用外部版本号为 5  的博文,我们可以这样操作:


PUT /website/blog/2?version=5&version_type=external
{
    "title": "My first external blog entry",
    "text": "Starting to get the hang of this..."
}


更新文档中的一部分



文档不能被修改,它们只能被替换掉。 更新  API也必须遵循这一法则。从表面看来,貌似是文档被替换了。对内而言,它必须按照找回-修改-索引的流程来进行操作与管理。不同之处在于这个流程是在一个片(shard) 中完成的,因此可以节省多个请求所带来的 网络开销。除了节省了步骤,同时我们也能减少多个进程造成冲突的可能性。

使用 更新  请求最简单的一种用途就是添加新数据。新的数据会被合并到现有数据中,而如果 存在相同的字段,就会被新的数据所替换。例如我们可以为我们的博客添加 tags  和 views  字 段:


POST /website/blog/1/_update
{
    "doc" : {
        "tags" : [ "testing" ],
        "views": 0
    }
}


如果请求成功,我们就会收到一个类似于 索引  时返回的内容:


{
    "_index" : "website",
    "_id" : "1",
    "_type" : "blog",
    "_version" : 3
}


再次取回数据,你可以在 _source  中看到更新的结果:


{
    "_index": "website",
    "_type": "blog",
    "_id": "1",
    "_version": 3,
    "found": true,
    "_source": {
        "title": "My first blog entry",
        "text": "Starting to get the hang of this...",
        "tags": [ "testing" ], 
        "views": 0 
    }
}


MVEL是一个简单高效的JAVA基础动态脚本语言,它的语法类似于Javascript。你可以 在Elasticsearch scripting docs 以及 MVEL website了解更多关于MVEL的信息。


脚本语言可以在 更新  API中被用来修改 _source  中的内容,而它在脚本中被称为 ctx._source  。例如,我们可以使用脚本来增加博文中 views  的数字:


POST /website/blog/1/_update
{
    "script" : "ctx._source.views+=1"
}


这样Elasticsearch就可以重新使用这个脚本进行tag的添加,而不用再次重新编写脚本了:


POST /website/blog/1/_update
{
    "script" : "ctx._source.tags+=new_tag",
    "params" : {
      "new_tag" : "search"
    }
}


结果


{
    "_index": "website",
    "_type": "blog",
    "_id": "1",
    "_version": 5,
    "found": true,
    "_source": {
        "title": "My first blog entry",
        "text": "Starting to get the hang of this...",
        "tags": ["testing", "search"], <1>
        "views": 1 <2>
     }
}


  1. tags  数组中出现了 search  。
  2. views  字段增加了。

使用 ctx.op  来根据内容选择是否删除一个文档:


POST /website/blog/1/_update
{
    "script" : "ctx.op = ctx._source.views == count ? 'delete' : 'none'",
    "params" : {
      "count": 1
    }
}


更新一篇可能不存在的文档


我们可以使用 upsert  参数来设定文档不存在时,它应该被创建:


POST /website/pageviews/1/_update
{
    "script" : "ctx._source.views+=1",
    "upsert": {
        "views": 1
    }
}


首次运行这个请求时, upsert  的内容会被索引成新的文档,它将 views  字段初始化为 1  。当之后再请求时,文档已经存在,所以 脚本  更新就会被执行, views  计数器就会增加。


更新和冲突



你可以通过设定 retry_on_conflict  参数来设置自动完成这项请求的次数,它的默认值是 0  。


POST /website/pageviews/1/_update?retry_on_conflict=5 <1>
{
    "script" : "ctx._source.views+=1",
    "upsert": {
      "views": 0
    }
}


失败前重新尝试5次


这个参数非常适用于类似于增加计数器这种无关顺序的请求,但是还有些情况的顺序就是很 重要的。例如上一节提到的情况,你可以参考乐观并发控制以及悲观并发控制来设定文档的 版本号。


获取多个文档



如果你需要从Elasticsearch中获取多个文档,你可以使用multi-get 或者 mget  API来取代一篇又一篇文档的获取。

mget  API需要一个 docs  数组,每一个元素包含你想要的文档的 _index  ,  _type  以及 _id  。 你也可以指定 _source  参数来设定你所需要的字段:


GET /_mget
{
    "docs" : [
        {
            "_index" : "website",
            "_type" : "blog",
            "_id" : 2
        },
        {
            "_index" : "website",
            "_type" : "pageviews",
            "_id" : 1,
            "_source": "views"
        }
    ]
}


返回值包含了一个 docs  数组,这个数组以请求中指定的顺序每个文档包含一个响应。每一个 响应都和独立的 get  请求返回的响应相同:


{
    "docs" : [
        {
            "_index" : "website",
            "_id" : "2",
            "_type" : "blog",
            "found" : true,
            "_source" : {
                "text" : "This is a piece of cake...",
                "title" : "My first external blog entry"
            },
            "_version" : 10
        },
        {
            "_index" : "website",
            "_id" : "1",
            "_type" : "pageviews",
            "found" : true,
            "_version" : 2,
            "_source" : {
                "views" : 2
            }
        }
    ]
}


如果你所需要的文档都在同一个 _index  或者同一个 _type  中,你就可以在URL中指定一个默 认的 /_index  或是 /_index/_type  。


GET /website/blog/_mget
{
    "docs" : [
        { "_id" : 2 },
        { "_type" : "pageviews", "_id" : 1 }
    ]
}


事实上,如果所有的文档拥有相同的 _index  以及  _type  ,直接在请求中添加 ids  的数组即 可:


GET /website/blog/_mget
{
  "ids" : [ "2", "1" ]
}


请注意,我们所请求的第二篇文档不存在,这是就会返回如下内容:


{
    "docs" : [
        {
            "_index" : "website",
            "_type" : "blog",
            "_id" : "2",
            "_version" : 10,
            "found" : true,
            "_source" : {
                "title": "My first external blog entry",
                "text": "This is a piece of cake..."
            }
        },
        {
            "_index" : "website",
            "_type" : "blog",
            "_id" : "1",
            "found" : false <1>
        }
    ]
}


要确定独立的文档是否被成功找到,你需要检查 found  标识。


批量更高效



与 mget  能同时允许帮助我们获取多个文档相同, bulk  API可以帮助我们同时完成执行多个请求,比如: create  , index  ,  update  以及 delete  。当你在处理类似于log等海量数据的时 候,你就可以一下处理成百上千的请求,这个操作将会极大提高效率。


bulk  的请求主体的格式稍微有些不同:


{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n
...


这种格式就类似于一个用 "\n"  字符来连接的单行json一样。下面是两点注意事项:


  1. 每一行都结尾处都必须有换行字符 "\n"  ,最后一行也要有。这些标记可以有效地分隔每
    行。
  2. 这些行里不能包含非转义字符,以免干扰数据的分析 — — 这也意味着JSON不能是 pretty-printed样式。


action/metadata 行指定了将要在哪个文档中执行什么操作。


其中action必须是 index  ,  create  ,  update  或者 delete  。metadata 需要指明需要被操作文 档的 _index  ,  _type  以及 _id  ,例如删除命令就可以这样填写:


示例


{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}


在你进行 index  以及 create  操作时,request body 行必须要包含文档的 _source  数据——也 就是文档的所有内容。 同样,在执行 update  API:  doc  ,  upsert  , script  的时候,也需要包含相关数据。而在删除 的时候就不需要request body行。


{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }


如果没有指定 _id  ,那么系统就会自动生成一个ID:


{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }


完成以上所有请求的 bulk  如下:



POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }} <1>
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }
{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict"
: 3} }
{ "doc" : {"title" : "My updated blog post"} } <2>


  1. 注意 delete  操作是如何处理request body的,你可以在它之后直接执行新的操作。
  2. 请记住最后有换行符


Elasticsearch会返回含有 items  的列表、它的顺序和我们请求的顺序是相同的:


{
    "took": 4,
    "errors": false, <1>
    "items": [
        { "delete": {
            "_index": "website",
            "_type": "blog",
            "_id": "123",
            "_version": 2,
            "status": 200,
            "found": true
        }},
        { "create": {
            "_index": "website",
            "_type": "blog",
            "_id": "123",
            "_version": 3,
          "status": 201
        }},
        { "create": {
            "_index": "website",
            "_type": "blog",
            "_id": "EiwfApScQiiy7TIKFxRCTw",
            "_version": 1,
            "status": 201
        }},
        { "update": {
            "_index": "website",
            "_type": "blog",
            "_id": "123",
            "_version": 4,
            "status": 200
        }}
    ]
}}


所有的请求都被成功执行。

每一个子请求都会被单独执行,所以一旦有一个子请求失败了,并不会影响到其他请求的成功执行。如果一旦出现失败的请求error  就会变为 true  ,详细的错误信息也会出现在返回 内容的下方:


POST /_bulk
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "Cannot create - it already exists" }
{ "index": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "But we can update it" }


失败结果:


{
    "took": 3,
    "errors": true, <1>
    "items": [
        { "create": {
            "_index": "website",
            "_type": "blog",
            "_id": "123",
            "status": 409, <2>
            "error": "DocumentAlreadyExistsException <3>
                    [[website][4] [blog][123]:
                    document already exists]"
        }},
        { "index": {
            "_index": "website",
            "_type": "blog",
            "_id": "123",
            "_version": 5,
            "status": 200 <4>
        }}
    ]
}


  1. 至少有一个请求错误发生。
  2. 这条请求的状态码为 409 CONFLICT  。
  3. 错误信息解释了导致错误的原因。
  4. 第二条请求的状态码为 200 OK  。


能省就省


或许你在批量导入大量的数据到相同的 index  以及 type  中。每次都去指定每个文档的 metadata是完全没有必要的。在 mget  API中, bulk  请求可以在URL中声明 /_index  或 者 /_index/_type  :


POST /website/_bulk
{ "index": { "_type": "log" }}
{ "event": "User logged in" }


你依旧可以在metadata行中使用 _index  以及 _type  来重写数据,未声明的将会使用URL中的 配置作为默认值:


POST /website/log/_bulk
{ "index": {}}
{ "event": "User logged in" }
{ "index": { "_type": "blog" }}
{ "title": "Overriding the default type" }


最大有多大?


试着去批量索引越来越多的文档。当性能开始下降的时候,就说明你的数据量太大了。一般 比较好初始数量级是1000到5000个文档,或者你的文档很大,你就可以试着减小队列。 有的 时候看看批量请求的物理大小是很有帮助的。1000个1KB的文档和1000个1MB的文档的差距 将会是天差地别的。比较好的初始批量容量是5-15MB。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
4月前
|
消息中间件 存储 运维
一文吃透企业级elk技术栈:1. 基础优化
一文吃透企业级elk技术栈:1. 基础优化
|
4月前
|
监控
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
|
4月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:7. 验证结果
一文吃透企业级elk技术栈:7. 验证结果
|
24天前
|
存储 监控 安全
|
2月前
|
存储 JSON 监控
大数据-167 ELK Elasticsearch 详细介绍 特点 分片 查询
大数据-167 ELK Elasticsearch 详细介绍 特点 分片 查询
52 4
|
4月前
|
消息中间件 Kafka 网络安全
一文吃透企业级elk技术栈:elk 各组件调试
调试需先理解逻辑与程序调用顺序。本文介绍filebeat、kafka、logstash和es的数据推送流程及调试方法:filebeat传输数据检查包括服务状态、配置与日志;kafka调试涵盖服务状态、端口与日志;logstash调试需检查配置文件、日志与流量;es直接通过kibana查看。还介绍了使用rsyslog接收防火墙/waf/交换机日志的方法。
|
4月前
|
监控 关系型数据库 MySQL
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:10. es数据生命周期管理
一文吃透企业级elk技术栈:10. es数据生命周期管理
|
4月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:6. filebeat安装配置
一文吃透企业级elk技术栈:6. filebeat安装配置
|
4月前
|
监控 NoSQL 关系型数据库
一文吃透企业级elk技术栈:5. logstatsh 安装配置
一文吃透企业级elk技术栈:5. logstatsh 安装配置