config.py配置文件
# 配置数据库信息 db_config = { 'host': 'localhost', 'port': '3306', 'database': 'bdpoi', 'username': 'bdpoi', 'password': 'rKADFaJxAG2b7Lfd' }
操作语句
import json from config import * from flask import Flask from flask_sqlalchemy import SQLAlchemy from sqlalchemy import text app = Flask(__name__) app.config['SECRET_KEY'] = 'xufeJjtC9Lsuv8m7D7ZZfph3a6MjpMpJ' # url的格式为,数据库的协议://用户名:密码@ip地址:端口号(默认可以不写)/数据库名 app.config["SQLALCHEMY_DATABASE_URI"] = 'mysql://{username}:{password}@{host}:{port}/{database}'.format(**db_config) # 动态追踪数据库的修改 app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = True app.config['SQLALCHEMY_ECHO'] = True # 创建数据库的操作对象 db = SQLAlchemy(app) ''' db.Column属性 primary_key 如果设为 True,这列就是表的主键 unique 如果设为 True,这列不允许出现重复的值 index 如果设为 True,为这列创建索引,提升查询效率 nullable 如果设为 True,这列允许使用空值;如果设为 False,这列不允许使用空值 default 为这列定义默认值 ''' class Poi(db.Model): __tablename__ = "po_poi" poi_id = db.Column(db.Integer, primary_key=True, autoincrement=True) poi_name = db.Column(db.String(255), nullable=True) poi_category_id = db.Column(db.Integer, nullable=True) # repr()方法显示一个可读字符串 def __repr__(self): return 'Poi:%s' % self.poi_name # 默认首页 @app.route("/") def get_index(): return "py-LockDataV API" # 创建数据表 @app.route("/cre") def create_index(): db.create_all() return "创建数据表,状态:OK" # 新增数据 @app.route("/add") def add_index(): p1 = Poi(poi_name='宁波商会', poi_category_id="2") p2 = Poi(poi_name='大海大厦', poi_category_id="1") p3 = Poi(poi_name='国骅大厦', poi_category_id="2") db.session.add_all([p1, p2, p3]) db.session.commit() # 如何防止重复提交 return "新增数据,状态:OK" # 删除数据 @app.route("/del") @app.route("/del/") @app.route("/del/<poi_id>") def del_index(poi_id=None): if poi_id: # get_or_404 如果id不存在,抛出404页面 # del_id = Poi.query.get_or_404(poi_id) # get方式 del_id = Poi.query.get(poi_id) if del_id: db.session.delete(del_id) db.session.commit() else: return "Id不存在数据,状态:Fail" return "删除数据,状态:OK" else: return "poi_id为空,越权操作" # 查询数据 @app.route("/get") def get_data(): # in查询 # Poi.query.filter(Poi.poi_name.in_('A', 'B', 'C', 'D')) # 限制查询 # Poi.query.filter(poi_name=18).offset(2).limit(3) # 跳过二条开始查询,限制输出3条 # 排序 # results = Poi.query.order_by('poi_id') # 按默认升序,在前面加-号为降序'-age' # results = Poi.query.order_by(text("-poi_id")) results = Poi.query.filter_by(poi_category_id=2).all() # 定义字典和序列 rows = [] data = {} for rs in results: row = {"poi_id": rs.poi_id, "poi_name": rs.poi_name, "poi_category_id": rs.poi_category_id} rows.append(row) data['code'] = 0 data['msg'] = 'OK' data['data'] = rows # 输出标准的JSON字符串 json_str = json.dumps(data, ensure_ascii=False) return json_str # 获取数据库记录API接口 @app.route("/poi") @app.route("/poi/") @app.route("/poi/<pid>") def get_poi(pid=None): # total = Poi.query.count() # print(total) if pid is None: results = Poi.query.all() else: results = Poi.query.filter_by(poi_id=pid).all() # 定义字典和序列 rows = [] data = {} for rs in results: row = {"poi_id": rs.poi_id, "poi_name": rs.poi_name, "poi_category_id": rs.poi_category_id} rows.append(row) data['code'] = 0 data['msg'] = 'OK' data['data'] = rows # 输出标准的JSON字符串 json_str = json.dumps(data, ensure_ascii=False) return json_str if __name__ == '__main__': app.run(debug=True)
lockdatav Done!