一、python操作mysql简介
关于mysql安装及基本知识可以查看mysql专栏文章
https://blog.csdn.net/qq_34491508/category_11590629.html
python中可以使用 mysql-connector 来连接使用 MySQL, mysql-connector 是 MySQL 官方提供的驱动器,也可以使用PyMySQL 操作
关于mysql-connector的使用可以参考
Python MySQL – mysql-connector 驱动 | 菜鸟教程 (runoob.com)
无论通过何种方式去连接MySQL,本质上发送的 指令 都是相同的,只是连接的方式和操作形式不同而已。
这篇主要介绍开发中常用的PyMySQL 使用
PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2 中则使用 mysqldb。
PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。
在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。如果还未安装,我们可以使用以下命令安装最新版的 PyMySQL:
pip3 install PyMySQL
二、pymysql管理数据库
使用pymysql完成对数据库的增删改查
import pymysql # 连接MySQL(socket) conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8") cursor = conn.cursor() # 1. 查看数据库 # 发送指令 cursor.execute("show databases") # 获取指令的结果 result = cursor.fetchall() print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',)) # 2. 创建数据库(新增、删除、修改) # 发送指令 cursor.execute("create database db3 default charset utf8 collate utf8_general_ci") conn.commit() # 3. 查看数据库 # 发送指令 cursor.execute("show databases") # 获取指令的结果 result = cursor.fetchall() print(result) # (('information_schema',), ('db3',), ('mysql',), ('performance_schema',), ('sys',)) # 4. 删除数据库 # 发送指令 cursor.execute("drop database db3") conn.commit() # 3. 查看数据库 # 发送指令 cursor.execute("show databases") # 获取指令的结果 result = cursor.fetchall() print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',)) # 5. 进入数据库,查看表 # 发送指令 cursor.execute("use mysql") cursor.execute("show tables") result = cursor.fetchall() print(result) # (('columns_priv',), ('db',), ('engine_cost',), ('event',), ('func',), ('general_log',),.... # 关闭连接 cursor.close() conn.close()
三、pymysql管理数据表
import pymysql # 连接MySQL conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8") cursor = conn.cursor() # 1. 创建数据库 """ cursor.execute("create database db4 default charset utf8 collate utf8_general_ci") conn.commit() """ # 2. 进入数据库、查看数据表 """ cursor.execute("use db4") cursor.execute("show tables") result = cursor.fetchall() print(result) """ # 3. 进入数据库创建表 cursor.execute("use db4") sql = """ create table L4( id int not null primary key auto_increment, title varchar(128), content text, ctime datetime )default charset=utf8; """ cursor.execute(sql) conn.commit() # 4. 查看数据库中的表 """ cursor.execute("show tables") result = cursor.fetchall() print(result) """ # 5. 其他 drop table... 略过 # 关闭连接 cursor.close() conn.close()
四、pymysql管理数据行
1、数据的增删改查
import pymysql # 连接MySQL,自动执行 use userdb; -- 进入数据库 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb') cursor = conn.cursor() # 1.新增(需commit) """ cursor.execute("insert into tb1(name,password) values('李四','123123')") conn.commit() """ # 2.删除(需commit) """ cursor.execute("delete from tb1 where id=1") conn.commit() """ # 3.修改(需commit) """ cursor.execute("update tb1 set name='xx' where id=1") conn.commit() """ # 4.查询 """ cursor.execute("select * from tb where id>10") data = cursor.fetchone() # cursor.fetchall() print(data) """ # 关闭连接 cursor.close() conn.close()
2、案例:实现 注册、登录功能
import pymysql def register(): print("用户注册") user = input("请输入用户名:") # alex password = input("请输入密码:") # sb # 连接指定数据 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb") cursor = conn.cursor() # 执行SQL语句(有SQL注入风险,稍后讲解) # sql = 'insert into users(name,password)values("alex","sb")' sql = 'insert into users(name,password) values("{}","{}")'.format(user, password) cursor.execute(sql) conn.commit() # 关闭数据库连接 cursor.close() conn.close() print("注册成功,用户名:{},密码:{}".format(user, password)) def login(): print("用户登录") user = input("请输入用户名:") password = input("请输入密码:") # 连接指定数据 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb") cursor = conn.cursor() # 执行SQL语句(有SQL注入风险,稍后讲解) # sql = select * from users where name='xxx' and password='123' sql = "select * from users where name='{}' and password='{}'".format(user, password) cursor.execute(sql) result = cursor.fetchone() # 去向mysql获取结果 # None # (1,wupeiqi,123) # 关闭数据库连接 cursor.close() conn.close() if result: print("登录成功", result) else: print("登录失败") def run(): choice = input("1.注册;2.登录") if choice == '1': register() elif choice == '2': login() else: print("输入错误") if __name__ == '__main__': run()
3、关于SQL注入
import pymysql # 输入用户名和密码 user = input("请输入用户名:") # ' or 1=1 -- pwd = input("请输入密码:") # 123 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb') cursor = conn.cursor() # 基于字符串格式化来 拼接SQL语句 # sql = "select * from users where name='alex' and password='123'" # sql = "select * from users where name='' or 1=1 -- ' and password='123'" sql = "select * from users where name='{}' and password='{}'".format(user, pwd) cursor.execute(sql) result = cursor.fetchone() print(result) # None,不是None cursor.close() conn.close()
如果用户在输入user时,输入了: ' or 1=1 --
,这样即使用户输入的密码不存在,也会可以通过验证。
为什么呢?
因为在SQL拼接时,拼接后的结果是:
select * from users where name='' or 1=1 -- ' and password='123'
注意:在MySQL中 --
表示注释。
那么,在Python开发中 如何来避免SQL注入呢?
切记,SQL语句不要在使用python的字符串格式化,而是使用pymysql的execute方法。
import pymysql # 输入用户名和密码 user = input("请输入用户名:") pwd = input("请输入密码:") conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb') cursor = conn.cursor() cursor.execute("select * from users where name=%s and password=%s", [user, pwd]) # 或 # cursor.execute("select * from users where name=%(n1)s and password=%(n2)s", {"n1": user, 'n2': pwd}) result = cursor.fetchone() print(result) cursor.close() conn.close()
五、工具类
上面pymysql对mysql的关键代码就是sql语句,其他都是公共的,所以可以封装成简单的数据库操作工具类
import pymysql # 连接地址相关可以从配置文件读取 dataSource = { 'host': 'localhost', 'port': 3306, 'user': 'root', 'password': 'root', 'db': 'db_sys', 'charset': 'utf8' } def get_conn_cursor(): conn = pymysql.connect(host=dataSource['host'], port=dataSource['port'], user=dataSource['user'], password=dataSource['password'], charset=dataSource['charset'], db=dataSource['db']) cursor = conn.cursor() return conn, cursor def close_conn_cursor(*args): for item in args: item.close() def exec(sql, **kwargs): conn, cursor = get_conn_cursor() cursor.execute(sql, kwargs) conn.commit() close_conn_cursor(conn, cursor) def fetch_one(sql, **kwargs): conn, cursor = get_conn_cursor() cursor.execute(sql, kwargs) result = cursor.fetchone() close_conn_cursor(conn, cursor) return result def fetch_all(sql, **kwargs): conn, cursor = get_conn_cursor() cursor.execute(sql, kwargs) result = cursor.fetchall() close_conn_cursor(conn, cursor) return result if __name__ == '__main__': users = fetch_all("select * from user") print(users) # ((1, 'admin', 'admin', None, None), (2, 'test', 'test', None, None))
六、数据库连接池
在操作数据库时需要使用数据库连接池。
pip3 install pymysql pip3 install dbutils
import threading import pymysql from dbutils.pooled_db import PooledDB MYSQL_DB_POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=3, # 链接池中最多闲置的链接,0和None不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, 1 = default = whenever it is requested, # 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='root123', database='userdb', charset='utf8' ) def task(): # 去连接池获取一个连接 conn = MYSQL_DB_POOL.connection() cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select sleep(2)') result = cursor.fetchall() print(result) cursor.close() # 将连接交换给连接池 conn.close() def run(): for i in range(10): t = threading.Thread(target=task) t.start() if __name__ == '__main__': run()
七、基于数据库连接池工具类
1、第一种:单例和方法工具类
# db.py import pymysql from dbutils.pooled_db import PooledDB class DBHelper(object): def __init__(self): # TODO 此处配置,可以去配置文件中读取。 self.pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=3, # 链接池中最多闲置的链接,0和None不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='root123', database='userdb', charset='utf8' ) def get_conn_cursor(self): conn = self.pool.connection() cursor = conn.cursor(pymysql.cursors.DictCursor) return conn, cursor def close_conn_cursor(self, *args): for item in args: item.close() def exec(self, sql, **kwargs): conn, cursor = self.get_conn_cursor() cursor.execute(sql, kwargs) conn.commit() self.close_conn_cursor(conn, cursor) def fetch_one(self, sql, **kwargs): conn, cursor = self.get_conn_cursor() cursor.execute(sql, kwargs) result = cursor.fetchone() self.close_conn_cursor(conn, cursor) return result def fetch_all(self, sql, **kwargs): conn, cursor = self.get_conn_cursor() cursor.execute(sql, kwargs) result = cursor.fetchall() self.close_conn_cursor(conn, cursor) return result db = DBHelper()
测试
from db import db db.exec("insert into d1(name) values(%(name)s)", name="666") ret = db.fetch_one("select * from d1") print(ret) ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3) print(ret) ret = db.fetch_all("select * from d1") print(ret) ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2) print(ret)
2、上下文管理方式工具类(推荐)
如果想要让他也支持 with 上下文管理。
with 获取连接:
执行SQL(执行完毕后,自动将连接交还给连接池)
# db_context.py import threading import pymysql from dbutils.pooled_db import PooledDB POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=3, # 链接池中最多闲置的链接,0和None不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, host='127.0.0.1', port=3306, user='root', password='root123', database='userdb', charset='utf8' ) class Connect(object): def __init__(self): self.conn = conn = POOL.connection() self.cursor = conn.cursor(pymysql.cursors.DictCursor) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() def exec(self, sql, **kwargs): self.cursor.execute(sql, kwargs) self.conn.commit() def fetch_one(self, sql, **kwargs): self.cursor.execute(sql, kwargs) result = self.cursor.fetchone() return result def fetch_all(self, sql, **kwargs): self.cursor.execute(sql, kwargs) result = self.cursor.fetchall() return result
测试
from db_context import Connect with Connect() as obj: # print(obj.conn) # print(obj.cursor) ret = obj.fetch_one("select * from d1") print(ret) ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3) print(ret)