SQLAlchemy + 协程,实现异步的 ORM

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: SQLAlchemy + 协程,实现异步的 ORM


楔子



最近猫哥转载了我在博客园上的一篇文章,是和异步的数据库驱动相关的。对于一个 web 服务而言,性能瓶颈基本都在数据库上面,如果在等待数据库返回数据的时候,能够自动切换并处理其它请求的话,那么并发量会得到显著的提升。但想做到这一点,我们就必须将同步驱动换成异步驱动。

那么异步驱动都有哪些呢?

  • aiosqlite:用于连接 SQLite;
  • asyncmy、aiomysql:用于连接 MySQL;
  • asyncpg、aiopg:用于连接 PostgreSQL;
  • cx_Oracle_async:用于连接 Oracle;
  • aioredis:用于连接 Redis;


现如今 Python 已经进化到 3.11 了,适配不同数据库的异步驱动也已经非常成熟了。但这里我要介绍的不是这些驱动,而是 ORM。不同的驱动使用起来会有一些差异,而 ORM 提供了一个统一的上层接口,屏蔽了不同驱动之间的差异。

Python 里面最有名的ORM莫过于SQLAlchemy,在早期它是一个同步的 ORM,只能适配一些同步驱动。不过从 1.4 版本的时候引入了协程,支持了异步功能,并且在使用上和之前没有太大区别。下面我们来看一下它的用法,并介绍一些最佳实践。


创建一个异步引擎



SQLAlchemy 不具备连接数据库的能力,它连接数据库还是使用了驱动,所以在使用之前我们必须先下载一个驱动才行。这里我以 MySQL 为例,使用的异步驱动为 asyncmy,直接 pip install asyncmy 安装即可。

"""
使用 create_engine 创建同步引擎
使用 create_async_engine 创建异步引擎
同步引擎搭配同步驱动
异步引擎搭配异步驱动
"""
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.engine import URL
# 也可以直接传递一个字符串
# 参数和 create_engine 是一样的
# create_async_engine("mysql+asyncmy://...")
engine = create_async_engine(
    URL.create("mysql+asyncmy",
               username="root",
               password="123456",
               host="82.157.146.194",
               port=3306,
               database="mysql")
)

以上我们就创建了一个异步引擎,创建方式和同步引擎没什么区别,它们的参数也都是一样的。

既然引擎有了,那么如何用该引擎操作数据库呢?

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")
# 需要定义一个协程函数
async def get_data():
    # 引擎内部维护了一个连接池
    # engine.connect()会从池子里取出一个连接
    async with engine.connect() as conn:
        # 调用 conn.execute() 执行 SQL 语句
        # SQL 语句需要传到 text 方法中
        query = text("SELECT * FROM girl")
        result = await conn.execute(query)
    # 返回的 result 是一个 CursorResult 对象
    # 调用 result.fetchone() 拿到单条数据
    data = result.fetchone()
    print(data)
    """
    (1, '古明地觉', 156)
    """
    # 虽然显示的是一个元组,但它其实是一个 Row 对象
    # 我们还可以将它转成字典
    print(dict(data))
    """
    {'id': 1, 'name': '古明地觉', 'height': 156}
    """
    # result 内部有一个游标
    # 再调用 result.fetchone() 会返回下一条数据
    print(result.fetchone())
    print(result.fetchone())
    """
    (2, '古明地恋', 154)
    (3, '魔理沙', 154)
    """
    # 库里面总共就 3 条数据
    # 所以当没有数据时,就会返回 None
    print(result.fetchone())
    """
    None
    """
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

用法很简单,通过 engine.connect() 可以从池子里面取出一个连接,再调用连接的 execute 方法执行 SQL 语句即可。但需要注意:字符串格式的 SQL 语句不能直接传递,需要先调用 SQLAlchemy 提供的 text 方法。

执行完毕之后,会返回一个 CursorResult 对象,调用它的 fetchone 方法会逐条结果集的数据。当然除了 fetchone,还有 fetchmany 和 fetchall,我们来看一下。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        result = await conn.execute(query)
    # 从结果集取两条数据
    data = result.fetchmany(2)
    print(data)
    """
    [(1, '古明地觉', 156), 
     (2, '古明地恋', 154)]
    """
    # 再取两条数据,但显然此时只剩下一条了
    data = result.fetchmany(2)
    print(data)
    """
    [(3, '魔理沙', 154)]
    """
    # 如果没有数据了,fetchmany 会返回空列表
    data = result.fetchmany(1)
    print(data)
    """
    []
    """

所以 fetchmany 接收一个整数,就是获取指定数量的数据。而 fetchall 就简单了,显然它是获取结果集的全部数据。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        result = await conn.execute(query)
    data = result.fetchall()
    print(data)
    """
    [(1, '古明地觉', 156), 
     (2, '古明地恋', 154), 
     (3, '魔理沙', 154)]
    """
    # 列表里面的 Row 对象都转成字典
    print(list(map(dict, data)))
    """
    [{'id': 1, 'name': '古明地觉', 'height': 156}, 
     {'id': 2, 'name': '古明地恋', 'height': 154}, 
     {'id': 3, 'name': '魔理沙', 'height': 154}]
    """

还是比较简单的,通过 CursorResult 的这三个方法,便可以获取想要的数据。然后再补充一点,我们说 SQL 语句需要放在 text 方法中,然后才能传给连接的 execute 方法。虽然这个过程稍微有点麻烦,但好处就是我们可以使用 SQLAlchemy 提供的占位符功能。

async def get_data():
    async with engine.connect() as conn:
        # :id 就是一个占位符,那么它等于多少呢?
        # 再调用 bindparams 指定即可
        # 并且占位符的数量没有限制
        query = text(
            "SELECT * FROM girl WHERE id > :id"
        ).bindparams(id=1)
        result = await conn.execute(query)
    data = result.fetchall()
    # 此时只返回了两条数据
    print(list(map(dict, data)))
    """
    [{'id': 2, 'name': '古明地恋', 'height': 154}, 
     {'id': 3, 'name': '魔理沙', 'height': 154}]
    """

以后执行 SQL 语句的时候,就通过这种方式去执行即可。当然我们这里只介绍了查询,增删改还没有说,下面来看看它在面对增删改时的表现。


执行增删改语句



先来看看添加数据:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import Table, MetaData, Column
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR
engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")
async def get_data():
    # 构建数据库表
    table = Table(
        "girl",  # 表名
        MetaData(),  # MetaData() 实例
        # 表里面的列
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            {"name": "芙兰朵露", "height": 150})
        result = await conn.execute(query)
        # 返回受影响的行数
        print(result.rowcount)  # 1
        # 返回数据在插入之后的主键
        print(result.inserted_primary_key)  # (4,)
        # 对于增删改而言,还必须调用一次 commit
        # 否则数据不会写入到库中
        await conn.commit()
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

以上是插入单条数据,我们也可以同时插入多条数据。而方法也很简单,插入单条数据是往 values 里面传一个字典,而插入多条数据只需要传一个列表即可。

async def get_data():
    # 构建数据库表
    table = Table(
        "girl", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            [{"name": "琪露诺", "height": 151},
             {"name": "十六夜咲夜", "height": 165}])
        await conn.execute(query)
        await conn.commit()

我们看一下数据库,看看数据有没有变化。

数据成功地写入到库中了,然后再来看看修改数据:

async def get_data():
    table = Table(
        "girl", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        # 修改 id = 1 的 name 字段
        query = table.update().where(
            table.c.id == 1).values({"name": "satori"})
        result = await conn.execute(query)
        print(result.rowcount)  # 1
        # 少女们都长高了 10 厘米
        # 不调用 where,则修改所有行
        query = table.update().values(
            {"height": Column("height") + 10}
        )
        result = await conn.execute(query)
        # 受影响的行数为 6
        print(result.rowcount)  # 6
        # 别忘了提交
        await conn.commit()

看一下表数据有没有变:

数据成功被修改。另外这里的 where 只有单个条件,如果是多个条件,那么彼此之间使用 & 或 | 进行连接,代表 and 和 or。

最后是删除数据:

async def get_data():
    table = Table(
        "girl", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("height", INTEGER)
    )
    async with engine.connect() as conn:
        # 删除 id = 1 的数据
        query = table.delete().where(
            table.c.id == 1)
        result = await conn.execute(query)
        print(result.rowcount)  # 1
        # 删除 id 为 2、3 的数据
        query = table.delete().where(
            table.c.id.in_([2, 3]))
        result = await conn.execute(query)
        print(result.rowcount)  # 2
        await conn.commit()

那么数据有没有被成功删除呢?

成功将数据删掉了。


异步引擎的性能提升



必须要说明的是,如果只是单次的数据库请求,那么同步引擎和异步引擎之间没什么差异,耗时是差不多的。但如果是多个请求,那么异步引擎可以实现并发访问,我们举个例子。这里为了更好地观察到现象,我往表里写了 100w 条数据。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        await conn.execute(query)
async def main():
    start = time.perf_counter()
    await get_data()
    end = time.perf_counter()
    print(f"单次请求耗时: {end - start}s")
    """
    单次请求耗时: 26.8164807s
    """
    start = time.perf_counter()
    await asyncio.gather(*[get_data()] * 20)
    end = time.perf_counter()
    print(f"二十次请求耗时: {end - start}s")
    """
    二十次请求耗时: 27.2821891s
    """
    start = time.perf_counter()
    await asyncio.gather(*[get_data()] * 50)
    end = time.perf_counter()
    print(f"五十次请求耗时: {end - start}s")
    """
    五十次请求耗时: 27.480469s
    """
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

可以看到耗时是差不多的,如果你写了一个服务,请求过来的时候需要从数据库读数据(假设耗时 2s),然后返回。那么无论是来一个请求,还是同时来十个请求,耗时都是差不多的,大概 2s。可能同时处理十个请求的耗时会多一些,但不会多太多,因为请求数据库这个过程是并发进行的。

当然啦,并发处理的请求数肯定也是有上限的,不可能无限大,因为数据库连接池内部的连接数量是有限的。所以任何一个由多个组件构成的系统,随着并发数的提高,总会出现瓶颈。可能一开始的瓶颈服务访问数据库的连接数量不够,但随着连接的增多瓶颈又会转移到数据库上。这个时候可以搭建一个 MySQL 集群,以及引入 Redis 缓存,进一步提升并发量。

所以服务到底选择什么样的架构,取决于你的业务量,随着业务量的增大,一开始行之有效的架构设计就会变得力不从心,总会在某个地方出现瓶颈。我们只能根据实际情况进行调整,使得服务的处理能力尽可能地延展下去。


引擎的反射



在使用同步引擎的时候,我们应该都用过它的反射功能,举个例子。

from pprint import pprint
from sqlalchemy import create_engine
from sqlalchemy import inspect
# 此处为同步引擎
engine = create_engine(
    "mysql+pymysql://root:123456@82.157.146.194/mysql")
inspector = inspect(engine)
# 返回当前数据库下都有哪些表
pprint(inspector.get_table_names())
"""
['columns_priv',
 'component',
 'db',
 'default_roles',
 ......
"""
# 返回默认的数据库
pprint(inspector.default_schema_name)
"""
'mysql'
"""
# 返回所有的数据库
# 如果是 PostgreSQL,则返回 schema
pprint(inspector.get_schema_names())
"""
['information_schema', 'mysql', 
 'performance_schema', 'sys']
"""
# 返回当前数据库下都有哪些视图
pprint(inspector.get_view_names())
"""
[]
"""
# 查看一张表都有哪些列
# 里面包含了列名、类型、默认值、注释等信息
pprint(inspector.get_columns("girl"))
"""
[{'autoincrement': True,
  'comment': None,
  'default': None,
  'name': 'id',
  'nullable': False,
  'type': INTEGER()},
 {'comment': None,
  'default': None,
  'name': 'name',
  'nullable': True,
  'type': VARCHAR(length=255)},
 {'autoincrement': False,
  'comment': None,
  'default': None,
  'name': 'height',
  'nullable': True,
  'type': INTEGER()}]
"""
# 返回一张表的主键约束
pprint(inspector.get_pk_constraint("girl"))
# 返回一张表的所有外键
pprint(inspector.get_foreign_keys("girl"))
# 返回一张表的索引
pprint(inspector.get_indexes("girl"))
# 返回一张表的唯一性约束
pprint(inspector.get_unique_constraints("girl"))
# 返回一张表的注释
pprint(inspector.get_table_comment("girl"))

通过反射引擎,我们可以拿到很多的元信息。当然,也能将一张表反射出来。但这是同步引擎才具有的功能,异步引擎目前还不支持反射。

当然这些信息本质上也是执行了相关查询才获取到的,我们也可以使用异步引擎手动执行,比如查看表字段信息:

async def main():
    async with engine.connect() as conn:
        query = text("SELECT COLUMN_NAME, DATA_TYPE "
                     "FROM INFORMATION_SCHEMA.COLUMNS "
                     "WHERE TABLE_NAME='girl'")
        data = (await conn.execute(query)).fetchall()
        print(list(map(dict, data)))
        """
        [{'COLUMN_NAME': 'height', 'DATA_TYPE': 'int'}, 
         {'COLUMN_NAME': 'id', 'DATA_TYPE': 'int'}, 
         {'COLUMN_NAME': 'name', 'DATA_TYPE': 'varchar'}]
        """

其它的一些元信息也可以通过查询的方式获取。


小结



以上就是 SQLAlchemy + 协程相关的内容,这篇文章算是对猫哥那篇文章的一个补充。如果你使用的是 FastAPI、Sanic 之类的框架,那么也应该要搭配一个异步的 ORM 才能发挥出威力。

最后特别感谢《Python 猫》的号主猫哥,在我创建公众号之后给了我莫大的帮助,正是他第一次转载文章帮我引流,才让我有了继续写下去的动力。这里反向推荐一波。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
前端开发 Java API
vertx学习总结5之回调函数及其限制,如网关/边缘服务示例所示未来和承诺——链接异步操作的简单模型响应式扩展——一个更强大的模型,特别适合组合异步事件流Kotlin协程
本文是Vert.x学习系列的第五部分,讨论了回调函数的限制、Future和Promise在异步操作中的应用、响应式扩展以及Kotlin协程,并通过示例代码展示了如何在Vert.x中使用这些异步编程模式。
46 5
vertx学习总结5之回调函数及其限制,如网关/边缘服务示例所示未来和承诺——链接异步操作的简单模型响应式扩展——一个更强大的模型,特别适合组合异步事件流Kotlin协程
|
13天前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
33 0
|
1月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
深入探索:Python中的并发编程新纪元——协程与异步函数解析
26 3
|
2月前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
在快速发展的Web开发领域,高性能与高效响应是衡量应用质量的重要标准。随着Python在Web开发中的广泛应用,如何利用Python的协程(Coroutine)与异步函数(Async Functions)特性来优化Web应用的性能,成为了许多开发者关注的焦点。本文将从实战角度出发,通过具体案例展示如何运用这些技术来提升Web应用的响应速度和吞吐量。
28 1
|
2月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
24 1
|
1月前
|
数据采集 调度 Python
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
|
1月前
|
数据采集 Python
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)
|
1月前
|
Python
从零到一:构建Python异步编程思维,掌握协程与异步函数
从零到一:构建Python异步编程思维,掌握协程与异步函数
22 0
|
2月前
|
开发者 Kotlin
揭秘Kotlin协程:如何在异步风暴中稳握错误处理之舵?
【9月更文挑战第12天】本文深入探讨了Kotlin协程框架下的错误处理机制,通过实例分析展示了如何利用`CoroutineExceptionHandler`进行结构化异常处理。文章详细介绍了全局与局部异常处理器的使用方法,并展示了如何在挂起函数中使用`try`表达式优雅地处理异常,以提高程序的健壮性和可维护性。
36 4
|
2月前
|
数据采集
爬虫之协程异步 asyncio和aiohttp
爬虫之协程异步 asyncio和aiohttp