简介
第三方包
pip install flask-sqlalchemy
pip install Pymysql
优势
不用写SQL语句,底层代码强大(方便更改、维护和迁移)
ORM
目前,许多主流的语言都实现了对象关系映射(Object Relational Mapper,简称 ORM)的库包。ORM 的主要功能是将数据库表中的每条记录映射成一个对象,所有的数据库操作都转化为对象的操作,这样可以增加代码的可读性和安全性。
ORM
优点:
- 简洁易读:将数据表抽象为对象(数据模型),更直观易读。
- 可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护。
- 更安全:有效避免 SQL 注入。
当然,ORM 在性能上会低于直接执行 SQL 语句。本文介绍 SQLAlchemy 的一些基础操作。
数据库的连接配置
<协议名称>://<⽤户名>:<密码>@<ip地址>:<端⼝>/<数据库名>
如果使⽤的是mysqldb驱动,协议名: mysql
如果使⽤的是pymysql驱动,协议名: mysql+pymysql
"""Minimal Flask app that checks the MySQL connection by running ``select 1``."""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
# The pymysql driver uses the "mysql+pymysql" scheme.
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)

# Connectivity check, run once at import time inside an application context.
with app.app_context():
    with db.engine.connect() as conn:
        # Reach text() through the existing ``db`` handle instead of building
        # a second, unbound SQLAlchemy() object just to access it.
        rs = conn.execute(db.text("select 1"))
        print(rs.fetchone())


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
此时输出为(1,)
如图
ORM模型
数据类型
Integer:整型,映射到数据库中是int类型
Float:浮点类型,float
Double; String; Boolean;
Decimal: 定点类型,专门为解决浮点类型精度丢失的问题而设定。Decimal需要传入两个参数,第一个参数表示该字段总共能存储多少位数字,第二个参数表示小数点后有多少位。
Enum:枚举类型;
Date:日期类型,年月日;
DateTime: 时间类型,年月日时分秒;
Time:时间类型,时分秒;
Text:长字符串,可存储6万多个字符,text;
LongText:长文本类型,longtext.
新建表
"""Declare the ``User`` model and create its table."""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


# Two ways to populate a model instance: keyword arguments, or attribute
# assignment after construction. Neither object is added to a session here,
# so this script itself writes no rows.
user1 = User(username="coleak", email="666@qq.com")
user2 = User()
user2.username = "ayue"
user2.email = "999@qq.com"

# Create the tables for all declared models.
with app.app_context():
    db.create_all()


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
效果如图
增加数据
"""Flask app with a route that inserts two sample users."""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the naming of a REST-style
# '/user/...' prefix.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    user1 = User(username="coleak", email="666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
效果如图
查找数据
"""Flask app with routes that insert and look up users."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the other '/user/...' routes.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    Matching rows are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative lookup by primary key (e.g. /user/query?id=1):
    #     user = User.query.get(request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
http://127.0.0.1:5000/user/query?username=coleak
修改数据
"""Flask app with routes that insert, look up and update users."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the other '/user/...' routes.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    Matching rows are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative lookup by primary key (e.g. /user/query?id=1):
    #     user = User.query.get(request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route("/user/update")
def update_user():
    """Set a new email on the first user matching ``username``.

    Query arguments: ``username`` selects the user, ``newemail`` is the value
    to store. Returns 400 when either is missing and 404 when no user matches.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if not username or not newemail:
        # email is declared nullable=False, so a missing value cannot be saved.
        return "missing username or newemail", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # first() yields None when nothing matches; assigning to it would raise.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
http://127.0.0.1:5000/user/update?username=coleak&newemail=666888@qq.com
删除数据
"""Flask app with routes that insert, look up, update and delete users."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the other '/user/...' routes.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    Matching rows are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative lookup by primary key (e.g. /user/query?id=1):
    #     user = User.query.get(request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route("/user/update")
def update_user():
    """Set a new email on the first user matching ``username``.

    Returns 400 when a query argument is missing and 404 when no user matches.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if not username or not newemail:
        # email is declared nullable=False, so a missing value cannot be saved.
        return "missing username or newemail", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # first() yields None when nothing matches; assigning to it would raise.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


@app.route("/user/delete")
def delete_user():
    """Delete the first user matching the ``username`` query argument.

    Only one row is removed even when several users share the username.
    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # session.delete() rejects None, so stop before touching the session.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
http://127.0.0.1:5000/user/delete?username=coleak
此时第一个coleak被删除了
外键与表的关系
- 创造外键
"""Flask app declaring ``User`` and ``article`` models linked by a foreign key."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


class article(db.Model):
    """ORM model mapped to the ``article`` table; each row may reference a user."""

    __tablename__ = "article"
    arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    artitle = db.Column(db.String(100), nullable=False)
    arcontent = db.Column(db.Text, nullable=False)
    # Foreign key to user.id.
    author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # backref adds an ``articles`` attribute on User listing that user's articles.
    author = db.relationship("User", backref="articles")


with app.app_context():
    db.create_all()

# Example of building related rows (not executed here). Note the column names
# are artitle/arcontent, and the instance must not reuse the class name:
#     coleak = User(username="coleak", email="678@163.com")
#     post = article(artitle="flask_learn", arcontent="flask_content",
#                    author=coleak)


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the other '/user/...' routes.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    Matching rows are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative lookup by primary key (e.g. /user/query?id=1):
    #     user = User.query.get(request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route("/user/update")
def update_user():
    """Set a new email on the first user matching ``username``.

    Returns 400 when a query argument is missing and 404 when no user matches.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if not username or not newemail:
        # email is declared nullable=False, so a missing value cannot be saved.
        return "missing username or newemail", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # first() yields None when nothing matches; assigning to it would raise.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


@app.route("/user/delete")
def delete_user():
    """Delete the first user matching the ``username`` query argument.

    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # session.delete() rejects None, so stop before touching the session.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
- backref关联查找
"""Flask app that creates articles for a user and reads them back via backref."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


class article(db.Model):
    """ORM model mapped to the ``article`` table; each row may reference a user."""

    __tablename__ = "article"
    arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    artitle = db.Column(db.String(100), nullable=False)
    arcontent = db.Column(db.Text, nullable=False)
    # Foreign key to user.id.
    author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # backref adds an ``articles`` attribute on User listing that user's articles.
    author = db.relationship("User", backref="articles")


@app.route('/article/add')
def add_article():
    """Create two sample articles authored by the user with id 2.

    Returns 404 when that user does not exist.
    """
    author = User.query.get(2)
    if author is None:
        # Without this check both rows would be saved with no author while
        # the response still reported success.
        return "author not found", 404
    article1 = article(artitle="flask_learn", arcontent="flask_content")
    article1.author = author
    article2 = article(artitle="flask_learn2", arcontent="flask_content2")
    article2.author = author
    db.session.add_all([article1, article2])
    db.session.commit()
    return "文章1,2添加成功"


@app.route("/article/query")
def query():
    """Print the titles of all articles of the user with id 2.

    Titles are written to stdout. Returns 404 when that user does not exist.
    """
    user = User.query.get(2)
    if user is None:
        return "author not found", 404
    # ``post`` rather than ``article`` so the model class is not shadowed.
    for post in user.articles:
        print(post.artitle)
    return "文章查找成功"


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the other '/user/...' routes.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    Matching rows are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative lookup by primary key (e.g. /user/query?id=1):
    #     user = User.query.get(request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route("/user/update")
def update_user():
    """Set a new email on the first user matching ``username``.

    Returns 400 when a query argument is missing and 404 when no user matches.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if not username or not newemail:
        # email is declared nullable=False, so a missing value cannot be saved.
        return "missing username or newemail", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # first() yields None when nothing matches; assigning to it would raise.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


@app.route("/user/delete")
def delete_user():
    """Delete the first user matching the ``username`` query argument.

    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # session.delete() rejects None, so stop before touching the session.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
migrate迁移
"""Flask app wired to Flask-Migrate, with a ``passw`` column added to ``User``."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)

# Database configuration
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)

# Registers the ``flask db`` commands. Typical sequence:
#   flask db init
#   flask db migrate
#   flask db upgrade
migrate = Migrate(app, db)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), not nullable. No unique constraint is declared here.
    username = db.Column(db.String(50), nullable=False)
    # Added in this revision of the model; String(50), not nullable.
    passw = db.Column(db.String(50), nullable=False)
    # String(120), not nullable, unique.
    email = db.Column(db.String(120), unique=True, nullable=False)


class article(db.Model):
    """ORM model mapped to the ``article`` table; each row may reference a user."""

    __tablename__ = "article"
    arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    artitle = db.Column(db.String(100), nullable=False)
    arcontent = db.Column(db.Text, nullable=False)
    # Foreign key to user.id.
    author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # backref adds an ``articles`` attribute on User listing that user's articles.
    author = db.relationship("User", backref="articles")


@app.route('/article/add')
def add_article():
    """Create two sample articles authored by the user with id 2.

    Returns 404 when that user does not exist.
    """
    author = User.query.get(2)
    if author is None:
        # Without this check both rows would be saved with no author while
        # the response still reported success.
        return "author not found", 404
    article1 = article(artitle="flask_learn", arcontent="flask_content")
    article1.author = author
    article2 = article(artitle="flask_learn2", arcontent="flask_content2")
    article2.author = author
    db.session.add_all([article1, article2])
    db.session.commit()
    return "文章1,2添加成功"


@app.route("/article/query")
def query():
    """Print the titles of all articles of the user with id 2.

    Titles are written to stdout. Returns 404 when that user does not exist.
    """
    user = User.query.get(2)
    if user is None:
        return "author not found", 404
    # ``post`` rather than ``article`` so the model class is not shadowed.
    for post in user.articles:
        print(post.artitle)
    return "文章查找成功"


# '/uesr/add' is the original, misspelled path. It stays registered so that
# existing links keep working; '/user/add' matches the other '/user/...' routes.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in a single transaction."""
    # passw is declared nullable=False, so it must be supplied on insert.
    # NOTE(review): sample plaintext value only; a real application should
    # store a password hash, which would also need a wider column.
    user1 = User(username="coleak", passw="123456", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.passw = "123456"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    Matching rows are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative lookup by primary key (e.g. /user/query?id=1):
    #     user = User.query.get(request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route("/user/update")
def update_user():
    """Set a new email on the first user matching ``username``.

    Returns 400 when a query argument is missing and 404 when no user matches.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if not username or not newemail:
        # email is declared nullable=False, so a missing value cannot be saved.
        return "missing username or newemail", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # first() yields None when nothing matches; assigning to it would raise.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


@app.route("/user/delete")
def delete_user():
    """Delete the first user matching the ``username`` query argument.

    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # session.delete() rejects None, so stop before touching the session.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
# Bind Flask-Migrate to the application and the SQLAlchemy handle.
migrate = Migrate(app, db)
# Command-line workflow:
#   flask db init      # run only once; generates the migrations directory
#   flask db migrate
#   flask db upgrade