楔子
最近猫哥转载了我在博客园上的一篇文章,是和异步的数据库驱动相关的。对于一个 web 服务而言,性能瓶颈基本都在数据库上面,如果在等待数据库返回数据的时候,能够自动切换并处理其它请求的话,那么并发量会得到显著的提升。但想做到这一点,我们就必须将同步驱动换成异步驱动。
那么异步驱动都有哪些呢?
- aiosqlite:用于连接 SQLite;
- asyncmy、aiomysql:用于连接 MySQL;
- asyncpg、aiopg:用于连接 PostgreSQL;
- cx_Oracle_async:用于连接 Oracle;
- aioredis:用于连接 Redis;
现如今 Python 已经进化到 3.11 了,适配不同数据库的异步驱动也已经非常成熟了。但这里我要介绍的不是这些驱动,而是 ORM。不同的驱动使用起来会有一些差异,而 ORM 提供了一个统一的上层接口,屏蔽了不同驱动之间的差异。
Python 里面最有名的ORM莫过于SQLAlchemy,在早期它是一个同步的 ORM,只能适配一些同步驱动。不过从 1.4 版本的时候引入了协程,支持了异步功能,并且在使用上和之前没有太大区别。下面我们来看一下它的用法,并介绍一些最佳实践。
创建一个异步引擎
SQLAlchemy 不具备连接数据库的能力,它连接数据库还是使用了驱动,所以在使用之前我们必须先下载一个驱动才行。这里我以 MySQL 为例,使用的异步驱动为 asyncmy,直接 pip install asyncmy 安装即可。
""" 使用 create_engine 创建同步引擎 使用 create_async_engine 创建异步引擎 同步引擎搭配同步驱动 异步引擎搭配异步驱动 """ from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.engine import URL # 也可以直接传递一个字符串 # 参数和 create_engine 是一样的 # create_async_engine("mysql+asyncmy://...") engine = create_async_engine( URL.create("mysql+asyncmy", username="root", password="123456", host="82.157.146.194", port=3306, database="mysql") )
以上我们就创建了一个异步引擎,创建方式和同步引擎没什么区别,它们的参数也都是一样的。
既然引擎有了,那么如何用该引擎操作数据库呢?
import asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import text engine = create_async_engine( "mysql+asyncmy://root:123456@82.157.146.194/mysql") # 需要定义一个协程函数 async def get_data(): # 引擎内部维护了一个连接池 # engine.connect()会从池子里取出一个连接 async with engine.connect() as conn: # 调用 conn.execute() 执行 SQL 语句 # SQL 语句需要传到 text 方法中 query = text("SELECT * FROM girl") result = await conn.execute(query) # 返回的 result 是一个 CursorResult 对象 # 调用 result.fetchone() 拿到单条数据 data = result.fetchone() print(data) """ (1, '古明地觉', 156) """ # 虽然显示的是一个元组,但它其实是一个 Row 对象 # 我们还可以将它转成字典 print(dict(data)) """ {'id': 1, 'name': '古明地觉', 'height': 156} """ # result 内部有一个游标 # 再调用 result.fetchone() 会返回下一条数据 print(result.fetchone()) print(result.fetchone()) """ (2, '古明地恋', 154) (3, '魔理沙', 154) """ # 库里面总共就 3 条数据 # 所以当没有数据时,就会返回 None print(result.fetchone()) """ None """ if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(get_data())
用法很简单,通过 engine.connect() 可以从池子里面取出一个连接,再调用连接的 execute 方法执行 SQL 语句即可。但需要注意:字符串格式的 SQL 语句不能直接传递,需要先调用 SQLAlchemy 提供的 text 方法。
执行完毕之后,会返回一个 CursorResult 对象,调用它的 fetchone 方法会逐条返回结果集的数据。当然除了 fetchone,还有 fetchmany 和 fetchall,我们来看一下。
async def get_data(): async with engine.connect() as conn: query = text("SELECT * FROM girl") result = await conn.execute(query) # 从结果集取两条数据 data = result.fetchmany(2) print(data) """ [(1, '古明地觉', 156), (2, '古明地恋', 154)] """ # 再取两条数据,但显然此时只剩下一条了 data = result.fetchmany(2) print(data) """ [(3, '魔理沙', 154)] """ # 如果没有数据了,fetchmany 会返回空列表 data = result.fetchmany(1) print(data) """ [] """
所以 fetchmany 接收一个整数,就是获取指定数量的数据。而 fetchall 就简单了,显然它是获取结果集的全部数据。
async def get_data(): async with engine.connect() as conn: query = text("SELECT * FROM girl") result = await conn.execute(query) data = result.fetchall() print(data) """ [(1, '古明地觉', 156), (2, '古明地恋', 154), (3, '魔理沙', 154)] """ # 列表里面的 Row 对象都转成字典 print(list(map(dict, data))) """ [{'id': 1, 'name': '古明地觉', 'height': 156}, {'id': 2, 'name': '古明地恋', 'height': 154}, {'id': 3, 'name': '魔理沙', 'height': 154}] """
还是比较简单的,通过 CursorResult 的这三个方法,便可以获取想要的数据。然后再补充一点,我们说 SQL 语句需要放在 text 方法中,然后才能传给连接的 execute 方法。虽然这个过程稍微有点麻烦,但好处就是我们可以使用 SQLAlchemy 提供的占位符功能。
async def get_data(): async with engine.connect() as conn: # :id 就是一个占位符,那么它等于多少呢? # 再调用 bindparams 指定即可 # 并且占位符的数量没有限制 query = text( "SELECT * FROM girl WHERE id > :id" ).bindparams(id=1) result = await conn.execute(query) data = result.fetchall() # 此时只返回了两条数据 print(list(map(dict, data))) """ [{'id': 2, 'name': '古明地恋', 'height': 154}, {'id': 3, 'name': '魔理沙', 'height': 154}] """
以后执行 SQL 语句的时候,就通过这种方式去执行即可。当然我们这里只介绍了查询,增删改还没有说,下面来看看它在面对增删改时的表现。
执行增删改语句
先来看看添加数据:
import asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import Table, MetaData, Column from sqlalchemy.dialects.mysql import INTEGER, VARCHAR engine = create_async_engine( "mysql+asyncmy://root:123456@82.157.146.194/mysql") async def get_data(): # 构建数据库表 table = Table( "girl", # 表名 MetaData(), # MetaData() 实例 # 表里面的列 Column("id", INTEGER, primary_key=True, autoincrement=True), Column("name", VARCHAR), Column("height", INTEGER) ) async with engine.connect() as conn: query = table.insert().values( {"name": "芙兰朵露", "height": 150}) result = await conn.execute(query) # 返回受影响的行数 print(result.rowcount) # 1 # 返回数据在插入之后的主键 print(result.inserted_primary_key) # (4,) # 对于增删改而言,还必须调用一次 commit # 否则数据不会写入到库中 await conn.commit() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(get_data())
以上是插入单条数据,我们也可以同时插入多条数据。而方法也很简单,插入单条数据是往 values 里面传一个字典,而插入多条数据只需要传一个列表即可。
async def get_data(): # 构建数据库表 table = Table( "girl", MetaData(), Column("id", INTEGER, primary_key=True, autoincrement=True), Column("name", VARCHAR), Column("height", INTEGER) ) async with engine.connect() as conn: query = table.insert().values( [{"name": "琪露诺", "height": 151}, {"name": "十六夜咲夜", "height": 165}]) await conn.execute(query) await conn.commit()
我们看一下数据库,看看数据有没有变化。
数据成功地写入到库中了,然后再来看看修改数据:
async def get_data(): table = Table( "girl", MetaData(), Column("id", INTEGER, primary_key=True, autoincrement=True), Column("name", VARCHAR), Column("height", INTEGER) ) async with engine.connect() as conn: # 修改 id = 1 的 name 字段 query = table.update().where( table.c.id == 1).values({"name": "satori"}) result = await conn.execute(query) print(result.rowcount) # 1 # 少女们都长高了 10 厘米 # 不调用 where,则修改所有行 query = table.update().values( {"height": Column("height") + 10} ) result = await conn.execute(query) # 受影响的行数为 6 print(result.rowcount) # 6 # 别忘了提交 await conn.commit()
看一下表数据有没有变:
数据成功被修改。另外这里的 where 只有单个条件,如果是多个条件,那么彼此之间使用 & 或 | 进行连接,代表 and 和 or。
最后是删除数据:
async def get_data(): table = Table( "girl", MetaData(), Column("id", INTEGER, primary_key=True, autoincrement=True), Column("name", VARCHAR), Column("height", INTEGER) ) async with engine.connect() as conn: # 删除 id = 1 的数据 query = table.delete().where( table.c.id == 1) result = await conn.execute(query) print(result.rowcount) # 1 # 删除 id 为 2、3 的数据 query = table.delete().where( table.c.id.in_([2, 3])) result = await conn.execute(query) print(result.rowcount) # 2 await conn.commit()
那么数据有没有被成功删除呢?
成功将数据删掉了。
异步引擎的性能提升
必须要说明的是,如果只是单次的数据库请求,那么同步引擎和异步引擎之间没什么差异,耗时是差不多的。但如果是多个请求,那么异步引擎可以实现并发访问,我们举个例子。这里为了更好地观察到现象,我往表里写了 100w 条数据。
async def get_data(): async with engine.connect() as conn: query = text("SELECT * FROM girl") await conn.execute(query) async def main(): start = time.perf_counter() await get_data() end = time.perf_counter() print(f"单次请求耗时: {end - start}s") """ 单次请求耗时: 26.8164807s """ start = time.perf_counter() await asyncio.gather(*[get_data()] * 20) end = time.perf_counter() print(f"二十次请求耗时: {end - start}s") """ 二十次请求耗时: 27.2821891s """ start = time.perf_counter() await asyncio.gather(*[get_data()] * 50) end = time.perf_counter() print(f"五十次请求耗时: {end - start}s") """ 五十次请求耗时: 27.480469s """ if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
可以看到耗时是差不多的,如果你写了一个服务,请求过来的时候需要从数据库读数据(假设耗时 2s),然后返回。那么无论是来一个请求,还是同时来十个请求,耗时都是差不多的,大概 2s。可能同时处理十个请求的耗时会多一些,但不会多太多,因为请求数据库这个过程是并发进行的。
当然啦,并发处理的请求数肯定也是有上限的,不可能无限大,因为数据库连接池内部的连接数量是有限的。所以任何一个由多个组件构成的系统,随着并发数的提高,总会出现瓶颈。可能一开始的瓶颈是服务访问数据库的连接数量不够,但随着连接的增多,瓶颈又会转移到数据库上。这个时候可以搭建一个 MySQL 集群,以及引入 Redis 缓存,进一步提升并发量。
所以服务到底选择什么样的架构,取决于你的业务量,随着业务量的增大,一开始行之有效的架构设计就会变得力不从心,总会在某个地方出现瓶颈。我们只能根据实际情况进行调整,使得服务的处理能力尽可能地延展下去。
引擎的反射
在使用同步引擎的时候,我们应该都用过它的反射功能,举个例子。
from pprint import pprint from sqlalchemy import create_engine from sqlalchemy import inspect # 此处为同步引擎 engine = create_engine( "mysql+pymysql://root:123456@82.157.146.194/mysql") inspector = inspect(engine) # 返回当前数据库下都有哪些表 pprint(inspector.get_table_names()) """ ['columns_priv', 'component', 'db', 'default_roles', ...... """ # 返回默认的数据库 pprint(inspector.default_schema_name) """ 'mysql' """ # 返回所有的数据库 # 如果是 PostgreSQL,则返回 schema pprint(inspector.get_schema_names()) """ ['information_schema', 'mysql', 'performance_schema', 'sys'] """ # 返回当前数据库下都有哪些视图 pprint(inspector.get_view_names()) """ [] """ # 查看一张表都有哪些列 # 里面包含了列名、类型、默认值、注释等信息 pprint(inspector.get_columns("girl")) """ [{'autoincrement': True, 'comment': None, 'default': None, 'name': 'id', 'nullable': False, 'type': INTEGER()}, {'comment': None, 'default': None, 'name': 'name', 'nullable': True, 'type': VARCHAR(length=255)}, {'autoincrement': False, 'comment': None, 'default': None, 'name': 'height', 'nullable': True, 'type': INTEGER()}] """ # 返回一张表的主键约束 pprint(inspector.get_pk_constraint("girl")) # 返回一张表的所有外键 pprint(inspector.get_foreign_keys("girl")) # 返回一张表的索引 pprint(inspector.get_indexes("girl")) # 返回一张表的唯一性约束 pprint(inspector.get_unique_constraints("girl")) # 返回一张表的注释 pprint(inspector.get_table_comment("girl"))
通过反射引擎,我们可以拿到很多的元信息。当然,也能将一张表反射出来。但这是同步引擎才具有的功能,异步引擎目前还不支持反射。
当然这些信息本质上也是执行了相关查询才获取到的,我们也可以使用异步引擎手动执行,比如查看表字段信息:
async def main(): async with engine.connect() as conn: query = text("SELECT COLUMN_NAME, DATA_TYPE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_NAME='girl'") data = (await conn.execute(query)).fetchall() print(list(map(dict, data))) """ [{'COLUMN_NAME': 'height', 'DATA_TYPE': 'int'}, {'COLUMN_NAME': 'id', 'DATA_TYPE': 'int'}, {'COLUMN_NAME': 'name', 'DATA_TYPE': 'varchar'}] """
其它的一些元信息也可以通过查询的方式获取。
小结
以上就是 SQLAlchemy + 协程相关的内容,这篇文章算是对猫哥那篇文章的一个补充。如果你使用的是 FastAPI、Sanic 之类的框架,那么也应该要搭配一个异步的 ORM 才能发挥出威力。
最后特别感谢《Python 猫》的号主猫哥,在我创建公众号之后给了我莫大的帮助,正是他第一次转载文章帮我引流,才让我有了继续写下去的动力。这里反向推荐一波。