python3操作MongoDB的crud以及聚合案例,代码可直接运行(python经典编程案例)

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: 这篇文章提供了使用Python操作MongoDB数据库进行CRUD(创建、读取、更新、删除)操作的详细代码示例,以及如何执行聚合查询的案例。

参考:
官方文档:https://pymongo.readthedocs.io/en/stable/
github:https://github.com/mongodb/mongo-python-driver

一. 插入数据案例

# -*- encoding: utf-8 -*-
import time
import pymongo
import datetime

# 创建对象
client = pymongo.MongoClient('mongodb://账号:密码@主机:端口号/?authSource=admin')
# 连接DB数据库
db = client['数据库名']


def insert_one():
    # 连接集合user,集合类似于关系数据库的数据表; 如果集合不存在,就会新建集合user
    user_collection = db.user_demo
    # 设置文档格式(文档即我们常说的数据)
    user_info = {
   
        "_id": 105,
        "author": "小绿",
        "text": "Python开发",
        "tags": ["mongodb", "pymongo"],
        "date": datetime.datetime.now()}

    # 使用insert_one单条添加文档,inserted_id获取写入后的id
    # 添加文档时,如果文档尚未包含"_id"键,就会自动添加"_id"。"_id"的值在集合中必须是唯一的
    # inserted_id用于获取添加后的id,若不需要,则可以去掉
    user_id = user_collection.insert_one(user_info).inserted_id
    print("user id is ", user_id)


def insert_many():
    #批量添加
    user_infos = [{
   
        "_id": 101,
        "author": "小黄",
             "text": "Python开发",
             "tags": ["mongodb", "python", "pymongo"],
             "date": datetime.datetime.utcnow()},
     {
   
        "_id": 102,
        "author": "小黄_A",
             "text": "Python开发_A",
             "tags": {
   "db":"Mongodb","lan":"Python","modle":"Pymongo"},
             "date": datetime.datetime.utcnow()},
     ]

    user_collection = db.user_insert_many
    # inserted_ids用于获取添加后的id,若不需要,则可以直接去掉
    user_id = user_collection.insert_many(user_infos).inserted_ids
    print("user id is ", user_id)


def bulk_insert_data():
    from pymongo import UpdateOne
    data_list = [{
   'user_id': 5, 'name': '张三1', 'age': 27, 'email': 'zhangsan1@email.com'},
                 {
   'user_id': 6, 'name': '李四1', 'age': 26, 'email': 'lisi1@email.com'},
                 {
   'user_id': 7, 'name': '王五1', 'age': 29, 'email': 'wangwu1@email.com'},
                 {
   'user_id': 8, 'name': '赵六1', 'age': 26, 'email': 'zhaoliu1@email.com'}]
    bulk_data_list = []
    for data in data_list:
        one = UpdateOne({
   "_id": data['user_id']}, {
   
            "$set": {
   "name": data['name'],
                     "age": data['age'],
                     "email": data['email'],
                     "date": datetime.datetime.now()}}, upsert=True)
        bulk_data_list.append(one)

    try:
        collection_item = db.bulk_insert_demo
        collection_item.bulk_write(bulk_data_list)
    except Exception as e:
        print(f'e: {e}')
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')}, 已存mongo: {len(bulk_data_list)}条")


if __name__ == '__main__':
    # 插入单条数据
    insert_one()

    # 插入多条数据
    # insert_many()

    # 批量插入
    # bulk_insert_data()

二. 查询数据案例

# -*- encoding: utf-8 -*-
import re
import pymongo
# 创建对象
# client = pymongo.MongoClient()
client = pymongo.MongoClient('mongodb://账号:密码@主机:端口号/?authSource=admin')
# 连接DB数据库
db = client['数据库名']


def find_by_condition():
    # 连接集合user,集合类似于关系数据库的数据表, 如果集合不存在,就会新建集合user
    user_collection = db.user
    # 1. 查询文档: find({"_id":101}),其中{"_id":101}为查询条件, 若查询条件为空,则默认查询全部
    # find_value = user_collection.find({"_id": 103})
    # print(list(find_value))

    # 2. 如果要实现多条件查询,$and和$or,使用方法如下:
    # AND条件查询
    # find_value = user_collection.find({"$and": [{"_id": 104}, {"author": "小蓝"}]})
    # print(list(find_value))
    # OR条件查询
    # find_value = user_collection.find({"$or": [{"author": "小黄_A"}, {"author": "小黄"}]})
    # print(list(find_value))

    # 3. 根据范围查找: $gt: 大于, $gte: 大于等于, $lt: 小于, $lte: 小于等于, $ne: 不等于,
    # 如查找id>102且id<104(_id=101)的文档
    # find_value = user_collection.find({"_id": {"$gt": 102, "$lt": 104}})
    # print(list(find_value))
    # 查找id在[100,101]的文档
    # find_value = user_collection.find({"_id": {"$in": [100, 101]}})
    # print(list(find_value))
    # find_value = user_collection.find({"and": [{"_id": {"$gt": 102, "$lt": 105}},
    #                                           {"_id": {"$in": [100, 105]}}]})
    # print(list(find_value))

    # 4. 模糊查询实际上是加入正则表达式实现的
    # # 方法一
    # find_value = user_collection.find({"author": {"$regex": ".*小.*"}})
    # print(list(find_value))
    # #方法二
    regex = re.compile(".*小.*")
    find_value = user_collection.find({
   "author": regex})
    print(list(find_value))

    # 5. 查询嵌入/嵌套文档
    # 查询字段"tags":{"db":"Mongodb","lan":"Python","modle":"Pymongo"}
    # 查询嵌套字段,只需要查询嵌套里的某个值即可
    find_value = user_collection.find({
   "tags.db": "Mongodb"})
    print(list(find_value))

    # 6. 查询字段"tags":{"db":
    # {"Mongodb":"NoSql","MySql":"Sql"},"lan":"Python","modle":"Pymongo"}
    # find_value = user_collection.find({"tags.db.Mongodb": "NoSql"})
    # print(list(find_value))


def find_many():
    user_collection = db.user

    # 1. 查询文档数量
    # result_data = user_collection.count_documents({})
    # print(result_data)

    # 2. 限定返回结果
    # result_data_limit = user_collection.find({}).limit(2)
    # for result in result_data_limit:
    #     print(result)

    # 3. 对查询结果进行排序: 字段值1表示正序, -1表示倒序
    # user_collection = db.bulk_insert_demo
    # result_data_sort = user_collection.find({'age': {'$gt': 22}}).sort([('age', -1)])
    # print(list(result_data_sort))

    # 4. 对数据进行去重
    user_collection = db.bulk_insert_demo
    # 对age字段去重
    result_data_distinct = user_collection.distinct('age')
    print(list(result_data_distinct))
    # 对满足特定条件的age字段去重
    # result_data_distinct = user_collection.distinct('age', {'age': {'$gte': 22}})
    # print(list(result_data_distinct))

    # 5.偏移
    # results = collection.find().sort('id', pymongo.ASCENDING).skip(1)
    # for result in results:
    #     print(result)


if __name__ == '__main__':
    # 根据条件查询文档
    # find_by_condition()

    # 查询数据
    find_many()

三. 更新数据案例

# -*- encoding: utf-8 -*-
import pymongo
# 创建对象
client = pymongo.MongoClient('mongodb://账号:密码@主机:端口号/?authSource=admin')
# 连接DB数据库
db = client['数据库名']


def update_one():
    # update_one(筛选条件,更新内容),筛选条件为空,默认更新第一条文档
    # 如果查询有多条数据,就按照排序先后更新第一条数据
    # {"author": "小蓝"}, {"$set": {"author": "小黄", "text": "数据挖掘"}}
    user_collection = db.user
    user_collection.update_one({
   "author": "小蓝"}, {
   "$set": {
   "author": "小黄", "text": "数据挖掘"}})


def replace_one():
    # replace_one(筛选条件,更新内容)用于将整条数据替换
    # 如果文档的部分数据没有更新,就去除这部分数据
    # topic_data.update_one({"_id": ObjectId(mongo_id)}, {"$set": {'tag_field': 0}})
    user_collection = db.user
    user_collection.replace_one({
   "author": "小绿"},
                                {
   "author": "小绿", "text": "Python_django"})


def update_many():
    # update_many(筛选条件,更新内容)用于批量更新文档, 如果查询有多条数据,就会对全部数据进行更新处理
    # topic_data.update_many({"tag_field": {"$exists": False}}, {"$set": {'tag_field': 0}})
    user_collection = db.user
    user_collection.update_many({
   "author": "小黄"},
                                {
   "$set": {
   "text": "Python_web开发"}})


if __name__ == '__main__':
    # 更新单条文档
    # update_one()

    # 替换一条数据
    replace_one()

    # 更新多条数据
    # update_many()

四. 删除数据案例

# -*- encoding: utf-8 -*-
import pymongo
# 创建对象
# client = pymongo.MongoClient()
client = pymongo.MongoClient('mongodb://账号:密码@主机:端口号/?authSource=admin')
# 连接DB数据库
db = client['数据库名']
user_collection = db.user


def delete_one():
    # 删除单条文档
    # delete_one(筛选条件),筛选条件为空,默认删除第一条文档
    user_collection.delete_one({
   "_id": 100})


def delete_many():
    # delete_many(筛选条件)用于删除多条数据
    user_collection.delete_many({
   "author": "小黄"})


if __name__ == '__main__':
    # 删除单条文档
    delete_one()

    # 删除多条数据
    # delete_many()

五. 聚合查询案例

import pymongo

handler = pymongo.MongoClient().monog_db.example_user

rows = handler.aggregate([
    {
   '$lookup': {
   
        'from': 'example_post',
        'localField': 'id',
        'foreignField': 'user_id',
        'as': 'weibo_info'
        }
    },
    {
   '$unwind': '$weibo_info'},
    {
   '$project': {
   
        'name': 1,
        'work': 1,
        'content': '$weibo_info.content',
        'post_time': '$weibo_info.post_time'}}
])
for row in rows:
    print(row)
相关文章
|
3月前
|
数据采集 机器学习/深度学习 人工智能
Python:现代编程的首选语言
Python:现代编程的首选语言
287 102
|
2月前
|
Python
Python编程:运算符详解
本文全面详解Python各类运算符,涵盖算术、比较、逻辑、赋值、位、身份、成员运算符及优先级规则,结合实例代码与运行结果,助你深入掌握Python运算符的使用方法与应用场景。
179 3
|
2月前
|
数据处理 Python
Python编程:类型转换与输入输出
本教程介绍Python中输入输出与类型转换的基础知识,涵盖input()和print()的使用,int()、float()等类型转换方法,并通过综合示例演示数据处理、错误处理及格式化输出,助你掌握核心编程技能。
420 3
|
2月前
|
并行计算 安全 计算机视觉
Python多进程编程:用multiprocessing突破GIL限制
Python中GIL限制多线程性能,尤其在CPU密集型任务中。`multiprocessing`模块通过创建独立进程,绕过GIL,实现真正的并行计算。它支持进程池、队列、管道、共享内存和同步机制,适用于科学计算、图像处理等场景。相比多线程,多进程更适合利用多核优势,虽有较高内存开销,但能显著提升性能。合理使用进程池与通信机制,可最大化效率。
264 3
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
260 0
|
5月前
|
NoSQL MongoDB 数据库
数据库数据恢复—MongoDB数据库数据恢复案例
MongoDB数据库数据恢复环境: 一台操作系统为Windows Server的虚拟机上部署MongoDB数据库。 MongoDB数据库故障: 工作人员在MongoDB服务仍然开启的情况下将MongoDB数据库文件拷贝到其他分区,数据复制完成后将MongoDB数据库原先所在的分区进行了格式化操作。 结果发现拷贝过去的数据无法使用。管理员又将数据拷贝回原始分区,MongoDB服务仍然无法使用,报错“Windows无法启动MongoDB服务(位于 本地计算机 上)错误1067:进程意外终止。”
|
5月前
|
缓存 NoSQL Linux
在CentOS 7系统中彻底移除MongoDB数据库的步骤
以上步骤完成后,MongoDB应该会从您的CentOS 7系统中被彻底移除。在执行上述操作前,请确保已经备份好所有重要数据以防丢失。这些步骤操作需要一些基本的Linux系统管理知识,若您对某一步骤不是非常清楚,请先进行必要的学习或咨询专业人士。在执行系统级操作时,推荐在实施前创建系统快照或备份,以便在出现问题时能够恢复到原先的状态。
426 79
|
5月前
|
存储 NoSQL MongoDB
MongoDB数据库详解-针对大型分布式项目采用的原因以及基础原理和发展-卓伊凡|贝贝|莉莉
MongoDB数据库详解-针对大型分布式项目采用的原因以及基础原理和发展-卓伊凡|贝贝|莉莉
273 8
MongoDB数据库详解-针对大型分布式项目采用的原因以及基础原理和发展-卓伊凡|贝贝|莉莉
|
4月前
|
运维 NoSQL 容灾
告别运维噩梦:手把手教你将自建 MongoDB 平滑迁移至云数据库
程序员为何逃离自建MongoDB?扩容困难、运维复杂、高可用性差成痛点。阿里云MongoDB提供分钟级扩容、自动诊断与高可用保障,助力企业高效运维、降本增效,实现数据库“无感运维”。
|
8月前
|
NoSQL MongoDB 数据库
数据库数据恢复——MongoDB数据库服务无法启动的数据恢复案例
MongoDB数据库数据恢复环境: 一台Windows Server操作系统虚拟机上部署MongoDB数据库。 MongoDB数据库故障: 管理员在未关闭MongoDB服务的情况下拷贝数据库文件。将MongoDB数据库文件拷贝到其他分区后,对MongoDB数据库所在原分区进行了格式化操作。格式化完成后将数据库文件拷回原分区,并重新启动MongoDB服务。发现服务无法启动并报错。