Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dict
class ESClient:
    def __init__(self, host="127.0.0.1",index="", http_auth = None):
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        print("success to connect " + host)
    def close(self):
        self.es.close()
    # 设置索引
    def set_index(self,index:str):
        self.index = index
    # 创建索引
    def create_index(self, index_name: str, mappings=None):
        res = self.es.indices.create(index=index_name, mappings=mappings)
        return res
    # 删除索引
    def delete_index(self, index_name: str):
        res = self.es.indices.delete(index=index_name)
        return res
    # 获取索引
    def get_index(self, index_name: str):
        res = self.es.indices.get(index=index_name)
        return res
    # 创建文档(单个)
    def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):
        res = self.es.create(index=self.index, body=body, id=_id)
        return res
    # 创建文档(批量)
    def create_doc_bulk(self, docs: List[Dict]):
        actions = []
        for doc in docs:
            action = {
                "_index": self.index,
                "_op_type": "create",
                "_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))
            }
            for k,v in doc.items():
                action[k] = v
            actions.append(action)
        res = bulk(client=self.es, actions=actions)
        return res
    # 删除文档
    def delete_doc(self, doc_id):
        res = self.es.delete(index=self.index, id=doc_id)
        return res
    # 更新文档
    def update_doc(self, doc_id, doc:Dict):
        body = {
            "doc" : doc
        }
        res = self.es.update(index=self.index, id=doc_id, body=body)
        return res
    # 分页获取超过100000的文档
    def get_doc_scroll(self,query:Dict):
        res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")
        data_list = []
        hits = res.get("hits")
        scroll_id = res.get('_scroll_id')
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'),Dict):
            total_value= hits.get('total').get('value')
        else:
            total_value = hits.get('total')
        if total_value>0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return scroll_id,data_list
    # 通过scroll_id分页获取后序文档
    def get_doc_by_scroll_id(self,scroll_id):
        page = self.es.scroll(scroll_id=scroll_id,scroll="5m")
        data_list = []
        scroll_id = page.get('_scroll_id')
        for data in page.get('hits').get('hits'):
            data_list.append(data)
        return scroll_id,data_list
    # 清空scroll_id,防止服务端不够用
    def clear_scroll(self,scroll_id):
        self.es.clear_scroll(scroll_id)
    # 获取索引的hits内容(一般用于获取文档id、总数)
    def get_doc_all(self):
        res = self.es.search(index=self.index)
        return res['hits']
    # 获取一个文档
    def get_doc_by_id(self, id_):
        res = self.es.get(index=self.index, id=id_)
        return res["_source"]
    # 获取所有文档的_source内容(小于100000)
    def get_doc(self,query:Dict,size:int=100000):
        query['size'] = size
        res = self.es.search(index=self.index,body=query)
        data_list = []
        hits = res.get("hits")
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'), Dict):
            total_value = hits.get('total').get('value')
        else:
            total_value = hits.get('total')
        if total_value > 0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return data_list
    # 聚合查询(分组条件名为group_by,返回buckets)
    def get_doc_agg(self, query):
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']['group_by'].get('buckets')
    # 统计查询(统计条件为stats_by,返回最值、平均值等)
    def get_doc_stats(self,query):
        res = self.es.search(index=self.index,body=query)
        return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClient
cli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():
    res = cli.create_index(index_name="test")
    print(res)
def test_delete_index():
    res = cli.delete_index(index_name="test")
    print(res)
def test_get_index():
    res = cli.get_index(index_name="test")
    print(res)
def test_set_index():
    cli.set_index(index="test")
def test_create_doc():
    body = {
        "name": "lady_killer9",
        "age": 19
    }
    res = cli.create_doc(body=body)
    print(res)
def test_create_doc_bulk():
    from copy import deepcopy
    body = {
        "name": "lady_killer9"
    }
    users = []
    for i in range(100001):
        tmp = deepcopy(body)
        tmp["age"] = i
        users.append(tmp)
    res = cli.create_doc_bulk(docs=users)
    print(res)
def test_get_doc_all():
    res = cli.get_doc_all()
    print(res)
def test_get_doc_by_id():
    res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")
    print(res)
def test_get_doc():
    query = {
        "query": {
            "match_all": {
            }
        }
    }
    res = cli.get_doc(query=query,size=20)
    print(res)
def test_update_doc():
    body={
        "name": "lady_killer_after_update"
    }
    res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)
    print(res)
def test_delete_doc():
    res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")
    print(res)
def test_get_doc_agg():
    query = {
            "aggs": {
                "group_by": {
                    "terms": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_agg(query=query)
    print(res)
def test_get_doc_stats():
    query = {
            "aggs": {
                "stats_by": {
                    "stats": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_stats(query=query)
    print(res)
def test_get_doc_scroll():
    query = {
        "query": {
            "match_all": {}
        }
    }
    scroll_id,data_list = cli.get_doc_scroll(query=query)
    res = []
    while data_list:
        res.extend(data_list)
        scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    print(len(res))
if __name__ == '__main__':
    # test_delete_index()
    test_create_index()
    test_get_index()
    # test_set_index()
    # test_create_doc()
    # test_create_doc_bulk()
    # test_get_doc_all()
    # test_update_doc()
    # test_get_doc_by_id()
    # test_get_doc()
    # test_delete_doc()
    # test_get_doc_agg()
    # test_get_doc_stats()
    # test_get_doc_scroll()
    cli.close()

测试截图

更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。

参考

github-elasticsearch

github-elasticsearch-dsl

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
6天前
|
数据采集 数据安全/隐私保护 Python
从零开始:用Python爬取网站的汽车品牌和价格数据
在现代化办公室中,工程师小李和产品经理小张讨论如何获取懂车帝网站的汽车品牌和价格数据。小李提出使用Python编写爬虫,并通过亿牛云爬虫代理避免被封禁。代码实现包括设置代理、请求头、解析网页内容、多线程爬取等步骤,确保高效且稳定地抓取数据。小张表示理解并准备按照指导操作。
从零开始:用Python爬取网站的汽车品牌和价格数据
|
1天前
|
算法 Serverless 数据处理
从集思录可转债数据探秘:Python与C++实现的移动平均算法应用
本文探讨了如何利用移动平均算法分析集思录提供的可转债数据,帮助投资者把握价格趋势。通过Python和C++两种编程语言实现简单移动平均(SMA),展示了数据处理的具体方法。Python代码借助`pandas`库轻松计算5日SMA,而C++代码则通过高效的数据处理展示了SMA的计算过程。集思录平台提供了详尽且及时的可转债数据,助力投资者结合算法与社区讨论,做出更明智的投资决策。掌握这些工具和技术,有助于在复杂多变的金融市场中挖掘更多价值。
22 12
|
1月前
|
数据采集 Web App开发 数据可视化
Python用代理IP获取抖音电商达人主播数据
在当今数字化时代,电商直播成为重要的销售模式,抖音电商汇聚了众多达人主播。了解这些主播的数据对于品牌和商家至关重要。然而,直接从平台获取数据并非易事。本文介绍如何使用Python和代理IP高效抓取抖音电商达人主播的关键数据,包括主播昵称、ID、直播间链接、观看人数、点赞数和商品列表等。通过环境准备、代码实战及数据处理与可视化,最终实现定时任务自动化抓取,为企业决策提供有力支持。
|
2月前
|
数据采集 Web App开发 监控
Python爬虫:爱奇艺榜单数据的实时监控
Python爬虫:爱奇艺榜单数据的实时监控
|
2月前
|
数据采集 存储 XML
python实战——使用代理IP批量获取手机类电商数据
本文介绍了如何使用代理IP批量获取华为荣耀Magic7 Pro手机在电商网站的商品数据,包括名称、价格、销量和用户评价等。通过Python实现自动化采集,并存储到本地文件中。使用青果网络的代理IP服务,可以提高数据采集的安全性和效率,确保数据的多样性和准确性。文中详细描述了准备工作、API鉴权、代理授权及获取接口的过程,并提供了代码示例,帮助读者快速上手。手机数据来源为京东(item.jd.com),代理IP资源来自青果网络(qg.net)。
|
3月前
|
数据采集 存储 数据挖掘
Python数据分析:Pandas库的高效数据处理技巧
【10月更文挑战第27天】在数据分析领域,Python的Pandas库因其强大的数据处理能力而备受青睐。本文介绍了Pandas在数据导入、清洗、转换、聚合、时间序列分析和数据合并等方面的高效技巧,帮助数据分析师快速处理复杂数据集,提高工作效率。
114 0
|
3月前
|
机器学习/深度学习 数据采集 数据挖掘
解锁 Python 数据分析新境界:Pandas 与 NumPy 高级技巧深度剖析
Pandas 和 NumPy 是 Python 中不可或缺的数据处理和分析工具。本文通过实际案例深入剖析了 Pandas 的数据清洗、NumPy 的数组运算、结合两者进行数据分析和特征工程,以及 Pandas 的时间序列处理功能。这些高级技巧能够帮助我们更高效、准确地处理和分析数据,为决策提供支持。
72 2
|
3月前
|
存储 数据挖掘 数据处理
Python数据分析:Pandas库的高效数据处理技巧
【10月更文挑战第26天】Python 是数据分析领域的热门语言,Pandas 库以其高效的数据处理功能成为数据科学家的利器。本文介绍 Pandas 在数据读取、筛选、分组、转换和合并等方面的高效技巧,并通过示例代码展示其实际应用。
83 2
|
3月前
|
数据采集 数据可视化 数据挖掘
Python数据分析:Pandas库实战指南
Python数据分析:Pandas库实战指南
|
3月前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集

热门文章

最新文章

推荐镜像

更多