代价较小的批量操作
使用bulk
API 允许在单个步骤中进行多次 create
、 index
、 update
或 delete
请求。
bulk
与其他的请求体格式稍有不同,如下所示:
{ action: { metadata }}\n { request body }\n { action: { metadata }}\n { request body }\n ...
这种格式类似一个有效的单行 JSON 文档 流 ,它通过换行符(\n
)连接到一起。注意两个要点:
- 每行一定要以换行符(
\n
)结尾, 包括最后一行 。这些换行符被用作一个标记,可以有效分隔行。 - 这些行不能包含未转义的换行符,因为他们将会对解析造成干扰。这意味着这个 JSON 不 能使用 pretty 参数打印。
其中参数的含义
action
必须是以下选项之一:
- **
create
**如果文档不存在,那么就创建它。 - **
index
**创建一个新文档或者替换一个现有的文档。 - **
update
**部分更新一个文档。 - **
delete
**删除一个文档。
metadata
应该指定被索引、创建、更新或者删除的文档的 _index
、 _type
和 _id
。
request body
行由文档的 _source
本身组成—文档包含的字段和值。它是 index
和 create
操作所必需的,这是有道理的:必须为请求提供结构体。只有删除操作不需要 request body
行。
现在我们就进行实践,在learn索引的user类型下
- 删除id为5的文档
- 创建一个新的文档,id为1
- 创建一个新的文档,不指定id
- 更改id为2的的兴趣爱好为“唱跳”、“RAP”、“你干嘛~”
POST /_bulk {"delete": {"_index": "learn","_type": "user","_id": "5"}} {"create":{"_index": "learn","_type": "user","_id": "1"}} { "first_name" : "四", "last_name" : "李","age" : 25,"about" : "法外狂徒的弟弟","interests": [ "偷盗", "抢劫","嘿嘿" ]} {"create":{"_index": "learn","_type": "user"}} { "first_name" : "四", "last_name" : "李","age" : 25,"about" : "法外狂徒的弟弟","interests": [ "偷盗", "抢劫","嘿嘿" ]} {"update": { "_index": "learn", "_type": "user", "_id": "2"}} {"doc": {"interests": ["唱跳","RAP","你干嘛~"]}}
结果
{ "took" : 7, "errors" : true, "items" : [ { "delete" : { "_index" : "learn", "_type" : "user", "_id" : "5", "_version" : 1, "result" : "not_found", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 53, "_primary_term" : 2, "status" : 404 } }, { "create" : { "_index" : "learn", "_type" : "user", "_id" : "1", "status" : 409, "error" : { "type" : "version_conflict_engine_exception", "reason" : "[1]: version conflict, document already exists (current version [2])", "index_uuid" : "AyqqcNFHSgyE2tgDurtkLA", "shard" : "0", "index" : "learn" } } }, { "create" : { "_index" : "learn", "_type" : "user", "_id" : "TBfyY4IB_561K7UQqFc7", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 54, "_primary_term" : 2, "status" : 201 } }, { "update" : { "_index" : "learn", "_type" : "user", "_id" : "2", "_version" : 2, "result" : "updated", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 55, "_primary_term" : 2, "status" : 200 } } ] }
注意:1.delete动作后不要有请求体 2.最后一行也需要换行符,不要忘记
参数介绍:
- errors:改errors为true并不表示所有操作失败,而是一个或者多个请求失败。
- error:解释为什么请求失败的错误信息。
这意味着bulk
请求不是原子的: 不能用它来实现事务控制。每个请求是单独处理的,因此一个请求的成功或失败不会影响其他的请求。
注意:在使用bulk API进行批量的插入操作时,当指定的文档已经存在,那么则会返回一个错误,这个错误不是致命的
我们使用批量操作原因一般是有着大量文档要进行操作,那么操作的文档量多大是太大了呢?这是官方给出的大小。
一个好的批量大小在开始处理后所占用的物理大小约为 5-15 MB。
排序与相关性问题
一般来说,默认的排序是按照相关性打分来排的。但是也会出现随机排序的情况,比如我们进行空搜索时就会出现相关性评分都是零的情况,这时候就会出现随机排序的情况。
在elasticsearch中我们可以使用sort
指定字段来进行排序。
比如根据_id
进行的排序如下
GET /learn/user/_search { "sort": [{"_id": {"order": "asc"}}] } GET /learn/user/_search { "sort": [{"_id": {"order": "desc"}}] }
Query-string 搜索 也支持自定义排序,可以写为
GET /learn/user/_search?sort=_id:desc
多级排序
满足第一个排序条件后再进行第二个排序条件的排序,一次类推
我们先根据相关性得分排序然后根据id排序,就可以写为
GET /learn/user/_search { "sort": [ {"_score": {"order": "desc"}}, {"_id": {"order": "desc"}} ] }
当然所给出的例子是无效的。因为没有条件时,相关性得分为null
多值字段排序
什么时多值字段?就类似于时间2022-08-04
之类的数与数的组合。
可以将多值字段减为单值,这可以通过使用
min
、max
、avg
或是sum
排序模式 。例如你可以按照每个date
字段中的最早日期进行排序,通过以下方法:
"sort": { "dates": { "order": "asc", "mode": "min" } }
管理索引
创建索引
再创建文档时,我们就已经通过索引文档创建过一个文档learn,这个索引采用的是默认的配置,新的字段通过动态映射的方式被添加到类型映射。
如果你想禁止自动创建索引,你 可以通过在 config/elasticsearch.yml
的每个节点下添加下面的配置:
action.auto_create_index: false
现在我们想要确保这个索引有数量适中的主分片,并且在我们索引任何数据 之前 ,分析器和映射已经被建立好。为了达到这个目的,我们需要手动创建索引。
创建索引的格式如下
PUT /my_index { "settings": { ... any settings ... }, "mappings": { "type_one": { ... any mappings ... }, "type_two": { ... any mappings ... }, ... } }
再v7.0以后的版本不在支持指定类型,所有的类型都为doc。所以格式变为
PUT /my_index { "settings": { ... any settings ... }, "mappings": { ... any mappings ... } }
我们创建一个名为test索引。
PUT /test { "settings": { "number_of_shards": 1, "number_of_replicas": 2 }, "mappings": { "properties":{ "id":{"type": "long"}, "name":{"type": "text"}, "age":{"type": "integer"}, "sex":{"type": "text"} } } }
参数介绍:
- number_of_shards:主分片数
- number_of_replicas:每个主分片的副本数
- properties:属性设置
属性可有
- 字符串类型:string;
- 数值类型:字节(byte)、2字节(short)、4字节(integer)、8字节(long)、float、double;
- 布尔类型:boolean,值是true或false;
- 时间/日期类型:date,用于存储日期和时间;
- 二进制类型:binary;
- IP地址类型:ip,以字符串形式存储IPv4地址;
- 特殊数据类型:token_count,用于存储索引的字数信息
注意再v5.0版本后string
属性由text
和keyword
代替。
删除索引
用以下的请求来 删除索引:
DELETE /my_index
你也可以这样删除多个索引:
DELETE /index_one,index_two DELETE /index_*
你甚至可以这样删除 全部 索引:
DELETE /_all DELETE /*
处理冲突
我们一般会将数据存放在关系型数据库中同时复制到es上使其可以被搜索到,但是当两个人同时进行操作时,可能会造成业务变更的丢失,即使这个几率是很小的,我们仍然要予以重视。
一个官网上的例子
有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。 假设有两个 web 程序并行运行,每一个都同时处理所有商品的销售,如图 Figure 7, “Consequence of no concurrency control” 所示。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ebWC55dZ-1659605536079)(https://www.elastic.co/guide/cn/elasticsearch/guide/current/images/elas_0301.png)]
Figure 7. Consequence of no concurrency control
web_1
对stock_count
所做的更改已经丢失,因为web_2
不知道它的stock_count
的拷贝已经过期。 结果我们会认为有超过商品的实际数量的库存,因为卖给顾客的库存商品并不存在,我们将让他们非常失望。
常用到的处理方式有两种
悲观并发控制
这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。
乐观并发控制
Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。
我们可以利用 _version
号来确保 应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version
号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。
一般来说应该是这样的,新版本已近不在支持version,但是我们可以使用_seq_no和 _primary_term来对文档进行版本控制。
首先我们要先知道两点:
- 删除或更新文档会使_seq_no自增+1。
- _primary_term主分片的编号不会轻易改变。
在进行乐观并发控制操作前,我们先来介绍下_seq_no 和 version的区别。
- _seq_no 是针对索引的,而version是针对文档的。
- _seq_no 当索引中有文档发生变更(更新、删除、增加)都会引起 _seq_no 加一
- version只有当前文档发生变更才会加一。
我们首先创建一个文档
PUT /learn/user/5/_create { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests": [ "嘿嘿","游戏" ] }
查询结果
GET /learn/user/5
{ "_index" : "learn", "_type" : "user", "_id" : "5", "_version" : 7, "_seq_no" : 19, "_primary_term" : 2, "found" : true, "_source" : { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests" : [ "嘿嘿", "游戏" ] } }
然后我们在根据上面所查到的序列号和主分片号对文档进行操作。
PUT /learn/user/5?if_seq_no=19&if_primary_term=2 { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests": [ "嘿嘿","游戏" ] }
当序列号会主分片号不对时会返回409状态码。
交互所有代码
# 插入信息 PUT /learn/user/1 { "first_name" : "三", "last_name" : "张", "age" : 25, "about" : "法外狂徒", "interests": [ "偷盗", "抢劫","嘿嘿" ] } PUT /learn/user/2 { "first_name" : "xk", "last_name" : "蔡", "age" : 99, "about" : "基尼太美", "interests": [ "唱", "跳","rap","篮球" ] } PUT /learn/user/3 { "first_name" : "倍", "last_name" : "安", "age" : 10, "about" : "散弹枪狂热者", "interests": [ "盗窃","篮球" ] } # 对文档信息进行更改 PUT /learn/user/3 { "first_name" : "小日本鬼子", "last_name" : "安", "age" : 10, "about" : "散弹枪狂热者", "interests": [ "盗窃","篮球" ] } # 不指定ID,随机生成 POST /learn/user/ { "first_name" : "三", "last_name" : "张", "age" : 25, "about" : "法外狂徒", "interests": [ "偷盗", "抢劫","嘿嘿" ] } # 新增文档但不更改,1 PUT /learn/user/5/_create { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests": [ "嘿嘿","游戏" ] } # 新增文档但不更改,2 PUT /learn/user/6?op_type=create { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests": [ "嘿嘿","游戏" ] } # 删除 DELETE /learn/user/5 DELETE /learn/user/6 PUT /learn/user/4 { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests": [ "嘿嘿","游戏" ] } # 获取指定id的信息 GET /learn/user/3 # 查询不存在的文档 GET /learn/user/5 # 获取指定id的信息 GET /learn/user/1?pretty # 获取全部搜索信息 GET /learn/user/_search { "query": { "match_all": {} } } # 搜索兴趣为篮球的文档 GET /learn/user/_search { "query": { "match": { "interests.keyword": "篮球" } } } GET /learn/user/_search?q=interests:篮球 # 使用filter进行请求体搜索 GET /learn/user/_search { "query": { "bool": { "filter": { "range": { "age": { "gt": 50 } } }, "must": { "match": { "interests.keyword": "篮球" } } } } } GET /learn/user/_search { "query" : { "match" : { "about" : "散弹枪" } } } # 短语搜索 GET /learn/user/_search { "query" : { "match_phrase" : { "about" : "散弹枪" } } } # 高亮搜索 GET /learn/user/_search { "query" : { "match" : { "about" : "散弹枪" } }, "highlight": { "fields" : { "about" : {} } } } # 分析,不能用,可能是已被版本淘汰 GET /learn/user/_search { "aggs": { "all_interests": { "terms": { "field": "interests" } } } } # 只得到source字段 GET /learn/user/1/_source # 得到指定字段 GET /learn/user/1/_source/?_source=age,about # 指定_seq_no和 _primary_term PUT /learn/user/5?if_seq_no=18&if_primary_term=2 { "first_name" : "先森", "last_name" : "双口", "age" : 10, "about" : "散弹批发商", "interests": [ "嘿嘿","游戏" ] } # 更改部分文档 POST /learn/user/3/_update { "doc" : { "first_name" : "鬼子" } } # 使用脚本对部分文档进行修改 POST /learn/user/3/_update { "script" : "ctx._source.age=0" } # 给数组新增元素,也不可用,看报错因该是数组越界,暂不清楚时那个版本进行的更新。 POST /learn/user/3/_update { "script" : "ctx._source.interests+=new_interest", "params" : { "new_interest" : "new_interest" } } # 检索多条文档信息 GET /_mget { "docs" : [ { "_index" : "learn", "_type" : "user", "_id" : 1 }, { "_index" : "learn", "_type" : "user", "_id" : 2, "_source": "about" } ] } # 简写 GET /learn/user/_mget { "docs": [ { "_id": 1 }, { "_id": 2, "_source": "about" } ] } GET /learn/user/2 # 使用代价较小的批量操作 POST /_bulk {"delete": {"_index": "learn","_type": "user","_id": "5"}} {"create":{"_index": "learn","_type": "user","_id": "1"}} { "first_name" : "四", "last_name" : "李","age" : 25,"about" : "法外狂徒的弟弟","interests": [ "偷盗", "抢劫","嘿嘿" ]} {"create":{"_index": "learn","_type": "user"}} { "first_name" : "四", "last_name" : "李","age" : 25,"about" : "法外狂徒的弟弟","interests": [ "偷盗", "抢劫","嘿嘿" ]} {"update": { "_index": "learn", "_type": "user", "_id": "2"}} {"doc": {"interests": ["唱跳","RAP","你干嘛~"]}} # 分页查询 GET /learn/user/_search POST /learn/user/_bulk {"delete": {"_id": "TBfyY4IB_561K7UQqFc7"}} {"delete": {"_id": "ShfxY4IB_561K7UQBVey"}} {"delete": {"_id": "SRftY4IB_561K7UQHVeY"}} {"delete": {"_id": "SBftY4IB_561K7UQHFes"}} {"delete": {"_id": "RxftY4IB_561K7UQG1c7"}} {"delete": {"_id": "RhftY4IB_561K7UQGVex"}} GET /learn/user/_search?size=2 GET /learn/user/_search?size=2&from=2 GET /learn/user/_search?size=2&from=4 # 请求体格式 GET /learn/user/_search { "size": 2, "from": 2 } # 排序 GET /learn/user/_search { "sort": [{"_id": {"order": "asc"}}] } GET /learn/user/_search { "sort": [{"_id": {"order": "desc"}}] } GET /learn/user/_search?sort=_id:desc # 多级排序 GET /learn/user/_search { "sort": [ {"_score": {"order": "desc"}}, {"_id": {"order": "desc"}} ] } # 创建一个索引 PUT /test { "settings": { "number_of_shards": 1, "number_of_replicas": 2 }, "mappings": { "properties":{ "id":{"type": "long"}, "name":{"type": "text"}, "age":{"type": "integer"}, "sex":{"type": "text"} } } } # 删除索引 DELETE /test
如果觉的这篇文章有用点个赞吧,赞是免费的但能让我开心一整天,拜托了。
参考资料: