python 操作ES

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: python 操作ES

python 操作es 的基操


配置部分

    """
    @author xiaofei
    @email 
    @auth
    @desc 
    """
    import pymongo, json
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    import math
    # mapping 定义你的es字段
    doc_mapping = {
        'properties': {
            "other_id": {"type": "long"},
            "city": {"type": "keyword"},
            "id": {"type": "keyword"},
        }
     }
    # 创建ES连接
    es_conn_test = Elasticsearch(["127.0.0.1:6666"], maxsize=25)
    # es的基础配置,  index_name, alias(别名可以创建多个), doc_type(表名)
    index_name = f'complex_test'
    online_alias = f'online_complex_test'
    type_name = f'complex_city'

es的基操

    # es 创建库
    es_conn_test.indices.create(index=index_name, ignore=400)
    es_conn_test.indices.put_mapping(index=index_name, doc_type=type_name, body=doc_mapping)
    es_conn_test.indices.put_alias(index_name, online_alias)
    # 获取mapping
    res = es_conn_test.indices.get_mapping(index=index_name)
    print(json.dumps(res))
    # es查询
    res = es_conn_test.search(index=index_name, doc_type=type_name, body={})
    print(json.dumps(res))
    # 删除索引
    res = es_conn_test.indices.delete(index=index_name)
    print(res)

mongo 数据批量插入es(注意, 如果需要插入的数据量太多的话要分批插入, 比如1000条一次, 根据性能自己调整)

    # 连接mongo
    mongo_spider = pymongo.MongoClient(host='')
    cli = mongo_spider.test.test_test
    # 统计mongo里面的数量, 计算分页
    nums = cli.count()
    print(nums)
    pages = math.ceil(nums/1000)
    print(pages)
    for page in range(pages):
        ls = page *1000
        # mongo分页查询
        datas = list(cli.find({}).limit(1000).skip(ls))
        data = []
        for x in datas:
            # 判断数据完整
            res = {}
            for k, v in doc_mapping['properties'].items():
                if k == 'id':
                    res['id'] = x['key']
                elif not x.get(k, ''):
                    if v['type'] in ['keyword', 'text']:
                        res[k] = ''
                    elif v['type'] == 'long':
                        res[k] = 0
                    elif v['type'] == 'float':
                        res[k] = 0.0
                else:
                    res[k] = x[k]
            # 下面注释这行是单条插入
            # es_conn_test.create(index=index_name, doc_type=type_name, id=1, body=x)
            action = {"_index": index_name, "_type": type_name, "_source": res, "_id": res['id']}
            print(action)
            data.append(action)
        helpers.bulk(es_conn_test, data)
        print(f'插入{ls}条')
    mongo_spider.close()

es数据批量更新 (注意, 如果需要更新的数据量太多的话要分批更新, 比如1000条一次, 根据性能自己调整)

    for x in data:
        id = x['id']
        time_score = x['time_score']
        price_score = x['price_score']
        doc = {"time_score": time_score, "price_score": price_score}
        datas = {"_index": f"online_complex_test", "_op_type": "update", "_type": f"complex_city", "_id": id, 'doc': doc}
        print(datas)
        result.append(datas)
    helpers.bulk(es_conn, result)
    print('更新成功')

如果想存在即更新, 不存在则更新

# 加一个 doc_as_upsert 参数就好了
datas = {"_index": f"online_complex_test", "_op_type": "update", "_type": f"complex_city", "_id": id, 'doc': doc, "doc_as_upsert": True}


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
5月前
|
存储 监控 数据处理
💻Python高手必备!文件系统操作秘籍,让你的数据存取如臂使指
【7月更文挑战第29天】在数据驱动时代, Python以简洁语法、丰富库生态和强大跨平台能力, 成为数据科学等领域首选。本文探讨Python文件系统操作秘籍, 助力高效数据处理。
56 11
|
5月前
|
索引 Python
Python的列表操作有哪些?
Python的列表操作非常丰富,包括列表的创建、元素的访问、修改、添加、删除、切片、排序等多个方面。
54 12
|
5月前
|
监控 网络协议 网络安全
SMTP操作使用详解并通过python进行smtp邮件发送示例
SMTP操作使用详解并通过python进行smtp邮件发送示例
166 3
|
5月前
|
数据挖掘 数据处理 Python
🔍深入Python系统编程腹地:文件系统操作与I/O管理,打造高效数据处理流水线
【7月更文挑战第29天】深入Python系统编程腹地:文件系统操作与I/O管理,打造高效数据处理流水线
43 3
|
5月前
|
安全 数据安全/隐私保护 Python
|
5月前
|
Serverless 语音技术 开发工具
函数计算操作报错合集之怎么何集成nls tts python sdk
在使用函数计算服务(如阿里云函数计算)时,用户可能会遇到多种错误场景。以下是一些常见的操作报错及其可能的原因和解决方法,包括但不限于:1. 函数部署失败、2. 函数执行超时、3. 资源不足错误、4. 权限与访问错误、5. 依赖问题、6. 网络配置错误、7. 触发器配置错误、8. 日志与监控问题。
|
5月前
|
API Python
Python高手修炼手册:精通文件系统操作,掌控I/O管理,提升编程效率
【7月更文挑战第30天】在 Python 编程中, 文件系统操作与 I/O 管理是连接程序与数据的关键。初学者常因路径错误和权限问题受挫, 而高手能自如管理文件。传统 `os` 和 `os.path` 模块易出错, `pathlib` 提供了更直观的对象导向 API。I/O 方面, 同步操作会阻塞程序, 异步 (如使用 `aiofiles`) 则能大幅提升并发能力。真正的高手不仅掌握 API, 更能预见性能瓶颈并优化代码, 实现高效与优雅。
48 1
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之重新上传后只有SQL无法运行,而Python可以正常运行,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
Python