3.5.19.Elasticsearch语言开发(Python/Nodejs/Java)
3.5.19.1.Elasticsearch语言开发(Python)
创作人:张刘毅
审稿人:周海清
简介
本节介绍如何使用 Python 操作 Elasticsearch(简称 ES ),将从以下几个方面进行阐述:
l 客户端选择
l 配置及初始化
l 常见 API 使用
l 优化建议
一、客户端选择
Elasticsearch 官方提供了 low-level Python 客户端 elasticsearch-py,目的是为了给用
Python 代码操作 ES 提供统一、可扩展的编程接口。low-level Python 客户端的功能包括:
l 将 Python 的基本数据类型转换为 JSON
l 集群节点自动发现
l 负载平衡及线程安全
l 使用 helpers 的 bulk API 进行批量导入和 reindex 的功能
同时,官方也提供了建立在 elasticsearch-py 之上的高级库 elasticsearch-dsl,使用它可以更方便进行 DSL 查询。另外它还提供了一种类似 ORM 的方式将文档作为 Python 对象处理。
二 、 配置和初始化
1. 环境准备
Python 的客户端可以通过 pip 进行安装,我们选择国内的清华源加快下载速度:
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple elasticsearch
如果有异步编程需求的可以下载:
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple elasticsearch[async]
2. 命令行测试
Python 由于其丰富的库函数和解释型语言特性使它成为多数平台上脚本测试的利器,我们在编写 Python 项目工程前可以在命令行里测试下和 ES 的交互是否正常。
>>> from datetime import datetime >>> from elasticsearch import Elasticsearch >>> es = Elasticsearch("localhost:9200") >>> es.index(index="my-index-000001", doc_type="test-type", id=42, body={"any": "data", "timestamp": datetime.now()}) {'_index': 'my-index-000001', '_type': 'test-type', '_id': '42', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
3. 初始化
客户端一般在全局作用域初始化,这样不仅提高性能也方便启用后台嗅探节点等功能。模版如下:
#导入es的python客户端 from elasticsearch import Elasticsearch # 设置集群地址 client = Elasticsearch( ... # 客户端的配置信息 ) def main(request): ... # 使用客户端
l 指定连接
client = Elasticsearch( ['esnode1:port', 'esnode2:port'] # 认证信息 # http_auth=('elastic', 'changeme') )
l 使用嗅探功能进行动态连接
client = Elasticsearch( ['esnode1:port', 'esnode2:port'], # 在做任何操作之前,先进行嗅探 sniff_on_start=True, # 节点没有响应时,进行刷新,重新连接 sniff_on_connection_fail=True, # 每 60 秒刷新一次 sniffer_timeout=60 ) # 可以获取当前连接的节点来测试 client.cluster.client.info()
4. 使用连接池
数据库是一项宝贵的资源,使用连接池技术可以减少连接开销从而提高程序性能。连接池是保存数据库连接实例的容器,ES 通过 ConnectionSelector 管理连接选择和无效连接。每次请求通过 get_connection 方法获取连接。如果连接失败将其标记 mark_dead 并设置超时,超时结束后将连接重新放回到连接池。
下面的代码示例中,使用 ES 类简单封装了 Elasticsearch 连接池的并发连接并示范了一个简单查询:查询含有 "FANCD2" 的 gene symbol。
# -*- coding=utf8 -*- import os import json from datetime import datetime from elasticsearch import Elasticsearch, RequestsHttpConnection from elasticsearch import Transport from elasticsearch.exceptions import NotFoundError class ES(object): # 索引的相关设置 _index = "hgvs4variation" _type = "_doc" # ES类初始化设置,使用基于 requests 实例化 ES 连接池 def __init__(self, hosts): self.conn_pool = Transport(hosts=hosts, connection_class=RequestsHttpConnection).connection_pool # 获取ES连接 def get_conn(self): """ 从连接池获取一个连接 """ conn = self.conn_pool.get_connection() return conn # 向es集群发送一个请求 def request(self, method, url, headers=None, params=None, body=None): conn = self.get_conn() try: status, headers, body = conn.perform_request(method, url, headers=headers, params=params, body=body) except NotFoundError as e: return None if method == "HEAD": return status return json.loads(body) def post(self, url, body=None, method="POST"): """使用post请求访问服务器""" data = self.request(method, url, body=body) return data if __name__ == '__main__': ELASTICSEARCH = [{'host':'192.168.2.2','port':27000}] es=ES(ELASTICSEARCH) #获得连接 res_conn = es.get_conn() print(res) #查询示例 res_query = es.request(method='POST',body='{"query": { "match": { "#Symbol": "FANCD2"}}}',url='/_search') print(res_query)
《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.19.Elasticsearch语言开发(Python/Nodejs/Java)—— 3.5.19.1.Elasticsearch语言开发(Python)(下) https://developer.aliyun.com/article/1226662