python3操作aiomysql的几个案例

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 这篇文章介绍了如何使用aiomysql库在Python中进行异步MySQL数据库操作,包括查询、连接池使用、多条插入、事务操作和SQLAlchemy的异步支持。

官方文档:https://aiomysql.readthedocs.io/

github:https://github.com/aio-libs/aiomysql

查询案例

import asyncio
import aiomysql


async def test_example(loop):
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='', db='mysql',
                                  loop=loop)

    async with conn.cursor() as cur:
        await cur.execute("SELECT Host,User FROM user")
        print(cur.description)
        r = await cur.fetchall()
        print(r)
    conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop))

连接池

import asyncio
import aiomysql


async def test_example(loop):
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='',
                                      db='mysql', loop=loop)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            print(cur.description)
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()


loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop))

插入多条

import asyncio
import aiomysql


async def test_example_executemany(loop):
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                       user='root', password='',
                                       db='test_pymysql', loop=loop)

    cur = await conn.cursor()
    async with conn.cursor() as cur:
        await cur.execute("DROP TABLE IF EXISTS music_style;")
        await cur.execute("""CREATE TABLE music_style
                                  (id INT,
                                  name VARCHAR(255),
                                  PRIMARY KEY (id));""")
        await conn.commit()

        # insert 3 rows one by one
        await cur.execute("INSERT INTO music_style VALUES(1,'heavy metal')")
        await cur.execute("INSERT INTO music_style VALUES(2,'death metal');")
        await cur.execute("INSERT INTO music_style VALUES(3,'power metal');")
        await conn.commit()

        # insert 3 row by one long query using *executemany* method
        data = [(4, 'gothic metal'), (5, 'doom metal'), (6, 'post metal')]
        await cur.executemany(
            "INSERT INTO music_style (id, name)"
            "values (%s,%s)", data)
        await conn.commit()

        # fetch all insert row from table music_style
        await cur.execute("SELECT * FROM music_style;")
        result = await cur.fetchall()
        print(result)

    conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(test_example_executemany(loop))

事务操作

import asyncio
import aiomysql


async def test_example_transaction(loop):
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='',
                                  db='test_pymysql', autocommit=False,
                                  loop=loop)

    async with conn.cursor() as cursor:
        stmt_drop = "DROP TABLE IF EXISTS names"
        await cursor.execute(stmt_drop)
        await cursor.execute("""
            CREATE TABLE names (
            id TINYINT UNSIGNED NOT NULL AUTO_INCREMENT,
            name VARCHAR(30) DEFAULT '' NOT NULL,
            cnt TINYINT UNSIGNED DEFAULT 0,
            PRIMARY KEY (id))""")
        await conn.commit()

        # Insert 3 records
        names = (('Geert',), ('Jan',), ('Michel',))
        stmt_insert = "INSERT INTO names (name) VALUES (%s)"
        await cursor.executemany(stmt_insert, names)

        # Roll back!!!!
        await conn.rollback()

        # There should be no data!
        stmt_select = "SELECT id, name FROM names ORDER BY id"
        await cursor.execute(stmt_select)
        resp = await cursor.fetchall()
        # Check there is no data
        assert not resp

        # Do the insert again.
        await cursor.executemany(stmt_insert, names)

        # Data should be already there
        await cursor.execute(stmt_select)
        resp = await cursor.fetchall()
        print(resp)
        # Do a commit
        await conn.commit()

        await cursor.execute(stmt_select)
        print(resp)

        # Cleaning up, dropping the table again
        await cursor.execute(stmt_drop)
        await cursor.close()
        conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(test_example_transaction(loop))

sa操作

import asyncio
import sqlalchemy as sa

from aiomysql.sa import create_engine


metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))


async def create_table(engine):
    async with engine.acquire() as conn:
        await conn.execute('DROP TABLE IF EXISTS tbl')
        await conn.execute('''CREATE TABLE tbl (
                              id serial PRIMARY KEY,
                              val varchar(255))''')


async def go(loop):
    engine = await create_engine(user='root', db='test_pymysql',
                                 host='127.0.0.1', password='', loop=loop)
    await create_table(engine)
    async with engine.acquire() as conn:
        await conn.execute(tbl.insert().values(val='abc'))
        await conn.execute(tbl.insert().values(val='xyz'))

        async for row in conn.execute(tbl.select()):
            print(row.id, row.val)

        await conn.execute("commit")

    engine.close()
    await engine.wait_closed()


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
iOS开发 MacOS Python
Python 编程案例:谁没交论文?输出并生成电子表格
Python 编程案例:谁没交论文?输出并生成电子表格
29 9
|
3月前
|
数据采集 前端开发 NoSQL
Python编程异步爬虫实战案例
Python编程异步爬虫实战案例
84 2
|
3月前
|
数据采集 自然语言处理 API
Python反爬案例——验证码的识别
Python反爬案例——验证码的识别
53 2
|
3月前
|
iOS开发 MacOS Python
Python编程小案例—利用flask查询本机IP归属并输出网页图片
Python编程小案例—利用flask查询本机IP归属并输出网页图片
32 1
|
3月前
|
存储 大数据 Python
案例学Python:filter()函数的用法,高级!
`filter()`函数是Python中处理序列数据的强大工具,它允许我们高效地根据条件过滤元素。通过结合匿名函数、常规函数或直接利用Python的内置逻辑,`filter()`提供了灵活且高效的过滤机制,尤其在大数据处理和内存敏感的应用中展现出其价值。掌握 `filter()`的使用,不仅能提升代码的可读性和效率,还能更好地适应Python的函数式编程风格。
54 2
|
3月前
|
IDE 开发工具 iOS开发
Python编程案例:查找指定文件大小的文件并输出路径
Python编程案例:查找指定文件大小的文件并输出路径
31 3
|
3月前
|
文件存储 iOS开发 MacOS
Python编程案例:文件查找并归类
Python编程案例:文件查找并归类
26 2
|
3月前
|
Python
Python编程案例:同一工作簿不同表单特定数据添加到工作簿的另一表单里
Python编程案例:同一工作簿不同表单特定数据添加到工作簿的另一表单里
28 1
|
3月前
|
iOS开发 MacOS Python
Python编程案例:根据姓名归档论文、报告
Python编程案例:根据姓名归档论文、报告
18 1
|
4月前
|
机器学习/深度学习 数据采集 数据可视化
跟着penguins案例学Seaborn之Pairplot
跟着penguins案例学Seaborn之Pairplot
140 1