《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.19.Elasticsearch语言开发(Python/Nodejs/Java)—— 3.5.19.1.Elasticsearch语言开发(Python)(上) https://developer.aliyun.com/article/1226664
三、 常见的 API 使用
1. Index API
对 ES 单条写入只需要指定三部分信息:索引名,id, body。示例如下:
from datetime import datetime from elasticsearch import Elasticsearch #连接ES集群 es = Elasticsearch("192.168.2.2:27000") #以JSON字符串的形式构建文档内容 doc = { 'author': 'author_name', 'text': 'Interesting content...', 'timestamp': datetime.now(), } res = es.index(index="test-index", id=1, body=doc) print(res['result'])
2. Get API
要获取一条文档需要指定其索引和 id
res = es.get(index="test-index", id=1) #获取_source内的文档内容 print(res['_source'])
3. Refresh API
ES接收数据请求时会先存入内存中,默认每隔一段时间会从内存 buffer 中将数据写入filesystem cache。如需手动 refresh ,可使用如下 API。
es.indices.refresh(index="test-index")
4. Search API
# 查询所有的数据 body = { "query":{ "match_all":{} } } res = es.search(index="test-index",body=body) print("Got %d Hits:" % res['hits']['total']['value']) for hit in res['hits']['hits']: print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])
5. Update API
doc = { 'author': 'author_name', 'text': 'Interesting modified content...', 'timestamp': datetime.now(), } res = es.update(index="test-index", id=1, body=doc) print(res['result'])
6. Delete API
es.delete(index="test-index", id=1)
7. Bulk API
elasticsearch 的 helpers 模块是对 bulk 的封装。Bulk API 接受索引、创建、删除和更新操作。使用 _op_type 字段指定操作(_op_type 默认为 index)
批量写入示例如下:
import random from elasticsearch import Elasticsearch from elasticsearch import helpers es=Elasticsearch(hosts='192.168.2.2',port=27000) meaningless_random_repeat=['python','elasticsearch','alibaba','bulk_helpers'] actions=[] for i in range(10000): meaningless_random_repeat=meaningless_random_repeat[random.randrange(0,len(meaningless_random_repeat))] action={ #操作 index update create delete '_op_type':'index', #索引名 '_index':'meaning_less_random_repeat', #索引type '_type':'_doc', '_source':{'meaningless_random_repeat':meaningless_random_repeat}} actions.append(action) helpers.bulk(client=es,actions=actions) #这里使用streaming_bulk,如果设置线程数的需求可以使用parallel_bulk for ok,response in helpers.streaming_bulk(es,actions): if not ok: print(response)
8. Scan API
当我们需要使用大查询时可以使用 Scan API。Scan API 是对 scroll API 的简单封装。
from elasticsearch import Elasticsearch from elasticsearch import helpers es=Elasticsearch(hosts='192.168.2.2',port=27000) query='{ "match": { "#Symbol": "FANCD2"}}}' index = 'hgvs4variation' source={'hgvs4variation':'hgvs4variation'} datas = helpers.scan(es, index=index, query=body, size=1000, request_timeout=100, _source=source) for data in datas: print(data)
四、优化建议
1. 写入优化
针对日志类场景,对于写入速度要求很高而对数据的情况下:
l 不要单条导入,尽量使用 Bulk API。这样是为了提高写入效率。
l 可以先设置索引副本为0,写完恢复副本数。这样是为了降低索引副本分配产生的性能开销。
l 增加 Index Refresh 间隔,这样是为了减少 Segment Merge 的次数。
2. 查询优化
针对大查询的场景:
l 运维端可以根据当前的集群情况适当调整 max_result_window 参数。
l 开发端可以使用 ES 的 scroll API 或者 helpers 的 scan API,尽量控制每次拉取的条数以及避免深翻。
l 选择合适的路由,查询时可以根据 Routing 信息,直接定位到目标分片,避免查询所有的分片。
五、完整的代码示例
https://gitee.com/ld-sys/elasticsearch-python
参考
l https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html
创作人简介:
张刘毅,Elastic 认证工程师,存储研发工程师,曾经做过 AI 大数据平台研发,负责过80+ES集群。目前专注于生物医药和大数据。