带你读《Elastic Stack 实战手册》之65:——3.5.19.1.Elasticsearch语言开发(Python)(下)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 带你读《Elastic Stack 实战手册》之65:——3.5.19.1.Elasticsearch语言开发(Python)(下)

《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.19.Elasticsearch语言开发(Python/Nodejs/Java)—— 3.5.19.1.Elasticsearch语言开发(Python)(上) https://developer.aliyun.com/article/1226664




、 常见的 API 使用

 

1. Index API

 

对 ES 单条写入只需要指定三部分信息:索引名,id, body。示例如下:

from datetime import datetime
from elasticsearch import Elasticsearch
#连接ES集群
es = Elasticsearch("192.168.2.2:27000")
#以JSON字符串的形式构建文档内容
doc = {
    'author': 'author_name',
    'text': 'Interesting content...',
    'timestamp': datetime.now(),
}
res = es.index(index="test-index", id=1, body=doc)
print(res['result'])

2. Get API

 

要获取一条文档需要指定其索引和 id

res = es.get(index="test-index", id=1)
#获取_source内的文档内容
print(res['_source'])

 3. Refresh API

 

ES接收数据请求时会先存入内存中,默认每隔一段时间会从内存 buffer 中将数据写入filesystem cache。如需手动 refresh ,可使用如下 API。


es.indices.refresh(index="test-index")

4. Search API


# 查询所有的数据
body = {
    "query":{
        "match_all":{}
    }
}
res = es.search(index="test-index",body=body)
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

5. Update API


doc = {
    'author': 'author_name',
    'text': 'Interesting modified content...',
    'timestamp': datetime.now(),
}
res = es.update(index="test-index", id=1, body=doc)
print(res['result'])

6. Delete API

es.delete(index="test-index", id=1)

7. Bulk API


elasticsearch 的 helpers 模块是对 bulk 的封装。Bulk API 接受索引、创建、删除和更新操作。使用 _op_type 字段指定操作(_op_type 默认为 index)

 

批量写入示例如下:


import random
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es=Elasticsearch(hosts='192.168.2.2',port=27000)
meaningless_random_repeat=['python','elasticsearch','alibaba','bulk_helpers']
actions=[]
for i in range(10000):
    meaningless_random_repeat=meaningless_random_repeat[random.randrange(0,len(meaningless_random_repeat))]
    action={
            #操作 index update create delete  
            '_op_type':'index',
            #索引名
            '_index':'meaning_less_random_repeat',
            #索引type
            '_type':'_doc',  
            '_source':{'meaningless_random_repeat':meaningless_random_repeat}}
    actions.append(action)
helpers.bulk(client=es,actions=actions)
#这里使用streaming_bulk,如果设置线程数的需求可以使用parallel_bulk  
for ok,response in helpers.streaming_bulk(es,actions):
        if not ok:
            print(response)

8. Scan API

当我们需要使用大查询时可以使用 Scan API。Scan API 是对 scroll API 的简单封装。

from elasticsearch import Elasticsearch
from elasticsearch import helpers
es=Elasticsearch(hosts='192.168.2.2',port=27000)
query='{ "match": { "#Symbol": "FANCD2"}}}'
index = 'hgvs4variation'
source={'hgvs4variation':'hgvs4variation'}
datas = helpers.scan(es, index=index, query=body, size=1000, request_timeout=100, _source=source)
for data in datas:
    print(data)

、优化建议

 

1. 写入优化

 

针对日志类场景,对于写入速度要求很高而对数据的情况下:

 

l 不要单条导入,尽量使用 Bulk API。这样是为了提高写入效率。

l 可以先设置索引副本为0,写完恢复副本数。这样是为了降低索引副本分配产生的性能开销。

l 增加 Index Refresh 间隔,这样是为了减少 Segment Merge 的次数。

 

2. 查询优化

 

针对大查询的场景:

 

l 运维端可以根据当前的集群情况适当调整 max_result_window 参数。

l 开发端可以使用 ES 的 scroll API 或者 helpers 的 scan API,尽量控制每次拉取的条数以及避免深翻。

l 选择合适的路由,查询时可以根据 Routing 信息,直接定位到目标分片,避免查询所有的分片。


五、完整的代码示例

 

https://gitee.com/ld-sys/elasticsearch-python

 

参考

 

l https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html

 

创作人简介:

张刘毅,Elastic 认证工程师,存储研发工程师,曾经做过 AI 大数据平台研发,负责过80+ES集群。目前专注于生物医药和大数据。

博客:https://blog.csdn.net/dtzly

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
数据可视化 API Python
画图实战-Python实现某产品全年销量数据多种样式可视化
画图实战-Python实现某产品全年销量数据多种样式可视化
39 0
|
8月前
|
存储 Python
【python基础知识】20.“午饭吃什么”的python实现(产品思维-实操篇)
【python基础知识】20.“午饭吃什么”的python实现(产品思维-实操篇)
75 0
【python基础知识】20.“午饭吃什么”的python实现(产品思维-实操篇)
|
3天前
|
SQL DataWorks 安全
DataWorks产品使用合集之DataWorks资源里python运行时候,查看中途打印日志如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
19 0
|
4天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之我需要在MaxCompute客户端添加Python第三方包,我该怎么操作
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
运维 监控 Serverless
Serverless 应用引擎产品使用之阿里函数计算中在自定义环境下用debian10运行django,用官方层的python3.9,配置好环境变量后发现自定义层的django找不到了如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
14 3
|
17天前
|
存储 监控 开发工具
对象存储OSS产品常见问题之python sdk中的append_object方法支持追加上传xls文件如何解决
对象存储OSS是基于互联网的数据存储服务模式,让用户可以安全、可靠地存储大量非结构化数据,如图片、音频、视频、文档等任意类型文件,并通过简单的基于HTTP/HTTPS协议的RESTful API接口进行访问和管理。本帖梳理了用户在实际使用中可能遇到的各种常见问题,涵盖了基础操作、性能优化、安全设置、费用管理、数据备份与恢复、跨区域同步、API接口调用等多个方面。
46 9
|
5月前
|
传感器 算法 定位技术
Python中gdal实现MODIS卫星遥感影像产品栅格数据读取处理与质量控制QC波段筛选掩膜
Python中gdal实现MODIS卫星遥感影像产品栅格数据读取处理与质量控制QC波段筛选掩膜
|
8月前
|
Java API 数据安全/隐私保护
Elasticsearch Java API Client 开发
本场景主要介绍如何使用 Elasticsearch Java API Client 进行开发,实现常用的 CRUD 操作。
146 0
|
8月前
|
存储 程序员 Python
【python基础知识】19.产品思维以及流程图的使用 - 思维篇
【python基础知识】19.产品思维以及流程图的使用 - 思维篇
61 0
【python基础知识】19.产品思维以及流程图的使用 - 思维篇
|
11月前
|
资源调度 算法 计算机视觉
python opencv 图像处理进阶篇(二)
python opencv 图像处理进阶篇(二)

相关产品

  • 检索分析服务 Elasticsearch版