python操作mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: python操作mysql

一、python操作mysql简介

关于mysql安装及基本知识可以查看mysql专栏文章

https://blog.csdn.net/qq_34491508/category_11590629.html

python中可以使用 mysql-connector 来连接使用 MySQL, mysql-connectorMySQL 官方提供的驱动器,也可以使用PyMySQL 操作

关于mysql-connector的使用可以参考

Python MySQL – mysql-connector 驱动 | 菜鸟教程 (runoob.com)

无论通过何种方式去连接MySQL,本质上发送的 指令 都是相同的,只是连接的方式和操作形式不同而已。

这篇主要介绍开发中常用的PyMySQL 使用

PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2 中则使用 mysqldb。

PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。

在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。如果还未安装,我们可以使用以下命令安装最新版的 PyMySQL:

pip3 install PyMySQL

二、pymysql管理数据库

使用pymysql完成对数据库的增删改查

import pymysql
 
# 连接MySQL(socket)
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8")
cursor = conn.cursor()
 
# 1. 查看数据库
# 发送指令
cursor.execute("show databases")
# 获取指令的结果
result = cursor.fetchall()
print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',))
 
# 2. 创建数据库(新增、删除、修改)
# 发送指令
cursor.execute("create database db3 default charset utf8 collate utf8_general_ci")
conn.commit()
 
# 3. 查看数据库
# 发送指令
cursor.execute("show databases")
# 获取指令的结果
result = cursor.fetchall()
print(result) # (('information_schema',), ('db3',), ('mysql',), ('performance_schema',), ('sys',))
 
# 4. 删除数据库
# 发送指令
cursor.execute("drop database db3")
conn.commit()
 
# 3. 查看数据库
# 发送指令
cursor.execute("show databases")
# 获取指令的结果
result = cursor.fetchall()
print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',))
 
# 5. 进入数据库,查看表
# 发送指令
cursor.execute("use mysql")
cursor.execute("show tables")
result = cursor.fetchall()
print(result) # (('columns_priv',), ('db',), ('engine_cost',), ('event',), ('func',), ('general_log',),....
 
# 关闭连接
cursor.close()
conn.close()

三、pymysql管理数据表

import pymysql
 
# 连接MySQL
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8")
cursor = conn.cursor()
 
# 1. 创建数据库
"""
cursor.execute("create database db4 default charset utf8 collate utf8_general_ci")
conn.commit()
"""
 
# 2. 进入数据库、查看数据表
"""
cursor.execute("use db4")
cursor.execute("show tables")
result = cursor.fetchall()
print(result)
"""
 
# 3. 进入数据库创建表
cursor.execute("use db4")
sql = """
create table L4(
    id int not null primary key auto_increment,
    title varchar(128),
    content text,
    ctime datetime
)default charset=utf8;
"""
cursor.execute(sql)
conn.commit()
 
# 4. 查看数据库中的表
"""
cursor.execute("show tables")
result = cursor.fetchall()
print(result)
"""
 
# 5. 其他 drop table... 略过
 
 
# 关闭连接
cursor.close()
conn.close()

四、pymysql管理数据行

1、数据的增删改查

import pymysql
 
# 连接MySQL,自动执行 use userdb; -- 进入数据库
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb')
cursor = conn.cursor()
 
 
# 1.新增(需commit)
"""
cursor.execute("insert into tb1(name,password) values('李四','123123')")
conn.commit()
"""
 
# 2.删除(需commit)
"""
cursor.execute("delete from tb1 where id=1")
conn.commit()
"""
 
# 3.修改(需commit)
"""
cursor.execute("update tb1 set name='xx' where id=1")
conn.commit()
"""
 
# 4.查询
"""
cursor.execute("select * from tb where id>10")
data = cursor.fetchone() # cursor.fetchall()
print(data)
"""
 
# 关闭连接
cursor.close()
conn.close()

2、案例:实现 注册、登录功能

import pymysql
 
 
def register():
    print("用户注册")
 
    user = input("请输入用户名:") # alex
    password = input("请输入密码:") # sb
 
    # 连接指定数据
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
    cursor = conn.cursor()
 
    # 执行SQL语句(有SQL注入风险,稍后讲解)
    # sql = 'insert into users(name,password)values("alex","sb")'
    sql = 'insert into users(name,password) values("{}","{}")'.format(user, password)
    
    cursor.execute(sql)
    conn.commit()
 
    # 关闭数据库连接
    cursor.close()
    conn.close()
 
    print("注册成功,用户名:{},密码:{}".format(user, password))
 
 
def login():
    print("用户登录")
 
    user = input("请输入用户名:")
    password = input("请输入密码:")
 
    # 连接指定数据
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
    cursor = conn.cursor()
 
    # 执行SQL语句(有SQL注入风险,稍后讲解)
    # sql = select * from users where name='xxx' and password='123'
    sql = "select * from users where name='{}' and password='{}'".format(user, password)
    cursor.execute(sql)
    
    result = cursor.fetchone() # 去向mysql获取结果
    # None
    # (1,wupeiqi,123)
    
    
    # 关闭数据库连接
    cursor.close()
    conn.close()
 
    if result:
        print("登录成功", result)
    else:
        print("登录失败")
 
 
def run():
    choice = input("1.注册;2.登录")
    if choice == '1':
        register()
    elif choice == '2':
        login()
    else:
        print("输入错误")
 
 
if __name__ == '__main__':
    run()

3、关于SQL注入

import pymysql
 
# 输入用户名和密码
user = input("请输入用户名:") # ' or 1=1 -- 
pwd = input("请输入密码:") # 123
 
 
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb')
cursor = conn.cursor()
 
# 基于字符串格式化来 拼接SQL语句
# sql = "select * from users where name='alex' and password='123'"
# sql = "select * from users where name='' or 1=1 -- ' and password='123'"
sql = "select * from users where name='{}' and password='{}'".format(user, pwd)
cursor.execute(sql)
 
result = cursor.fetchone()
print(result) # None,不是None
 
cursor.close()
conn.close()

如果用户在输入user时,输入了: ' or 1=1 -- ,这样即使用户输入的密码不存在,也会可以通过验证。

为什么呢?

因为在SQL拼接时,拼接后的结果是:

select * from users where name='' or 1=1 -- ' and password='123'

注意:在MySQL中 -- 表示注释。

那么,在Python开发中 如何来避免SQL注入呢?

切记,SQL语句不要在使用python的字符串格式化,而是使用pymysql的execute方法。

import pymysql
 
# 输入用户名和密码
user = input("请输入用户名:")
pwd = input("请输入密码:")
 
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb')
 
cursor = conn.cursor()
 
cursor.execute("select * from users where name=%s and password=%s", [user, pwd])
# 或
# cursor.execute("select * from users where name=%(n1)s and password=%(n2)s", {"n1": user, 'n2': pwd})
 
result = cursor.fetchone()
print(result)
 
cursor.close()
conn.close()

五、工具类

上面pymysql对mysql的关键代码就是sql语句,其他都是公共的,所以可以封装成简单的数据库操作工具类

import pymysql
 
# 连接地址相关可以从配置文件读取
dataSource = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'root',
    'db': 'db_sys',
    'charset': 'utf8'
}
 
 
def get_conn_cursor():
    conn = pymysql.connect(host=dataSource['host'], port=dataSource['port'], user=dataSource['user'],
                           password=dataSource['password'], charset=dataSource['charset'],
                           db=dataSource['db'])
    cursor = conn.cursor()
    return conn, cursor
 
 
def close_conn_cursor(*args):
    for item in args:
        item.close()
 
 
def exec(sql, **kwargs):
    conn, cursor = get_conn_cursor()
    cursor.execute(sql, kwargs)
    conn.commit()
    close_conn_cursor(conn, cursor)
 
 
def fetch_one(sql, **kwargs):
    conn, cursor = get_conn_cursor()
    cursor.execute(sql, kwargs)
    result = cursor.fetchone()
    close_conn_cursor(conn, cursor)
    return result
 
 
def fetch_all(sql, **kwargs):
    conn, cursor = get_conn_cursor()
    cursor.execute(sql, kwargs)
    result = cursor.fetchall()
    close_conn_cursor(conn, cursor)
    return result
 
 
if __name__ == '__main__':
    users = fetch_all("select * from user")
    print(users)  # ((1, 'admin', 'admin', None, None), (2, 'test', 'test', None, None))

六、数据库连接池

在操作数据库时需要使用数据库连接池。

pip3 install pymysql
pip3 install dbutils
import threading
import pymysql
from dbutils.pooled_db import PooledDB
 
MYSQL_DB_POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=3,  # 链接池中最多闲置的链接,0和None不限制
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    # 如:0 = None = never, 1 = default = whenever it is requested, 
    # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='userdb',
    charset='utf8'
)
 
 
def task():
    # 去连接池获取一个连接
    conn = MYSQL_DB_POOL.connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    
    cursor.execute('select sleep(2)')
    result = cursor.fetchall()
    print(result)
 
    cursor.close()
    # 将连接交换给连接池
    conn.close()
 
def run():
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()
 
 
if __name__ == '__main__':
    run()

七、基于数据库连接池工具类

1、第一种:单例和方法工具类

# db.py
import pymysql
from dbutils.pooled_db import PooledDB
 
 
class DBHelper(object):
 
    def __init__(self):
        # TODO 此处配置,可以去配置文件中读取。
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=3,  # 链接池中最多闲置的链接,0和None不限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='root123',
            database='userdb',
            charset='utf8'
        )
 
    def get_conn_cursor(self):
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn, cursor
 
    def close_conn_cursor(self, *args):
        for item in args:
            item.close()
 
    def exec(self, sql, **kwargs):
        conn, cursor = self.get_conn_cursor()
 
        cursor.execute(sql, kwargs)
        conn.commit()
 
        self.close_conn_cursor(conn, cursor)
 
    def fetch_one(self, sql, **kwargs):
        conn, cursor = self.get_conn_cursor()
 
        cursor.execute(sql, kwargs)
        result = cursor.fetchone()
 
        self.close_conn_cursor(conn, cursor)
        return result
 
    def fetch_all(self, sql, **kwargs):
        conn, cursor = self.get_conn_cursor()
 
        cursor.execute(sql, kwargs)
        result = cursor.fetchall()
 
        self.close_conn_cursor(conn, cursor)
 
        return result
 
 
db = DBHelper()

测试

from db import db
 
db.exec("insert into d1(name) values(%(name)s)", name="666")
 
ret = db.fetch_one("select * from d1")
print(ret)
 
ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)
print(ret)
 
ret = db.fetch_all("select * from d1")
print(ret)
 
ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)
print(ret)

 

2、上下文管理方式工具类(推荐)

如果想要让他也支持 with 上下文管理。

with 获取连接:

执行SQL(执行完毕后,自动将连接交还给连接池)

# db_context.py
import threading
import pymysql
from dbutils.pooled_db import PooledDB
 
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=3,  # 链接池中最多闲置的链接,0和None不限制
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='userdb',
    charset='utf8'
)
 
 
class Connect(object):
    def __init__(self):
        self.conn = conn = POOL.connection()
        self.cursor = conn.cursor(pymysql.cursors.DictCursor)
 
    def __enter__(self):
        return self
 
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
 
    def exec(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        self.conn.commit()
 
    def fetch_one(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchone()
        return result
 
    def fetch_all(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchall()
        return result

测试

from db_context import Connect
 
with Connect() as obj:
    # print(obj.conn)
    # print(obj.cursor)
    ret = obj.fetch_one("select * from d1")
    print(ret)
 
    ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)
    print(ret)


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
6天前
|
SQL 关系型数据库 MySQL
go 通过sql操作mysql
go 通过sql操作mysql
11 1
|
22小时前
|
存储 Go 索引
牢记python对象的操作方式
【6月更文挑战第20天】在Python中,`hash()`和`is`帮助确定对象的相等性。`dir()`和`vars()`揭示对象的属性和内部表示,`__slots__`优化内存使用。列表和字典结构有不同的内存和性能特性,字典使用哈希表进行快速访问。
13 5
牢记python对象的操作方式
|
1天前
|
Python
Python列表推导式是一种简洁的创建新列表的方式,它允许你在一行代码中完成对数据的操作和转换
【6月更文挑战第19天】Python列表推导式是创建新列表的简洁语法,它在一行内处理数据。表达式如`[expr for item in iterable if cond]`,其中`expr`是对元素的操作,`item`来自`iterable`,`if cond`是可选过滤条件。例如,将数字列表平方:`[x**2 for x in numbers]`。嵌套列表推导处理复杂结构,如合并二维数组:`[[a+b for a,b in zip(row1, row2)] for row1, row2 in zip(matrix1, matrix2)]`。简洁但勿过度复杂化。
11 5
|
21小时前
|
Python
高阶函数如`map`, `filter`, `reduce`和`functools.partial`在Python中用于函数操作
【6月更文挑战第20天】高阶函数如`map`, `filter`, `reduce`和`functools.partial`在Python中用于函数操作。装饰器如`@timer`接收或返回函数,用于扩展功能,如记录执行时间。`timer`装饰器通过包裹函数并计算执行间隙展示时间消耗,如`my_function(2)`执行耗时2秒。
10 3
|
1天前
|
Python
Python教程:Python中的输入与输出操作
在编程语言中,输入(Input)和输出(Output),简称I/O,是基础且重要的概念。Python作为一门易于学习且功能强大的编程语言,在处理输入和输出方面提供了多种方式。本文将深入探讨Python中的输入输出操作,包括标准输入输出、文件操作、以及网络I/O等领域
9 4
|
2天前
|
SQL 关系型数据库 MySQL
Python进阶第二篇(Python与MySQL数据库)
Python进阶第二篇(Python与MySQL数据库)
|
2天前
|
Python
【干货】python xlwt写入excel操作
【干货】python xlwt写入excel操作
9 2
|
3天前
|
关系型数据库 MySQL Go
Go语言介绍以及如何在Go语言中操作MySQL数据库
Go语言介绍以及如何在Go语言中操作MySQL数据库
19 3
|
3天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步MySQL数据并EP(复杂事件处理)时,编译报错,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3天前
|
存储 自然语言处理 关系型数据库
✅生产问题之Emoji表情如何操作存储,MySQL是否支持
MySQL支持存储Emoji表情,需使用UTF8MB4编码。UTF8MB3,MySQL早期的UTF-8实现,不支持部分Unicode字符包括Emoji,已被弃用。推荐使用UTF8MB4,它支持全部Unicode字符。转换时,现有UTF8MB3表需转换为UTF8MB4,列和表都需设置相应字符集。