创建 game_controller.py
在 API 中公开数据库之前,我们将创建一个游戏控制器,它将负责保存、更新、删除和获取游戏数据的所有操作。
所有这些函数都在一个名为 game_controller.py
的文件中,它看起来像这样:
from db import get_db def insert_game(name, price, rate): db = get_db() cursor = db.cursor() statement = "INSERT INTO games(name, price, rate) VALUES (?, ?, ?)" cursor.execute(statement, [name, price, rate]) db.commit() return True def update_game(id, name, price, rate): db = get_db() cursor = db.cursor() statement = "UPDATE games SET name = ?, price = ?, rate = ? WHERE id = ?" cursor.execute(statement, [name, price, rate, id]) db.commit() return True def delete_game(id): db = get_db() cursor = db.cursor() statement = "DELETE FROM games WHERE id = ?" cursor.execute(statement, [id]) db.commit() return True def get_by_id(id): db = get_db() cursor = db.cursor() statement = "SELECT id, name, price, rate FROM games WHERE id = ?" cursor.execute(statement, [id]) return cursor.fetchone() def get_games(): db = get_db() cursor = db.cursor() query = "SELECT id, name, price, rate FROM games" cursor.execute(query) return cursor.fetchall()
在文件中,我们看到了几个函数。 insert_game
函数接收游戏数据并将其插入数据库(INSERT);所有这些都使用准备好的语句来避免我们使用 Python 和 Flask 创建的这个 API 中的 SQL 注入。
我们还看到其他方法,例如 update_game
执行 UPDATE 操作以更新游戏,delete_game
从其 id 删除游戏 (DELETE),get_by_id
从其 id 返回游戏(使用 SELECT 操作)。
最后我们看看返回所有现有游戏的 get_games
函数。
请注意,所有函数都使用数据库和游标来执行所有操作。 现在我们有了数据库操作的 CRUD,是时候用 Flask 公开 API 中的所有内容了
创建 main.py
网络异常,图片无法展示
|
安装 flask
可以查看官方安装帮助:
$ pip install Flask
我们在 API 中做的第一件事是创建 Flask 应用程序并导入游戏控制器。我们还从数据库中导入了一个函数,因为我们需要在启动应用程序时创建表:
from flask import Flask, jsonify, request import game_controller from db import create_tables app = Flask(__name__)
然后使用 GET、PUT、POST 和 DELETE http 动词定义路由:
@app.route('/games', methods=["GET"]) def get_games(): games = game_controller.get_games() return jsonify(games) @app.route("/game", methods=["POST"]) def insert_game(): game_details = request.get_json() name = game_details["name"] price = game_details["price"] rate = game_details["rate"] result = game_controller.insert_game(name, price, rate) return jsonify(result) @app.route("/game", methods=["PUT"]) def update_game(): game_details = request.get_json() id = game_details["id"] name = game_details["name"] price = game_details["price"] rate = game_details["rate"] result = game_controller.update_game(id, name, price, rate) return jsonify(result) @app.route("/game/<id>", methods=["DELETE"]) def delete_game(id): result = game_controller.delete_game(id) return jsonify(result) @app.route("/game/<id>", methods=["GET"]) def get_game_by_id(id): game = game_controller.get_by_id(id) return jsonify(game)
每条路由都对应了我们之前创建的游戏控制器功能,从而与 SQLite3 数据库进行交互。
- 在更新和插入游戏时,我们使用
get_json
阅读了请求的 JSON ,然后访问字典:
game_details = request.get_json()
- 在删除或通过 ID 获取的情况下,我们从路由中读取 id 变量作为 <variable> 并在方法中接收它:
game = game_controller.get_by_id(id)
- 此 Python API 通过 JSON 进行通信,因此所有响应都是根据 jsonify 函数返回的内容进行的:
return jsonify(result)
- 最后,我们创建 Flask 应用程序来启动服务器并监听请求:
if __name__ == "__main__": create_tables() """ Here you can change debug and port Remember that, in order to make this API functional, you must set debug in False """ app.run(host='0.0.0.0', port=8000, debug=False)
至此,我们完整的 main.py
函数如下:
from flask import Flask, jsonify, request, json import game_controller from db import create_tables app = Flask(__name__) @app.route('/games', methods=["GET"]) def get_games(): games = game_controller.get_games() return jsonify(games) @app.route("/game", methods=["POST"]) def insert_game(): game_details = json.loads(request.get_data()) name = game_details["name"] price = game_details["price"] rate = game_details["rate"] result = game_controller.insert_game(name, price, rate) return jsonify(result) @app.route("/game", methods=["PUT"]) def update_game(): game_details = request.get_json() id = game_details["id"] name = game_details["name"] price = game_details["price"] rate = game_details["rate"] result = game_controller.update_game(id, name, price, rate) return jsonify(result) @app.route("/game/<id>", methods=["DELETE"]) def delete_game(id): result = game_controller.delete_game(id) return jsonify(result) @app.route("/game/<id>", methods=["GET"]) def get_game_by_id(id): game = game_controller.get_by_id(id) return jsonify(game) """ Enable CORS. Disable it if you don't need CORS """ @app.after_request def after_request(response): response.headers["Access-Control-Allow-Origin"] = "*" # <- You can change "*" for a domain for example "http://localhost" response.headers["Access-Control-Allow-Credentials"] = "true" response.headers["Access-Control-Allow-Methods"] = "POST, GET, OPTIONS, PUT, DELETE" response.headers["Access-Control-Allow-Headers"] = "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization" return response if __name__ == "__main__": create_tables() """ Here you can change debug and port Remember that, in order to make this API functional, you must set debug in False """ app.run(host='0.0.0.0', port=8000, debug=False)
值得注意的是,这里我们添加 CORS,如果您要从与 API 本身不同的域使用此 API,则需要启用 CORS。只需将以下代码片段添加到 API(在存储库中,您会发现已添加的代码,您可以根据需要将其删除):
""" Enable CORS. Disable it if you don't need CORS """ @app.after_request def after_request(response): response.headers["Access-Control-Allow-Origin"] = "*" # <- You can change "*" for a domain for example "http://localhost" response.headers["Access-Control-Allow-Credentials"] = "true" response.headers["Access-Control-Allow-Methods"] = "POST, GET, OPTIONS, PUT, DELETE" response.headers["Access-Control-Allow-Headers"] = "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization" return response