ES Python客户端介绍
官方提供了两个 Python 客户端:elasticsearch 和 elasticsearch-dsl
pip install elasticsearch
pip install elasticsearch-dsl
第二个是对第一个的封装,提供类似 ORM 操作数据库的链式写法(如 .query()、.filter()、.aggs.bucket() 等),个人感觉比较鸡肋,star 数也不多。平时一般会先在 Kibana 上调试查询,再把 query 直接拷贝过来获取更多数据,所以这里只对第一个做封装。
封装代码
- 封装后依然暴露了es,方便有特殊情况下使用
- index一般很少改动,就直接放到对象中了,可以使用set_index修改
- 常用的是 get_doc(获取少量数据),以及 get_doc_scroll 配合 get_doc_by_scroll_id(分页获取全量数据,取完后调用 clear_scroll 释放服务端的 scroll 上下文)
代码基于 elasticsearch 7.17.12 测试;8.x 及以上版本的 API 改动较大(如 body、http_auth 等参数被废弃或调整),直接使用可能报错
pip install elasticsearch==7.17.12
es.py
import random
import string
from typing import Dict, List

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ESClient:
    """Convenience wrapper around ``elasticsearch.Elasticsearch`` (7.x client).

    The raw client stays public as ``self.es`` for anything this wrapper does
    not cover. Document-level methods operate on ``self.index``, which is set in
    the constructor and can be changed later with :meth:`set_index`. Instances
    can also be used in a ``with`` statement; leaving the block calls
    :meth:`close`.
    """

    # Alphabet and length of client-generated document ids.
    _ID_ALPHABET = string.ascii_letters + string.digits
    _ID_LENGTH = 20

    def __init__(self, host="127.0.0.1", index="", http_auth=None):
        """Create the underlying Elasticsearch client.

        :param host: value passed as ``hosts=`` to ``Elasticsearch``.
        :param index: default index used by the document methods.
        :param http_auth: credentials passed as ``http_auth=``; omitted when None.
        """
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        # Nothing in this method issues a request, so this message only reports
        # that the client object was built. Interpolation (instead of ``+``)
        # also works when ``host`` is a list of hosts.
        print(f"success to connect {host}")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never swallow exceptions raised inside the with-block

    def close(self):
        """Close the underlying client."""
        self.es.close()

    def set_index(self, index: str):
        """Change the default index used by the document methods."""
        self.index = index

    @classmethod
    def _random_id(cls) -> str:
        """Return a new random document id (a fresh value on every call)."""
        return "".join(random.choices(cls._ID_ALPHABET, k=cls._ID_LENGTH))

    @staticmethod
    def _extract_sources(hits: Dict) -> List:
        """Return the ``_source`` of each hit in a search response's ``hits`` section."""
        # Iterate the hit list directly instead of checking ``total`` first:
        # an empty list already yields [], and this does not fail when the
        # response carries no usable ``total`` value.
        return [hit.get("_source") for hit in hits.get("hits", [])]

    def create_index(self, index_name: str, mappings=None):
        """Create index ``index_name`` with optional ``mappings``; return the ES response."""
        return self.es.indices.create(index=index_name, mappings=mappings)

    def delete_index(self, index_name: str):
        """Delete index ``index_name``; return the ES response."""
        return self.es.indices.delete(index=index_name)

    def get_index(self, index_name: str):
        """Return the index information ES reports for ``index_name``."""
        return self.es.indices.get(index=index_name)

    def create_doc(self, body, _id=None):
        """Create a single document in ``self.index``.

        :param body: the document source.
        :param _id: document id; when omitted a random id is generated per call.
            (A default computed in the signature would be evaluated only once
            and shared by every call, making the second create a conflict.)
        :return: the ES response.
        """
        if _id is None:
            _id = self._random_id()
        return self.es.create(index=self.index, body=body, id=_id)

    def create_doc_bulk(self, docs: List[Dict]):
        """Bulk-create ``docs`` in ``self.index``, each with a random id.

        The fields of each doc are merged into its bulk action after the
        metadata keys, so a doc may override ``_index``/``_op_type``/``_id``.

        :return: the result of ``elasticsearch.helpers.bulk``.
        """
        actions = [
            {
                "_index": self.index,
                "_op_type": "create",
                "_id": self._random_id(),
                **doc,
            }
            for doc in docs
        ]
        return bulk(client=self.es, actions=actions)

    def delete_doc(self, doc_id):
        """Delete document ``doc_id`` from ``self.index``; return the ES response."""
        return self.es.delete(index=self.index, id=doc_id)

    def update_doc(self, doc_id, doc: Dict):
        """Partially update document ``doc_id`` with the fields in ``doc``."""
        return self.es.update(index=self.index, id=doc_id, body={"doc": doc})

    def get_doc_scroll(self, query: Dict, size: int = 10000, scroll: str = "5m"):
        """Start a scroll search over ``self.index`` and return its first page.

        Intended for result sets too large for one search: keep calling
        :meth:`get_doc_by_scroll_id` with the returned id until it yields an
        empty page, then call :meth:`clear_scroll`.

        :param query: search body (e.g. copied from Kibana).
        :param size: page size of each scroll page.
        :param scroll: keep-alive of the scroll context.
        :return: ``(scroll_id, sources)`` with the ``_source`` dicts of page one.
        """
        res = self.es.search(
            index=self.index,
            size=size,
            body=query,
            search_type="query_then_fetch",
            scroll=scroll,
        )
        return res.get("_scroll_id"), self._extract_sources(res.get("hits"))

    def get_doc_by_scroll_id(self, scroll_id, scroll: str = "5m", source_only: bool = False):
        """Fetch the next page of a scroll started by :meth:`get_doc_scroll`.

        NOTE: by default the page holds the full hit dicts (``_id``,
        ``_source``, ...), unlike :meth:`get_doc_scroll`, which returns only
        ``_source``. Pass ``source_only=True`` to get that same shape.

        :return: ``(next_scroll_id, data_list)``; ``data_list`` is empty once
            the scroll is exhausted.
        """
        page = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
        hits = page.get("hits").get("hits")
        if source_only:
            data_list = [hit.get("_source") for hit in hits]
        else:
            data_list = list(hits)
        return page.get("_scroll_id"), data_list

    def clear_scroll(self, scroll_id):
        """Release the server-side scroll context ``scroll_id``; return the ES response."""
        # Pass by keyword: in the 7.x client the first positional parameter of
        # ``clear_scroll`` is ``body``, not ``scroll_id``.
        return self.es.clear_scroll(scroll_id=scroll_id)

    def get_doc_all(self):
        """Run an unfiltered search on ``self.index`` and return its ``hits`` section.

        Handy for the total count and a few ids. No ``size`` is passed, so
        ``hits['hits']`` holds only the server's default page, not every doc.
        """
        return self.es.search(index=self.index)["hits"]

    def get_doc_by_id(self, id_):
        """Return the ``_source`` of document ``id_`` in ``self.index``."""
        return self.es.get(index=self.index, id=id_)["_source"]

    def get_doc(self, query: Dict, size: int = 100000):
        """Run ``query`` with the given ``size`` and return the hits' ``_source`` dicts.

        The caller's ``query`` dict is not modified; ``size`` is applied to a copy.
        NOTE(review): ES rejects a ``size`` above ``index.max_result_window``
        (10000 unless raised on the index), so the 100000 default presumably
        relies on a raised limit. Use the scroll methods otherwise.
        """
        body = {**query, "size": size}
        res = self.es.search(index=self.index, body=body)
        return self._extract_sources(res.get("hits"))

    def get_doc_agg(self, query):
        """Run an aggregation query; return the buckets of the aggregation named ``group_by``."""
        res = self.es.search(index=self.index, body=query)
        return res["aggregations"]["group_by"].get("buckets")

    def get_doc_stats(self, query):
        """Run a stats query; return the aggregation named ``stats_by`` (min/max/avg/...)."""
        res = self.es.search(index=self.index, body=query)
        return res["aggregations"]["stats_by"]
测试代码
import os
import unittest

from es import ESClient

# Connection settings may be overridden via environment variables; the
# defaults are the values this script was originally written against.
ES_HOST = os.environ.get("ES_HOST", "http://10.28.144.3:9200")
ES_USER = os.environ.get("ES_USER", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD", "changeme")

# Sample document id used by the get/update/delete tests; replace it with an
# id that exists in your index.
DOC_ID = "jHALXDQaENQZPM4C9EUt"

# Default to the "test" index so the document tests do not run against an
# empty index name when test_set_index() has not been called first.
cli = ESClient(host=ES_HOST, index="test", http_auth=[ES_USER, ES_PASSWORD])


def test_create_index():
    res = cli.create_index(index_name="test")
    print(res)


def test_delete_index():
    res = cli.delete_index(index_name="test")
    print(res)


def test_get_index():
    res = cli.get_index(index_name="test")
    print(res)


def test_set_index():
    cli.set_index(index="test")


def test_create_doc():
    body = {
        "name": "lady_killer9",
        "age": 19,
    }
    res = cli.create_doc(body=body)
    print(res)


def test_create_doc_bulk():
    # 100001 docs, presumably chosen to exceed get_doc's default size of 100000.
    users = [{"name": "lady_killer9", "age": i} for i in range(100001)]
    res = cli.create_doc_bulk(docs=users)
    print(res)


def test_get_doc_all():
    res = cli.get_doc_all()
    print(res)


def test_get_doc_by_id():
    res = cli.get_doc_by_id(DOC_ID)
    print(res)


def test_get_doc():
    query = {"query": {"match_all": {}}}
    res = cli.get_doc(query=query, size=20)
    print(res)


def test_update_doc():
    body = {"name": "lady_killer_after_update"}
    res = cli.update_doc(doc_id=DOC_ID, doc=body)
    print(res)


def test_delete_doc():
    res = cli.delete_doc(doc_id=DOC_ID)
    print(res)


def test_get_doc_agg():
    query = {"aggs": {"group_by": {"terms": {"field": "age"}}}}
    res = cli.get_doc_agg(query=query)
    print(res)


def test_get_doc_stats():
    query = {"aggs": {"stats_by": {"stats": {"field": "age"}}}}
    res = cli.get_doc_stats(query=query)
    print(res)


def test_get_doc_scroll():
    query = {"query": {"match_all": {}}}
    scroll_id, data_list = cli.get_doc_scroll(query=query)
    res = []
    try:
        while data_list:
            res.extend(data_list)
            scroll_id, data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    finally:
        # Release the server-side scroll context now instead of leaving it
        # open until its keep-alive expires. The raw client is called with a
        # keyword argument because the 7.x clear_scroll takes `body` first.
        if scroll_id:
            cli.es.clear_scroll(scroll_id=scroll_id)
    print(len(res))


if __name__ == '__main__':
    try:
        # Uncomment the steps you want to run.
        # test_delete_index()
        test_create_index()
        test_get_index()
        # test_set_index()
        # test_create_doc()
        # test_create_doc_bulk()
        # test_get_doc_all()
        # test_update_doc()
        # test_get_doc_by_id()
        # test_get_doc()
        # test_delete_doc()
        # test_get_doc_agg()
        # test_get_doc_stats()
        # test_get_doc_scroll()
    finally:
        # Close the client even if one of the steps above raised.
        cli.close()
测试截图
更多python相关内容:【python总结】python学习框架梳理
本人b站账号:一路狂飚的蜗牛
有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。