使用外部系统的版本
还有一种常见的情况就是我们还是使用其他的数据库来存储数据,而Elasticsearch只是帮我
们检索数据。这也就意味着主数据库只要发生的变更,就需要将其拷贝到Elasticsearch中。
如果多个进程同时发生,就会产生上文提到的那些并发问题。
如果你的数据库已经存在了版本号码,或者也可以代表版本的 时间戳 。这是你就可以在
Elasticsearch的查询字符串后面添加 version_type=external 来使用这些号码。版本号码必须
要是大于零小于 9.2e+18 (Java中long的最大正值)的整数。
Elasticsearch在处理外部版本号时会与对内部版本号的处理有些不同。它不再是检
查 _version 是否与请求中指定的数值相同,而是检查当前的 _version 是否比指定的数值小。
如果请求成功,那么外部的版本号就会被存储到文档中的 _version 中。
例如,创建一篇使用外部版本号为 5 的博文,我们可以这样操作:
PUT /website/blog/2?version=5&version_type=external { "title": "My first external blog entry", "text": "Starting to get the hang of this..." }
更新文档中的一部分
文档不能被修改,它们只能被替换掉。 更新 API也必须遵循这一法则。从表面看来,貌似是文档被替换了。对内而言,它必须按照找回-修改-索引的流程来进行操作与管理。不同之处在于这个流程是在一个片(shard) 中完成的,因此可以节省多个请求所带来的 网络开销。除了节省了步骤,同时我们也能减少多个进程造成冲突的可能性。
使用 更新 请求最简单的一种用途就是添加新数据。新的数据会被合并到现有数据中,而如果 存在相同的字段,就会被新的数据所替换。例如我们可以为我们的博客添加 tags 和 views 字 段:
POST /website/blog/1/_update { "doc" : { "tags" : [ "testing" ], "views": 0 } }
如果请求成功,我们就会收到一个类似于 索引 时返回的内容:
{ "_index" : "website", "_id" : "1", "_type" : "blog", "_version" : 3 }
再次取回数据,你可以在 _source 中看到更新的结果:
{ "_index": "website", "_type": "blog", "_id": "1", "_version": 3, "found": true, "_source": { "title": "My first blog entry", "text": "Starting to get the hang of this...", "tags": [ "testing" ], "views": 0 } }
MVEL是一个简单高效的JAVA基础动态脚本语言,它的语法类似于Javascript。你可以 在Elasticsearch scripting docs 以及 MVEL website了解更多关于MVEL的信息。
脚本语言可以在 更新 API中被用来修改 _source 中的内容,而它在脚本中被称为 ctx._source 。例如,我们可以使用脚本来增加博文中 views 的数字:
POST /website/blog/1/_update { "script" : "ctx._source.views+=1" }
这样Elasticsearch就可以重新使用这个脚本进行tag的添加,而不用再次重新编写脚本了:
POST /website/blog/1/_update { "script" : "ctx._source.tags+=new_tag", "params" : { "new_tag" : "search" } }
结果
{ "_index": "website", "_type": "blog", "_id": "1", "_version": 5, "found": true, "_source": { "title": "My first blog entry", "text": "Starting to get the hang of this...", "tags": ["testing", "search"], <1> "views": 1 <2> } }
- tags 数组中出现了 search 。
- views 字段增加了。
使用 ctx.op 来根据内容选择是否删除一个文档:
POST /website/blog/1/_update { "script" : "ctx.op = ctx._source.views == count ? 'delete' : 'none'", "params" : { "count": 1 } }
更新一篇可能不存在的文档
我们可以使用 upsert 参数来设定文档不存在时,它应该被创建:
POST /website/pageviews/1/_update { "script" : "ctx._source.views+=1", "upsert": { "views": 1 } }
首次运行这个请求时, upsert 的内容会被索引成新的文档,它将 views 字段初始化为 1 。当之后再请求时,文档已经存在,所以 脚本 更新就会被执行, views 计数器就会增加。
更新和冲突
你可以通过设定 retry_on_conflict 参数来设置自动完成这项请求的次数,它的默认值是 0 。
POST /website/pageviews/1/_update?retry_on_conflict=5 <1> { "script" : "ctx._source.views+=1", "upsert": { "views": 0 } }
失败前重新尝试5次
这个参数非常适用于类似于增加计数器这种无关顺序的请求,但是还有些情况的顺序就是很 重要的。例如上一节提到的情况,你可以参考乐观并发控制以及悲观并发控制来设定文档的 版本号。
获取多个文档
如果你需要从Elasticsearch中获取多个文档,你可以使用multi-get 或者 mget API来取代一篇又一篇文档的获取。
mget API需要一个 docs 数组,每一个元素包含你想要的文档的 _index , _type 以及 _id 。 你也可以指定 _source 参数来设定你所需要的字段:
GET /_mget { "docs" : [ { "_index" : "website", "_type" : "blog", "_id" : 2 }, { "_index" : "website", "_type" : "pageviews", "_id" : 1, "_source": "views" } ] }
返回值包含了一个 docs 数组,这个数组以请求中指定的顺序每个文档包含一个响应。每一个 响应都和独立的 get 请求返回的响应相同:
{ "docs" : [ { "_index" : "website", "_id" : "2", "_type" : "blog", "found" : true, "_source" : { "text" : "This is a piece of cake...", "title" : "My first external blog entry" }, "_version" : 10 }, { "_index" : "website", "_id" : "1", "_type" : "pageviews", "found" : true, "_version" : 2, "_source" : { "views" : 2 } } ] }
如果你所需要的文档都在同一个 _index 或者同一个 _type 中,你就可以在URL中指定一个默 认的 /_index 或是 /_index/_type 。
GET /website/blog/_mget { "docs" : [ { "_id" : 2 }, { "_type" : "pageviews", "_id" : 1 } ] }
事实上,如果所有的文档拥有相同的 _index 以及 _type ,直接在请求中添加 ids 的数组即 可:
GET /website/blog/_mget { "ids" : [ "2", "1" ] }
请注意,我们所请求的第二篇文档不存在,这是就会返回如下内容:
{ "docs" : [ { "_index" : "website", "_type" : "blog", "_id" : "2", "_version" : 10, "found" : true, "_source" : { "title": "My first external blog entry", "text": "This is a piece of cake..." } }, { "_index" : "website", "_type" : "blog", "_id" : "1", "found" : false <1> } ] }
要确定独立的文档是否被成功找到,你需要检查 found 标识。
批量更高效
与 mget 能同时允许帮助我们获取多个文档相同, bulk API可以帮助我们同时完成执行多个请求,比如: create , index , update 以及 delete 。当你在处理类似于log等海量数据的时 候,你就可以一下处理成百上千的请求,这个操作将会极大提高效率。
bulk 的请求主体的格式稍微有些不同:
{ action: { metadata }}\n { request body }\n { action: { metadata }}\n { request body }\n ...
这种格式就类似于一个用 "\n" 字符来连接的单行json一样。下面是两点注意事项:
- 每一行都结尾处都必须有换行字符 "\n" ,最后一行也要有。这些标记可以有效地分隔每
行。 - 这些行里不能包含非转义字符,以免干扰数据的分析 — — 这也意味着JSON不能是 pretty-printed样式。
action/metadata 行指定了将要在哪个文档中执行什么操作。
其中action必须是 index , create , update 或者 delete 。metadata 需要指明需要被操作文 档的 _index , _type 以及 _id ,例如删除命令就可以这样填写:
示例
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
在你进行 index 以及 create 操作时,request body 行必须要包含文档的 _source 数据——也 就是文档的所有内容。 同样,在执行 update API: doc , upsert , script 的时候,也需要包含相关数据。而在删除 的时候就不需要request body行。
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "My first blog post" }
如果没有指定 _id ,那么系统就会自动生成一个ID:
{ "index": { "_index": "website", "_type": "blog" }} { "title": "My second blog post" }
完成以上所有请求的 bulk 如下:
POST /_bulk { "delete": { "_index": "website", "_type": "blog", "_id": "123" }} <1> { "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "My first blog post" } { "index": { "_index": "website", "_type": "blog" }} { "title": "My second blog post" } { "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} } { "doc" : {"title" : "My updated blog post"} } <2>
- 注意 delete 操作是如何处理request body的,你可以在它之后直接执行新的操作。
- 请记住最后有换行符
Elasticsearch会返回含有 items 的列表、它的顺序和我们请求的顺序是相同的:
{ "took": 4, "errors": false, <1> "items": [ { "delete": { "_index": "website", "_type": "blog", "_id": "123", "_version": 2, "status": 200, "found": true }}, { "create": { "_index": "website", "_type": "blog", "_id": "123", "_version": 3, "status": 201 }}, { "create": { "_index": "website", "_type": "blog", "_id": "EiwfApScQiiy7TIKFxRCTw", "_version": 1, "status": 201 }}, { "update": { "_index": "website", "_type": "blog", "_id": "123", "_version": 4, "status": 200 }} ] }}
所有的请求都被成功执行。
每一个子请求都会被单独执行,所以一旦有一个子请求失败了,并不会影响到其他请求的成功执行。如果一旦出现失败的请求, error 就会变为 true ,详细的错误信息也会出现在返回 内容的下方:
POST /_bulk { "create": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "Cannot create - it already exists" } { "index": { "_index": "website", "_type": "blog", "_id": "123" }} { "title": "But we can update it" }
失败结果:
{ "took": 3, "errors": true, <1> "items": [ { "create": { "_index": "website", "_type": "blog", "_id": "123", "status": 409, <2> "error": "DocumentAlreadyExistsException <3> [[website][4] [blog][123]: document already exists]" }}, { "index": { "_index": "website", "_type": "blog", "_id": "123", "_version": 5, "status": 200 <4> }} ] }
- 至少有一个请求错误发生。
- 这条请求的状态码为 409 CONFLICT 。
- 错误信息解释了导致错误的原因。
- 第二条请求的状态码为 200 OK 。
能省就省
或许你在批量导入大量的数据到相同的 index 以及 type 中。每次都去指定每个文档的 metadata是完全没有必要的。在 mget API中, bulk 请求可以在URL中声明 /_index 或 者 /_index/_type :
POST /website/_bulk { "index": { "_type": "log" }} { "event": "User logged in" }
你依旧可以在metadata行中使用 _index 以及 _type 来重写数据,未声明的将会使用URL中的 配置作为默认值:
POST /website/log/_bulk { "index": {}} { "event": "User logged in" } { "index": { "_type": "blog" }} { "title": "Overriding the default type" }
最大有多大?
试着去批量索引越来越多的文档。当性能开始下降的时候,就说明你的数据量太大了。一般 比较好初始数量级是1000到5000个文档,或者你的文档很大,你就可以试着减小队列。 有的 时候看看批量请求的物理大小是很有帮助的。1000个1KB的文档和1000个1MB的文档的差距 将会是天差地别的。比较好的初始批量容量是5-15MB。