作者:刘晓国
Elasticsearch 版本支持
Elasticsearch 背后的关键原则之一是让你充分利用数据。 从历史上看,搜索引擎加载了来自单一来源的数,一般来说,它的数据是只读的。 随着使用量的增加以及 Elasticsearch 在你的应用程序中越来越重要,碰巧需要由多个组件来更新数据。 多个组件导致并发,并发导致冲突。 Elasticsearch 的版本控制系统可以帮助解决这些冲突。我们每修改一次文档,文档中的 version 数值就会自动增加1。
需要版本控制 — 一个例子
为了说明这种情况,我们假设有一个网站供人们对 T 恤设计进行评分。 该网站很简单。 它列出了所有设计,并允许用户通过竖起大拇指的图标给设计一个赞或将其否决。 该网站会针对每件T恤显示当前赞成票与反对票之间的余额。
每个搜索引擎的记录如下所示:
PUT designs PUT designs/_doc/1 { "name": "elasticsearch", "votes": 999 }
如你所见,每种T恤设计都有一个名称和一个投票计数器,以跟踪其当前余额。
为了使事情简单和可扩展,该网站是完全无状态的。 当某人查看页面并单击向上投票按钮时,它将向服务器发送 AJAX 请求,该请求应指示 Elasticsearch 更新计数器。 为此,一个朴素的实现将采用当前的投票值,将其增加一,然后将其发送给 Elasticsearch:
PUT /designs/_doc/1 { "name": "elasticsearch", "votes": 1000 }
这种方法存在严重缺陷-可能会失去选票。 假设 Adam 和 Eve 都在同一时间浏览同一页面。 目前,该页面显示 999 票。 由于都是粉丝,因此他们都单击了“投票”按钮。 现在,Elasticsearch 获得上述请求的两个相同副本以更新文档,这很高兴。 这意味着现在的总投票数不再是1001,而是1000。糟糕。这里面的原因是每次更新的操作不是原子操作,而是分为三个步骤: 读取 -> 修改 -> 更新。假如在两个读取操作之后,一个完成更新并写入操作,另外一个操作接着写入更新的内容,就会覆盖之前的写入数据。
当然,update api 可让你变得更聪明,并传达可以增加投票而不是将投票设置为特定值的事实:
POST designs/_update/1 { "script" : "ctx._source.votes += 1" }
这样,意味着 Elasticsearch 首先在内部检索文档,执行更新并再次为其编制索引。 尽管这使事情成功的可能性更大,但仍然存在与以前相同的潜在问题。 在再次检索和索引文档之间的小窗口中,可能会出错。
为了处理上述情况并帮助更复杂的情况,Elasticsearch 带有内置的版本控制系统。
Elasticsearch 的版本控制系统
你存储在 Elasticsearch 中的每个文档都有一个关联的版本号。 该版本号是介于1和2 的 63-1 次方之间的正数。 首次为文档建立索引时,它的版本为1,你可以在响应中看到 Elasticsearch 返回。 例如,这是此博客文章中第一个写入命令的结果:
{ "_index" : "designs", "_type" : "_doc", "_id" : "1", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1 }
对于此文档的每次写操作,无论是 index,update 还是 delete,Elasticsearch 都会将版本增加 1。该增加是原子的,并且保证在操作成功返回时会发生。
Elasticsearch 还将通过 get 操作的响应返回当前文档的版本(记住这些是实时的),并且还可以指示 Elasticsearch 将其与每个搜索结果一起返回。
GET designs/_search { "version": true, "query": { "match_all": {} } }
上面将返回:
{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "designs", "_type" : "_doc", "_id" : "1", "_version" : 1, "_score" : 1.0, "_source" : { "name" : "elasticsearch", "votes" : 1000 } } ] } }
从上面我们可以看到搜索的版本为 1。每当我们对文档进行操作一次后,我们会发现版本会增加 1,比如我们再次执行上面的如下命令:
PUT /designs/_doc/1 { "name": "elasticsearch", "votes": 1000 }
我们们会发现版本会增加 1:
{ "_index" : "designs", "_type" : "_doc", "_id" : "1", "_version" : 2, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 1, "_primary_term" : 1 }
乐观并发控制 (Opmistic concurrency control)
因此,回到我们的上述示例中,我们需要一种解决方案,该方案中潜在的两个用户可能试图同时更新同一文档。传统上,这将通过锁定来解决:在更新文档之前,人们将获得对其的锁定,进行更新并释放该锁定。锁定文档后,可以确保没有人可以更改文档。
在许多应用程序中,这还意味着,如果某人正在修改文档,则在修改完成之前,其他任何人都无法读取该文档。这种类型的锁定有效,但需要一定的代价。在高吞吐量系统的情况下,它有两个主要缺点:
在许多情况下,根本不需要它。如果做对了,碰撞很少发生。当然,它们会发生,但这仅适用于系统执行的部分操作。
锁定假定你实际上在乎。如果你只想呈现一个网页,即使系统知道它会在短时间内发生变化,也可以获取一些过时但一致的值,这可能很好。读取并不总是需要等待正在进行的写入完成。
Elasticsearch 是分布式的。创建,更新或删除文档时,必须将文档的新版本复制到群集中的其他节点。 Elasticsearch 也是异步和并发的,这意味着这些复制请求是并行发送的,并且可能不按顺序到达其目的地。 Elasticsearch 需要一种方法来确保文档的旧版本永远不会覆盖新版本。
为确保文档的较旧版本不会覆盖较新的版本,对文档执行的每项操作均由主分片分配一个序号,以协调更改。每次操作都会增加序列号,因此可以确保较新的操作具有比较旧的操作更高的序列号。然后,Elasticsearch 可以使用操作的序列号来确保分配给它的序列号较小的更改不会覆盖较新的文档版本。
例如,以下索引命令将创建一个文档,并为其分配一个初始序列号和主要术语:
PUT products PUT products/_doc/1567 { "product" : "r2d2", "details" : "A resourceful astromech droid" }
上面的命令将创建一个叫做 products 的索引,并创建一个文档:
{ "_index" : "products", "_type" : "_doc", "_id" : "1567", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1 }
在上面,我们可以看到一个叫做 _seq_no 及 _primary_term 的值。在每次的操作中,_seq_no 的值会自动增加,比如我们再次执行上面的命令:
{ "_index" : "products", "_type" : "_doc", "_id" : "1567", "_version" : 2, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 1, "_primary_term" : 1 }
我们看到 version 的值增加 1,同时 _seq_no 的值也同时增加 1。
Elasticsearch 会跟踪上次操作的序列号 (_seq_no) 和主要术语 (_primary_term),以更改其存储的每个文档。 序列号和主要术语在 GET API 的响应中的 _seq_no 和 _primary_term 字段中返回:
GET products/_doc/1567
上述命令返回:
{ "_index" : "products", "_type" : "_doc", "_id" : "1567", "_version" : 2, "_seq_no" : 1, "_primary_term" : 1, "found" : true, "_source" : { "product" : "r2d2", "details" : "A resourceful astromech droid" } }
注意:通过设置 seq_no_primary_term parameter,Search API 可以为每个搜索命中返回 _seq_no 和 _primary_term。
序列号和主要术语唯一地标识更改。 通过记下返回的序列号和主要术语,可以确保仅在检索以来未对其进行任何其他更改的情况下才更改该文档。 这可以通过设置 index API,update API 或 delete API 的 if_seq_no 和 if_primary_term 参数来完成。
例如,以下索引调用将确保在文档中添加标签,而不会丢失对描述的任何潜在更改或通过其他 API 添加其他标签:
PUT products/_doc/1567?if_seq_no=1&if_primary_term=1 { "product": "r2d2", "details": "A resourceful astromech droid", "tags": [ "droid" ] }
上面的命令返回的结果是:
{ "_index" : "products", "_type" : "_doc", "_id" : "1567", "_version" : 3, "result" : "updated", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "_seq_no" : 2, "_primary_term" : 1 }
这时假如我们的另外一个终端或应用不知道最新的版本已经更新到3了,而是继续使用之前通过 GET API 获得的 _version 为 2 时的信息来更新这个文档,那么它将使用的还是之前的那个命令,比如:
PUT products/_doc/1567?if_seq_no=1&if_primary_term=1 { "product": "r2d2", "details": "A resourceful astromech droid", "tags": [ "magic" ] }
在上面 _seq_no 为1,_primary_term 为1,我们更新 tags 为 magic,那么我们会得到如下的结果:
{ "error" : { "root_cause" : [ { "type" : "version_conflict_engine_exception", "reason" : "[1567]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [2] and primary term [1]", "index_uuid" : "2vXPyo6qRiK1gZGy7CXrfQ", "shard" : "0", "index" : "products" } ], "type" : "version_conflict_engine_exception", "reason" : "[1567]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [2] and primary term [1]", "index_uuid" : "2vXPyo6qRiK1gZGy7CXrfQ", "shard" : "0", "index" : "products" }, "status" : 409 }
上面的错误表明,我们更新这个文档时,有一个更高级版本的文档存在,我们需要进行重新获得最新的版本,并在此基础之上在进行修改,直到成功为止。
我们可以使用 retry_on_conflict 参数来更新数据。具体操作请参考 “Elasticsearch:处理 Elasticsearch 中数据更新的并发”。
参考:
【1】 https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html