FastAPI 结合 SQLAlchemy 操作 MySQL 数据库

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: FastAPI 结合 SQLAlchemy 操作 MySQL 数据库

文章目录


1. 安装 SQLAlchemy

2. 创建数据库

3. SQLAlchemy 连接 MySQL

4. 创建数据模型

5. 创建 Pydantic 模型

6. crud 工具

7. main函数

learning from 《python web开发从入门到精通》


1. 安装 SQLAlchemy


pip install sqlalchemy


2. 创建数据库


mysql -u root -p 命令行登录 MySQL

创建数据库 fastapiimage.png_db

mysql> create database fastapi_db default charset utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.04 sec)

3. SQLAlchemy 连接 MySQL


  • database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 数据库连接配置
SQLALCHEMY_DATABASE_URI = (
    "mysql+pymysql://root:123456@localhost/fastapi_db?charset=utf8mb4"
    #                用户:密码@服务器/数据库?参数
)
# 创建数据库引擎
engine = create_engine(SQLALCHEMY_DATABASE_URI)
# 创建数据库会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 声明基类
Base = declarative_base()


4. 创建数据模型


  • models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
# 定义 User 类
class User(Base):
    __tablename__ = 'users'  # 定义表名
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    is_active = Column(Boolean, default=True)
    items = relationship("Item", back_populates="owner")
    # 关联 Item 表
# 定义 Item 类
class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), index=True)
    description = Column(String(255), index=True)
    owner_id = Column(Integer, ForeignKey('users.id'))
    owner = relationship("User", back_populates="items")
    # 关联 User 表

relationship 还不懂,有待学习 SQLAlchemy


5. 创建 Pydantic 模型


  • schemas.py
from typing import List
from pydantic import BaseModel
class ItemBase(BaseModel):
    title: str
    description: str = None
class ItemCreate(ItemBase):
    pass
class Item(ItemBase):
    id: int
    owner_id: int
    class Config:
        orm_mode = True
class UserBase(BaseModel):
    email: str
class UserCreate(UserBase):
    password: str
class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []
    class Config:
        orm_mode = True


6. crud 工具


  • crud.py
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
    """
    根据id获取用户信息
    :param db: 数据库会话
    :param user_id: 用户id
    :return: 用户信息
    """
    return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
    """
    根据email获取用户信息
    :param db: 数据库会话
    :param email: 用户email
    :return: 用户信息
    """
    return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """
    获取特定数量的用户
    :param db: 数据库会话
    :param skip: 开始位置
    :param limit: 限制数量
    :return: 用户信息列表
    """
    return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
    """
    创建用户
    :param db: 数据库会话
    :param user: 用户模型
    :return: 根据email和password登录的用户信息
    """
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)      # 添加到会话
    db.commit()          # 提交到数据库
    db.refresh(db_user)  # 刷新数据库
    return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
    """
    获取指定数量的item
    :param db: 数据库会话
    :param skip: 开始位置
    :param limit: 限制数量
    :return: item列表
    """
    return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    """
    创建用户item
    :param db: 数据库会话
    :param item: Item对象
    :param user_id: 用户id
    :return: Item模型对象
    """
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


7. main函数

from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# 依赖
def get_db():
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # 根据email查找用户
    db_user = crud.get_user_by_email(db, email=user.email)
    # 如果用户存在,提示该邮箱已经被注册
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    # 返回创建的user对象
    return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # 读取指定数量用户
    users = crud.get_users(db, skip=skip, limit=limit)
    return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    # 获取当前id的用户信息
    db_user = crud.get_user(db, user_id=user_id)
    # 如果没有信息,提示用户不存在
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    # 创建该用户的items
    return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # 获取所有items
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

image.png

(pt19) D:\web_python_dev>uvicorn fastapi_mysql.main:app --reload
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [6988] using watchgod
INFO:     Started server process [2112]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
mysql> use fastapi_db
Database changed
mysql> show tables;
+----------------------+
| Tables_in_fastapi_db |
+----------------------+
| items                |
| users                |
+----------------------+
2 rows in set (0.00 sec)

image.png

mysql> select * from users;
+----+----------------+---------------------+-----------+
| id | email          | hashed_password     | is_active |
+----+----------------+---------------------+-----------+
|  1 | michael@xx.com | abcdnotreallyhashed |         1 |
+----+----------------+---------------------+-----------+
1 row in set (0.00 sec)
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2天前
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
|
8天前
|
关系型数据库 MySQL 数据库连接
docker拉取MySQL后数据库连接失败解决方案
通过以上方法,可以解决Docker中拉取MySQL镜像后数据库连接失败的常见问题。关键步骤包括确保容器正确启动、配置正确的环境变量、合理设置网络和权限,以及检查主机防火墙设置等。通过逐步排查,可以快速定位并解决连接问题,确保MySQL服务的正常使用。
117 82
|
2月前
|
关系型数据库 MySQL 数据库连接
数据库连接工具连接mysql提示:“Host ‘172.23.0.1‘ is not allowed to connect to this MySQL server“
docker-compose部署mysql8服务后,连接时提示不允许连接问题解决
|
10天前
|
消息中间件 缓存 NoSQL
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
|
1月前
|
关系型数据库 MySQL 数据库
Docker Compose V2 安装常用数据库MySQL+Mongo
以上内容涵盖了使用 Docker Compose 安装和管理 MySQL 和 MongoDB 的详细步骤,希望对您有所帮助。
207 42
|
16天前
|
SQL 关系型数据库 MySQL
MySQL生产环境迁移至YashanDB数据库深度体验
这篇文章是作者将 MySQL 生产环境迁移至 YashanDB 数据库的深度体验。介绍了 YashanDB 迁移平台 YMP 的产品相关信息、安装步骤、迁移中遇到的各种兼容问题及解决方案,最后总结了迁移体验,包括工具部署和操作特点,也指出功能有优化空间及暂不支持的部分,期待其不断优化。
|
1月前
|
关系型数据库 MySQL 网络安全
如何排查和解决PHP连接数据库MYSQL失败写锁的问题
通过本文的介绍,您可以系统地了解如何排查和解决PHP连接MySQL数据库失败及写锁问题。通过检查配置、确保服务启动、调整防火墙设置和用户权限,以及识别和解决长时间运行的事务和死锁问题,可以有效地保障应用的稳定运行。
158 25
|
27天前
|
监控 关系型数据库 MySQL
云数据库:从零到一,构建高可用MySQL集群
在互联网时代,数据成为企业核心资产,传统单机数据库难以满足高并发、高可用需求。云数据库通过弹性扩展、分布式架构等优势解决了这些问题,但也面临数据安全和性能优化挑战。本文介绍了如何从零开始构建高可用MySQL集群,涵盖选择云服务提供商、创建实例、配置高可用架构、数据备份恢复及性能优化等内容,并通过电商平台案例展示了具体应用。
|
5天前
|
存储 SQL 关系型数据库
从 MySQL 到时序数据库 TDengine:Zendure 如何实现高效储能数据管理?
TDengine 助力广州疆海科技有限公司高效完成储能业务的数据分析任务,轻松应对海量功率、电能及输入输出数据的实时统计与分析,并以接近 1 : 20 的数据文件压缩率大幅降低存储成本。此外,taosX 强大的 transform 功能帮助用户完成原始数据的清洗和结构优化,而其零代码迁移能力更实现了历史数据从 TDengine OSS 与 MySQL 到 TDengine 企业版的平滑迁移,全面提升了企业的数据管理效率。本文将详细解读这一实践案例。
22 0
|
1月前
|
SQL 关系型数据库 MySQL
数据库数据恢复——MySQL简介和数据恢复案例
MySQL数据库数据恢复环境&故障: 本地服务器,安装的windows server操作系统。 操作系统上部署MySQL单实例,引擎类型为innodb,表空间类型为独立表空间。该MySQL数据库没有备份,未开启binlog。 人为误操作,在用Delete命令删除数据时未添加where子句进行筛选导致全表数据被删除,删除后未对该表进行任何操作。

热门文章

最新文章