flask_restful 学习笔记

from flask import Flask,make_response,jsonify,request,url_for,g
from flask_restful import reqparse, abort, Api, Resource
from flask_httpauth import HTTPBasicAuth
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import check_password_hash,generate_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
# Application object plus the extensions bound to it.
app = Flask(__name__)
api = Api(app)            # Flask-RESTful router for the Resource classes
auth = HTTPBasicAuth()    # HTTP Basic auth helper used by the login_required decorators
db = SQLAlchemy(app)      # SQLAlchemy handle that the User model is declared against
class User(db.Model):
    """Account record stored in the ``users`` table.

    Only a hash of the password is kept (in ``password_hash``). The class can
    also issue and validate signed, time-limited authentication tokens.
    """

    __tablename__ = 'users'

    # NOTE(review): hard-coded signing secret, shared by token creation and
    # verification. Load it from configuration before using this for real.
    _TOKEN_SECRET = '2344asdfasdf'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(32), index=True)
    password_hash = db.Column(db.String(128))

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        # Write to the mapped column; a plain ``self.password`` attribute
        # would never be persisted.
        self.password_hash = generate_password_hash(password)

    # ``new_user`` in this file calls ``hash_password``; keep that name usable.
    hash_password = set_password

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash, else False."""
        if not self.password_hash:
            # No password has been set, so nothing can match.
            return False
        return check_password_hash(self.password_hash, password=password)

    def generate_auth_token(self, expiration=600):
        """Return a signed token carrying this user's id.

        ``expiration`` is passed to the serializer as ``expires_in``.
        """
        s = Serializer(self._TOKEN_SECRET, expires_in=expiration)
        return s.dumps({'id': self.id})

    @staticmethod
    def verify_auth_token(token):
        """Return the user a token refers to, or None if it cannot be loaded."""
        s = Serializer(User._TOKEN_SECRET)
        try:
            data = s.loads(token)
        except Exception:
            # Any failure to load the token (including a plain username being
            # passed in) means "not a token", without also swallowing
            # KeyboardInterrupt/SystemExit as a bare ``except`` would.
            return None
        return User.query.get(data['id'])


@auth.verify_password
def verify_password(username_or_token, password):
    """Authenticate a request by token or by username and password.

    The first credential is tried as an auth token; if that yields no user it
    is treated as a username and ``password`` is checked against the stored
    hash. On success the user is stored on ``g.user`` and True is returned;
    otherwise False is returned.
    """
    user = User.verify_auth_token(username_or_token)
    if not user:
        user = User.query.filter_by(username=username_or_token).first()
        # ``User`` exposes ``check_password``; it has no ``verify_password``.
        if not user or not user.check_password(password):
            return False
    g.user = user
    return True
def abort_if_todo_doesnt_exist(todo_id):
    """Abort the request with a 404 unless ``todo_id`` is a key of ``TODOS``."""
    if todo_id in TODOS:
        return
    abort(404, message="Todo {} doesn't exist".format(todo_id))
# Shared request parser; Todo.put and TodoList.post call parser.parse_args().
parser = reqparse.RequestParser()
# Declare the single 'task' argument, with str as its type.
parser.add_argument('task', type=str)
# ``login_required`` must sit below ``app.route`` so that the protected
# function is the one Flask registers. The extra copy that used to sit above
# ``app.route`` only wrapped the already-registered view and had no effect.
@app.route('/todo/api/v1.0/tasks', methods=['GET'])
@auth.login_required
def get_tasks():
    """Return a JSON object whose ``tasks`` value is an empty string.

    Requires authentication.
    """
    return jsonify({'tasks': ''})
# In-memory todo store: maps a todo id to a dict holding its 'task' text.
TODOS = {
    'todo1': {'task': 'build an API'},
    'todo2': {'task': '?????'},
    'todo3': {'task': 'profit!'},
}
class Todo(Resource):
    """REST resource for one todo item, addressed by ``todo_id``."""

    # Applied by Flask-RESTful to every handler method of this resource.
    decorators = [auth.login_required]

    def get(self, todo_id):
        """Return the stored todo, or abort with 404 if it is unknown."""
        abort_if_todo_doesnt_exist(todo_id)
        return TODOS[todo_id]

    def delete(self, todo_id):
        """Remove the todo (404 if unknown) and answer with an empty 204."""
        abort_if_todo_doesnt_exist(todo_id)
        TODOS.pop(todo_id)
        return '', 204

    def put(self, todo_id):
        """Create or replace the todo from the parsed 'task' argument."""
        parsed = parser.parse_args()
        TODOS[todo_id] = {'task': parsed['task']}
        return TODOS[todo_id], 201
class TodoList(Resource):
    """REST resource for the whole todo collection."""

    # Applied by Flask-RESTful to every handler method of this resource.
    decorators = [auth.login_required]

    def get(self):
        """Return every stored todo."""
        return TODOS

    def post(self):
        """Store a new todo under the next free ``todo<N>`` id and return it."""
        args = parser.parse_args()
        prefix = 'todo'
        # Compare the numeric suffixes as integers. Comparing the keys as
        # strings ranks 'todo9' above 'todo10' and would hand out an id that
        # already exists. Keys that do not look like 'todo<digits>' (they can
        # be created through Todo.put) are ignored.
        used_numbers = [
            int(key[len(prefix):])
            for key in TODOS
            if key.startswith(prefix) and key[len(prefix):].isdecimal()
        ]
        # default=0 keeps this working once every todo has been deleted.
        todo_id = 'todo%i' % (max(used_numbers, default=0) + 1)
        TODOS[todo_id] = {'task': args['task']}
        return TODOS[todo_id], 201
# Route table for the todo resources, registered in order.
for _resource, _route in (
    (TodoList, '/todos'),
    (Todo, '/todos/<todo_id>'),
):
    api.add_resource(_resource, _route)
# NOTE(review): a verify_password callback is registered on the same ``auth``
# object elsewhere in this file; confirm which of the two Flask-HTTPAuth
# actually consults.
@auth.get_password
def get_password(username):
    """Return the password for ``username``, or None for an unknown user.

    Only the hard-coded user 'miguel' is known.
    """
    return 'python' if username == 'miguel' else None
@auth.error_handler
def unauthorized():
    """Build the JSON error response used when authentication fails (403)."""
    body = jsonify({'error': 'Unauthorized access'})
    return make_response(body, 403)
@app.route('/api/users', methods=['POST'])
def new_user():
    """Register a user from a JSON body holding ``username`` and ``password``.

    Aborts with 400 when either field is missing, when the body is not a JSON
    object, or when the username is already taken. On success responds 201
    with the username and a ``Location`` header.
    """
    # silent=True yields None, not an error, for a missing or unparsable
    # body; the request then falls through to the 400 below.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        abort(400)  # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(400)  # existing user
    user = User(username=username)
    # ``User`` defines ``set_password``.
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    # NOTE(review): no 'get_user' endpoint is defined in the visible part of
    # this file; url_for needs one to exist. Confirm it is registered.
    location = url_for('get_user', id=user.id, _external=True)
    return jsonify({'username': user.username}), 201, {'Location': location}
@app.route('/api/resource')
@auth.login_required
def get_resource():
    """Return a JSON greeting addressed to the user stored on ``g.user``."""
    greeting = 'Hello, %s!' % g.user.username
    return jsonify({'data': greeting})
# Script entry point: run the Flask server with debug mode on when this file
# is executed directly.
if __name__ == '__main__':
    app.run(debug=True)
上一篇:【php增删改查实例】第十二节 - 数据删除功能


下一篇:禁用Windows重复数据删除