Python:peewee常用操作CRUD

简介: Python:peewee常用操作CRUD

Defining models is similar to Django orSQLAlchemy

译文:定义模型类似于Django或SQLAlchemy

目录

文档

示例代码仓库

https://github.com/mouday/peewee-demo

安装

pip install peewee

测试环境

$ python --version
Python 3.7.0
$ pip show peewee
Name: peewee
Version: 3.15.3

1、数据库 Database

1.1、设置参数

# -*- coding: utf-8 -*-
"""
@File    : database.py
"""
from peewee import SqliteDatabase
import logging
# 设置数据库
db = SqliteDatabase("demo.db")
# 打印日志
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
logger.propagate = False  # 不向上传播

1.2、连接数据库

from app.database import db
# 链接数据库
db.connect()
# 断开数据库
if not db.is_closed():
    db.close()

1.3、执行原生sql

获取多条记录

cursor = db.execute_sql("select * from tb_user where id = ?", (1,))
rows = cursor.fetchall()
print(rows)
[
    (1, 'Jack', 23, '2022-10-19 18:09:07.038935', '2022-10-19 18:09:07.038940')
]

获取单条记录

cursor = db.execute_sql("select * from tb_user where id = ?", (1,))
# 将返回结果转换为dict
# https://docs.python.org/zh-cn/3.6/library/sqlite3.html#sqlite3.Connection.row_factory 
def dict_factory(cursor, row):
    """将返回结果转换为dict"""
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d
cursor.row_factory = dict_factory
row = cursor.fetchone()
print(row)
{
    'id': 1, 
    'name': 'Jack', 
    'age': 23, 
    'created_time': '2022-10-19 18:09:07.038935', 
    'update_time': '2022-10-19 18:09:07.038940'
}

2、模型 Model

2.1、定义模型

定义基类模型

# -*- coding: utf-8 -*-
"""
@File    : base_model.py
"""
from peewee import Model
from app.database import db
class BaseModel(Model):
    """
    # 基类,设置数据库链接
    """
    class Meta:
        database = db

定义模型

# -*- coding: utf-8 -*-
"""
@File    : user_model.py
"""
from datetime import datetime
from peewee import CharField, DateTimeField, IntegerField, AutoField
from app.model.base_model import BaseModel
class UserModel(BaseModel):
    """
    用户表
    """
    id = AutoField()
    name = CharField(null=False)
    age = IntegerField(null=False)
    created_time = DateTimeField(default=datetime.now)
    update_time = DateTimeField(default=datetime.now)
    class Meta:
        # 指定表名
        table_name = 'tb_user'

2.2、表操作

建表

UserModel.create_table()
(
    'CREATE TABLE IF NOT EXISTS "tb_user" (
    "id" INTEGER NOT NULL PRIMARY KEY, 
    "name" VARCHAR(255) NOT NULL, 
    "age" INTEGER NOT NULL, 
    "created_time" DATETIME NOT NULL, 
    "update_time" DATETIME NOT NULL)', 
    []
)

查看表是否存在

UserModel.table_exists()
(
    'SELECT name FROM "main".sqlite_master WHERE type=? ORDER BY name',
    ('table',)
 )

删除表

UserModel.drop_table()
(
    'DROP TABLE IF EXISTS "tb_user"', 
    []
)

3、模型的CURD操作

3.1、写入操作

插入数据

ret = UserModel.insert({
    UserModel.age: 20,
    UserModel.name: 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
[
    'Tom', 
    20, 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]

插入字典数据

ret = UserModel.insert({
    'age': 20,
    'name': 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
[
    'Tom', 
    20, 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]

保存实例

user = UserModel(
    age=21,
    name='Tom'
)
user.save()
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?)', 
['Charlie', 12, 
datetime.datetime(2022, 10, 19, 17, 34, 43, 376650), 
datetime.datetime(2022, 10, 19, 17, 34, 43, 376652)])

插入并创建实例

user = UserModel.create(
    age=22,
    name='Tom'
)
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?)', 
    ['Charlie', 12, 
    datetime.datetime(2022, 10, 19, 17, 36, 16, 408224), 
    datetime.datetime(2022, 10, 19, 17, 36, 16, 408226)])

插入多条数据

UserModel.insert_many([
    {
        'age': 23,
        'name': 'Tom'
    },
    {
        'age': 24,
        'name': 'Tom'
    }
]).execute()
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?), (?, ?, ?, ?)', 
    [
    'Tom', 23, 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106336), 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106344), 
    'Tom', 24, 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106355), 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106360)])

分块插入,忽略重复数据

from peewee import chunked
# Insert rows 100 at a time.
with db.atomic():
    for batch in chunked(data_source, 100):
        MyModel.insert_many(batch).on_conflict_ignore().execute()

3.2、更新数据

更新多条数据

UserModel.update(
    name='Jack'
).where(
    UserModel.id == 1
).execute()
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])

更新单条数据

UserModel.set_by_id(1, {'name': 'Jack'})
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])

3.3、删除数据

按照主键删除

UserModel.delete_by_id(1)
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])

按条件删除

UserModel.delete().where(
    UserModel.id == 1
).execute()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
• 1

删除实例

user = UserModel.get_by_id(1)
user.delete_instance()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])

清空表数据

UserModel.truncate_table()
('DELETE FROM "tb_user"', [])

3.4、取单条数据

条件查询一条

row = UserModel.select().where(
    UserModel.name == 'Tom'
).get()
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ? OFFSET ?', 
    ['Tom', 1, 0])

获取第一条

row = UserModel.select().where(
    UserModel.name == 'Tom'
).first()
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ?', 
    ['Tom', 1])

通过获取,不存在报错

row = UserModel.get(UserModel.name == 'Tom')
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ? OFFSET ?', 
    ['Tom', 1, 0])

通过获取或者返回None

user = UserModel.get_or_none(UserModel.name == 'Jack')
print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ? OFFSET ?', 
    ['Jack', 1, 0])

通过主键获取,不存在报错

user = UserModel.get_by_id(1)
print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."id" = ?) 
    LIMIT ? 
    OFFSET ?', 
    [1, 1, 0])

获取或创建

UserModel.get_or_create(name='Tom', age=23)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE (("t1"."name" = ?) AND ("t1"."age" = ?)) 
    LIMIT ? OFFSET ?', 
    ['Tom', 23, 1, 0])
('BEGIN', None)
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?)', 
    ['Tom', 23, 
    datetime.datetime(2022, 10, 19, 18, 9, 7, 38935), 
    datetime.datetime(2022, 10, 19, 18, 9, 7, 38940)])

3.5、取多条数据

查询多条记录

# 注意,获取的是 iterator
# 可以转为 namedtuples(), tuples(), dicts()
query = UserModel.select().where(
    UserModel.name == 'Tom'
)
print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?)', 
    ['Tom'])

4、排序分组统计

1、排序

query = UserModel.select().where(
    UserModel.name == 'Tom'
).order_by(UserModel.age.desc())
print(list(query))
# [<UserModel: 1>]
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    ORDER BY "t1"."age" DESC', 
    ['Tom'])

4.2、分页

query = UserModel.select().paginate(2, 10)
print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    LIMIT ? OFFSET ?', 
    [10, 10])

4.3、统计

query = UserModel.select().count()
print(list(query))
('SELECT COUNT(1) FROM (SELECT 1 FROM "tb_user" AS "t1") AS "_wrapped"', [])

4.4、分组

query = UserModel.select().group_by(UserModel.name)
print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    GROUP BY "t1"."name"', 
    [])

4.5、分组统计

query = UserModel.select(
          UserModel.group_id,
          fn.COUNT().alias('count')
      ).group_by(UserModel.group_id)
print(list(query))
('SELECT "t1"."group_id", COUNT() AS "count" 
  FROM "tb_user" AS "t1" 
  GROUP BY "t1"."group_id"', 
[])


相关文章
|
2月前
|
NoSQL MongoDB 数据库
python3操作MongoDB的crud以及聚合案例,代码可直接运行(python经典编程案例)
这篇文章提供了使用Python操作MongoDB数据库进行CRUD(创建、读取、更新、删除)操作的详细代码示例,以及如何执行聚合查询的案例。
32 6
|
2月前
|
Python
在Python中,文本查找和替换的常用操作
在Python中,文本查找和替换的常用操作,使用字符串方法进行查找和替换,使用正则表达式进行查找和替换,对所查找到的内容进行计数。
30 1
|
2月前
|
Python
Python之peewee|4-22
Python之peewee|4-22
|
3月前
|
存储 Python 容器
python字典的常用操作方法
python字典的常用操作方法
|
3月前
|
索引 Python
python列表的常用操作方法
python列表的常用操作方法
|
3月前
|
Python
python字符串常用操作方法
python字符串常用操作方法
|
3月前
|
Java 缓存 数据库连接
揭秘!Struts 2性能翻倍的秘诀:不可思议的优化技巧大公开
【8月更文挑战第31天】《Struts 2性能优化技巧》介绍了提升Struts 2 Web应用响应速度的关键策略,包括减少配置开销、优化Action处理、合理使用拦截器、精简标签库使用、改进数据访问方式、利用缓存机制以及浏览器与网络层面的优化。通过实施这些技巧,如懒加载配置、异步请求处理、高效数据库连接管理和启用GZIP压缩等,可显著提高应用性能,为用户提供更快的体验。性能优化需根据实际场景持续调整。
72 0
|
3月前
|
JSON API 数据库
探索FastAPI:不仅仅是一个Python Web框架,更是助力开发者高效构建现代化RESTful API服务的神器——从环境搭建到CRUD应用实战全面解析
【8月更文挑战第31天】FastAPI 是一个基于 Python 3.6+ 类型提示标准的现代 Web 框架,以其高性能、易用性和现代化设计而备受青睐。本文通过示例介绍了 FastAPI 的优势及其在构建高效 Web 应用中的强大功能。首先,通过安装 FastAPI 和 Uvicorn 并创建简单的“Hello, World!”应用入门;接着展示了如何处理路径参数和查询参数,并利用类型提示进行数据验证和转换。
94 0
|
6月前
|
存储 数据处理 Python
Python字典的常用操作详解
Python字典的常用操作详解
47 1
|
6月前
|
数据采集 Python
10个Python set 常用操作函数!,bilibili面试题
10个Python set 常用操作函数!,bilibili面试题
10个Python set 常用操作函数!,bilibili面试题