SQLAchemy操作mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介:

ORM框架有两种形式:
第一种是DB first,就是需要手动创建数据库和表,然后ORM框架,自动生成代码类的方法;
第二种是code first,就是手动创建数据库,然后通过写代码类的方法,ORM框架来自动生成表和数据的方法

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
作用:

  1. 提供简单的规则,2. 自动转换成SQL语句

安装方式:

pip3 install SQLAlchemy
or
easy_install SQLAlchemy

SQLAlchemy的结构图,如下:
SQLAchemy操作mysql
SQLAlchemy本身是不会连接数据库的,他是通过"DBAPI"这个模块api接口来实现对数据库连接的,而它本身会更加Dialect里面的设置,来确定你是什么数据库,如何转化sql语句的功能。

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

常见的DBAPI在官方的sqlachemy里面有介绍和使用方法,下面列出几个案例:

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

使用方法

#导入模块
from sqlalchemy import create_engine
#创建连接,是通过create_engine来创建连接的
engine = create_engine("mysql+pymysql://kk:123@localhost:3306/pysqltest",max_overflow=5)
#执行SQL
cur = engine.execute(
    "insert into t1(name) VALUES ('rr')"
)

结果截图
SQLAchemy操作mysql

其他操作

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )

# 新插入行自增ID
# cur.lastrowid

# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )

# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )

# 执行SQL
# cur = engine.execute('select * from hosts')
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

ORM功能的使用

创建一个单表的操作

from  sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,ForeignKey,UniqueConstraint,Index,CHAR,VARCHAR
from  sqlalchemy.orm import sessionmaker,relationship
from  sqlalchemy import create_engine

engine=create_engine("mysql+pymysql://kk:123@localhost:3306/pysqltest?charset=utf8",max_overflow=5)
Base=declarative_base()
#创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer,primary_key=True)
    name = Column(String(32))
    gender = Column(CHAR(32))

    __table_args__ = (
        UniqueConstraint('id','name',name='uix_id_name'),
        Index('ix_id_name','name','gender')
        # Index 要把名字写在最前面
    )
def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

init_db()

执行结果:
SQLAchemy操作mysql

SQLAchemy操作mysql

增加操作

要想对表中的数据进行操作,需要能拿到一个类似pymysql的游标的功能,在这里它是一个session。

Session = sessionmaker(bind=engine)
session = Session()

类 -> 表

对象 -> 行

from  sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,ForeignKey,UniqueConstraint,Index,CHAR,VARCHAR
from  sqlalchemy.orm import sessionmaker,relationship
from  sqlalchemy import create_engine

engine=create_engine("mysql+pymysql://kk:123@localhost:3306/pysqltest?charset=utf8",max_overflow=5)
session=sessionmaker(bind=engine)
session=session()
Base=declarative_base()
#创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer,primary_key=True)
    name = Column(String(32))
    gender = Column(CHAR(32))

    __table_args__ = (
        UniqueConstraint('id','name',name='uix_id_name'),
        Index('ix_id_name','name','gender')
        # Index 要把名字写在最前面
    )

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

###增加功能
obj1 =Users(name='xx',gender='女')
session.add(obj1)
#添加过个values的方法
#obj2=[
 #   Users(name='yy',gender='男'),
  #  Users(name='zz',gender='女'),
   # Users(name='rr',gender='男'),
#]
#session.add_all(obj2)
session.commit()
session.close()

查找功能

####查找
user_list=session.query(Users).all()
for row in user_list:
    print(row.id,row.name,row.gender)
# user_type_list = session.query(UserType.id,UserType.title).filter(UserType.id > 2)
# for row in user_type_list:
#     print(row.id,row.title)

删除

session.query(Users).filter(Users.id ==4).delete()

修改

session.query(Users).filter(Users.id ==1).update({'gender':'中性'})
#给添加变长字符串长度的用下面这种
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
#把所有数字自动加1用下面这种
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

其他

+参考文章

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()

# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()

# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

子查询


# 1.
# select * from b where id in (select id from tb2)

# 2 select * from (select * from tb) as B
# q1 = session.query(UserType).filter(UserType.id > 0).subquery()
# result = session.query(q1).all()
# print(result)

# 3
# select
#   id ,
#   (select * from users where users.user_type_id=usertype.id)
# from usertype;

# session.query(UserType,session.query(Users).filter(Users.id == 1).subquery())
# session.query(UserType,Users)
# result = session.query(UserType.id,session.query(Users).as_scalar())
# print(result)
# result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())

# print(result)




本文转自 kesungang 51CTO博客,原文链接:http://blog.51cto.com/sgk2011/2052485,如需转载请自行联系原作者

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
6月前
|
SQL 运维 关系型数据库
MySQL 中 GRANT 操作会引起复制中断吗?
GRANT 操作并不是一个原子性操作,不管执行成功与否,都会触发一个隐式重载授权表的行为。 在生产环境中需要规范用户创建及授权的操作,不推荐使用 DML 语句去直接变更 mysql.user 表,可能会引发其他的问题,若使用了 DML 语句进行变更, 需要手工执行 flush privileges。
88 4
|
6月前
|
JavaScript 关系型数据库 MySQL
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
83 0
|
7月前
|
关系型数据库 MySQL
【mysql技巧】如何在这个mysql语句执行前加个前提,也就是只有表里没有相同数据才进行添加插入操作
【mysql技巧】如何在这个mysql语句执行前加个前提,也就是只有表里没有相同数据才进行添加插入操作
47 1
|
6月前
|
存储 关系型数据库 文件存储
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
58 2
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 关系型数据库 MySQL
「Python入门」python操作MySQL和SqlServer
**摘要:** 了解如何使用Python的pymysql模块与MySQL数据库交互。首先,通过`pip install pymysql`安装模块。pymysql提供与MySQL的连接功能,例如创建数据库连接、执行SQL查询。在设置好MySQL环境后,使用`pymysql.connect()`建立连接,并通过游标执行SQL(如用户登录验证)。注意防止SQL注入,使用参数化查询。增删改操作需调用`conn.commit()`来保存更改。pymssql模块类似,但导入和连接对象创建略有不同。
77 0
「Python入门」python操作MySQL和SqlServer
|
6月前
|
SQL 存储 关系型数据库
|
7月前
|
关系型数据库 MySQL 数据库
『Django』模型入门教程-操作MySQL
一个后台如果没有数据库可以说废了一半。日常开发中大多数时候都在与数据库打交道。Django 为我们提供了一种更简单的操作数据库的方式。 在 Django 中,模型(Model)是用来定义数据库结构的类。每个模型类通常对应数据库中的一个表,类的属性对应表中的列。通过定义模型,Django 的 ORM(Object-Relational Mapping)可以将 Python 对象映射到数据库表,并提供一套 API 来进行数据库操作。 本文介绍模型的用法。