"""Demo REST API: HTTP basic/token authentication plus an in-memory todo list.

Users are stored through Flask-SQLAlchemy; todos live in the module-level
``TODOS`` dict and are lost when the process exits.
"""
from flask import Flask, g, jsonify, make_response, request, url_for
from flask_httpauth import HTTPBasicAuth
from flask_restful import Api, Resource, abort, reqparse
from flask_sqlalchemy import SQLAlchemy
from itsdangerous import BadData
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from werkzeug.security import check_password_hash, generate_password_hash

app = Flask(__name__)
api = Api(app)
auth = HTTPBasicAuth()
# NOTE(review): no SQLALCHEMY_DATABASE_URI is configured before this call;
# confirm the intended database settings are supplied elsewhere.
db = SQLAlchemy(app)

# Key used to sign and verify auth tokens.
# NOTE(review): hard-coded secret; it should be loaded from configuration or
# the environment before this code is deployed anywhere real.
TOKEN_SECRET_KEY = '2344asdfasdf'

# Prefix shared by the auto-generated todo identifiers ('todo1', 'todo2', ...).
TODO_ID_PREFIX = 'todo'


class User(db.Model):
    """Database-backed account with a hashed password and signed auth tokens."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(32), index=True)
    password_hash = db.Column(db.String(128))

    def set_password(self, password):
        """Hash *password* and store it in the ``password_hash`` column."""
        # Must target the mapped column; a plain ``self.password`` attribute
        # would never be persisted.
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if *password* matches the stored hash, else False."""
        if not self.password_hash:
            # No password was ever set for this user.
            return False
        return check_password_hash(self.password_hash, password)

    def generate_auth_token(self, expiration=600):
        """Return a signed token carrying this user's id.

        ``expiration`` is passed to the serializer as ``expires_in``.
        """
        serializer = Serializer(TOKEN_SECRET_KEY, expires_in=expiration)
        return serializer.dumps({'id': self.id})

    @staticmethod
    def verify_auth_token(token):
        """Return the ``User`` identified by *token*, or None if it is invalid.

        None is returned when the token cannot be loaded by the serializer,
        when its payload has no ``id``, or when no such user exists.
        """
        serializer = Serializer(TOKEN_SECRET_KEY)
        try:
            data = serializer.loads(token)
        except BadData:
            # Covers bad/expired signatures and undecodable payloads.
            return None
        user_id = data.get('id') if isinstance(data, dict) else None
        if user_id is None:
            return None
        return User.query.get(user_id)


@auth.verify_password
def verify_password(username_or_token, password):
    """Authenticate a request by token or by username and password.

    The first credential is tried as an auth token; if that fails it is
    treated as a username and *password* is checked against the stored hash.
    On success the user is stored on ``g.user``.
    """
    user = User.verify_auth_token(username_or_token)
    if not user:
        user = User.query.filter_by(username=username_or_token).first()
        if not user or not user.check_password(password):
            return False
    g.user = user
    return True


@auth.get_password
def get_password(username):
    """Return the hard-coded password for 'miguel', or None for anyone else.

    NOTE(review): a ``verify_password`` callback is also registered on this
    auth object; confirm whether this lookup is still needed.
    """
    if username == 'miguel':
        return 'python'
    return None


@auth.error_handler
def unauthorized():
    """Return a JSON error body for failed authentication.

    NOTE(review): responds with 403 rather than 401 -- presumably to keep
    browsers from showing their login dialog; confirm this is intended.
    """
    return make_response(jsonify({'error': 'Unauthorized access'}), 403)


TODOS = {
    'todo1': {'task': 'build an API'},
    'todo2': {'task': '?????'},
    'todo3': {'task': 'profit!'},
}

parser = reqparse.RequestParser()
parser.add_argument('task', type=str)


def abort_if_todo_doesnt_exist(todo_id):
    """Abort the request with a 404 if *todo_id* is not a key of ``TODOS``."""
    if todo_id not in TODOS:
        abort(404, message="Todo {} doesn't exist".format(todo_id))


def _next_todo_id():
    """Return the next unused ``todo<N>`` identifier.

    Only keys of the form ``todo<digits>`` are considered, and they are
    compared numerically so that 'todo10' sorts after 'todo9'. With no such
    keys the result is 'todo1'.
    """
    prefix_len = len(TODO_ID_PREFIX)
    numbers = [
        int(key[prefix_len:])
        for key in TODOS
        if key.startswith(TODO_ID_PREFIX) and key[prefix_len:].isdecimal()
    ]
    return '%s%i' % (TODO_ID_PREFIX, max(numbers, default=0) + 1)


@app.route('/todo/api/v1.0/tasks', methods=['GET'])
@auth.login_required
def get_tasks():
    """Return a fixed placeholder task payload (requires authentication)."""
    return jsonify({'tasks': '111'})


class Todo(Resource):
    """A single todo item addressed by its id; all methods require auth."""

    decorators = [auth.login_required]

    def get(self, todo_id):
        """Return the todo stored under *todo_id*, or 404 if it is missing."""
        abort_if_todo_doesnt_exist(todo_id)
        return TODOS[todo_id]

    def delete(self, todo_id):
        """Delete the todo stored under *todo_id*, or 404 if it is missing."""
        abort_if_todo_doesnt_exist(todo_id)
        del TODOS[todo_id]
        return '', 204

    def put(self, todo_id):
        """Create or replace the todo stored under *todo_id*."""
        args = parser.parse_args()
        task = {'task': args['task']}
        TODOS[todo_id] = task
        return task, 201


class TodoList(Resource):
    """The collection of all todos; all methods require auth."""

    decorators = [auth.login_required]

    def get(self):
        """Return every todo, keyed by id."""
        return TODOS

    def post(self):
        """Store a new todo under the next free ``todo<N>`` id and return it."""
        args = parser.parse_args()
        todo_id = _next_todo_id()
        TODOS[todo_id] = {'task': args['task']}
        return TODOS[todo_id], 201


api.add_resource(TodoList, '/todos')
api.add_resource(Todo, '/todos/<todo_id>')


@app.route('/api/users', methods=['POST'])
def new_user():
    """Register a user from a JSON body with ``username`` and ``password``.

    Responds 400 when the body is not a JSON object, when either field is
    missing, or when the username is already taken. On success responds 201
    with the username and a ``Location`` header pointing at the new user.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        abort(400)  # body is not a JSON object
    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        abort(400)  # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(400)  # existing user
    user = User(username=username)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    location = url_for('get_user', user_id=user.id, _external=True)
    return jsonify({'username': user.username}), 201, {'Location': location}


@app.route('/api/users/<int:user_id>')
@auth.login_required
def get_user(user_id):
    """Return the username for *user_id*, or 404 if there is no such user.

    This is the target of the ``Location`` header sent by ``new_user``.
    """
    user = User.query.get(user_id)
    if user is None:
        abort(404)
    return jsonify({'username': user.username})


@app.route('/api/resource')
@auth.login_required
def get_resource():
    """Return a greeting for the authenticated user stored on ``g.user``."""
    return jsonify({'data': 'Hello, %s!' % g.user.username})


if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive debugger; do not use
    # this entry point in production.
    app.run(debug=True)