本文主要介绍 Django REST framework 用户认证部分的内容
- 环境配置
- 基于 token 认证
- JWT 认证
1、环境配置
pip install django==2.0
pip install djangorestframework==3.10.0
pip install pymysql==1.0.2
pip install pyjwt
pip install werkzeug
2、基于token认证(非全局配置)
2.1 数据库模型设计:models.py
from django.db import models
from werkzeug.security import generate_password_hash, check_password_hash
class User(models.Model):
    """Administrator account stored in the ``adminuser_user`` table.

    The plain-text password is never stored: assigning to ``password``
    writes a Werkzeug hash into ``password_hash``, and reading ``password``
    raises ``AttributeError``.
    """

    id = models.AutoField(primary_key=True)
    username = models.CharField(max_length=30)
    password_hash = models.TextField()
    root = models.IntegerField()
    email = models.EmailField(max_length=30, default='')
    mobile = models.CharField(max_length=11, default='')
    status = models.IntegerField(default=0)

    class Meta:
        # Django reads the table name only from Meta; a bare class-level
        # ``db_table`` attribute has no effect on the model.
        db_table = 'adminuser_user'

    @property
    def password(self):
        """Write-only attribute; reading it always raises AttributeError."""
        raise AttributeError('Can not read password!')

    @password.setter
    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)
2.2 同步数据库:数据同步好了之后添加几条用户信息
2.3 路由:urls.py
from django.urls import path,re_path
from . import views, loginView
# Application namespace for the routes declared below.
app_name = 'Admin'

urlpatterns = [
    # Login endpoint
    path('admin/login', loginView.LoginView.as_view()),
    # Endpoint served by UserView
    path('admin/users', views.UserView.as_view()),
]
2.4 视图:views.py
-------------------------------------------loginView.py-------------------------------------------
from .models import User
import datetime
from rest_framework.response import Response
from rest_framework.views import APIView
import jwt
from PetHome import settings
class LoginView(APIView):
    """Login endpoint: checks a username/password pair and issues a JWT."""

    def post(self, request):
        """Authenticate the posted credentials and return a signed token.

        Reads ``username`` and ``password`` from ``request.data``. On success
        ``data`` carries ``token``, ``username`` and ``user_id``. On every
        failure ``meta.status`` is 500 and ``meta.message`` is the same
        generic text, so the response does not reveal whether the account
        exists.

        :param request: DRF request whose body holds the credentials.
        :return: ``Response`` wrapping the ``{"data": ..., "meta": ...}`` dict.
        """
        failure_message = "用户不存在或密码错误"
        ret = {
            "data": {},
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        try:
            username = request.data["username"]
            password = request.data["password"]

            # Fetch the row once; ``first()`` yields None when nothing
            # matches, which also covers empty credentials.
            user = None
            if username and password:
                user = User.objects.filter(username=username).first()

            if user is None or not user.verify_password(password):
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = failure_message
                return Response(ret)

            # PyJWT interprets datetimes as UTC, so build the claims from an
            # aware UTC timestamp rather than naive local time.
            issued_at = datetime.datetime.now(datetime.timezone.utc)
            payload = {
                "exp": issued_at + datetime.timedelta(days=1),  # expiry time
                "iat": issued_at,  # issue time
                "id": user.id,
                "username": user.username,
                "mobile": user.mobile
            }
            token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
            # PyJWT 1.x returns bytes, 2.x returns str; always hand out str.
            if isinstance(token, bytes):
                token = token.decode("utf-8")

            ret["data"]["token"] = token
            ret["data"]["username"] = user.username
            ret["data"]["user_id"] = user.id
            ret["meta"]["status"] = 200
            ret["meta"]["message"] = "登录成功"
        except Exception as error:
            # Boundary handler: any unexpected problem (missing field,
            # database error, ...) is reported as a generic login failure.
            print(error)
            ret["data"] = {}
            ret["meta"]["status"] = 500
            ret["meta"]["message"] = failure_message
        return Response(ret)
-------------------------------------------views.py-------------------------------------------
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User
class UserView(APIView):
    """User query endpoint protected by ``TokenAuthtication``."""

    authentication_classes = [TokenAuthtication, ]

    def get(self, request, *args, **kwargs):
        """
        Return a page of users, or a single user when ``pk`` is supplied.

        :param request: DRF request; an optional ``query`` GET parameter
            filters the list by substring match on mobile, username or email.
        :param args: unused positional arguments.
        :param kwargs: URL keyword arguments; ``pk`` selects a single record.
        :return: a paginated response for the list case, otherwise a plain
            ``Response`` (``meta.status`` is 500 when ``pk`` matches no user).
        """
        # Imported here so the filter below does not depend on the wildcard
        # serializer import happening to re-export ``Q``.
        from django.db.models import Q

        pk = kwargs.get('pk')  # id of the single record to fetch, if any
        query = request.GET.get('query')
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }

        if pk:
            # One query instead of an existence check followed by a re-fetch.
            user = User.objects.filter(pk=pk).first()
            if user is not None:
                ret["data"]["users"] = UserSerializer(instance=user, many=False).data
            else:
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "系统报错"
            return Response(ret)

        queryset = User.objects.all()
        if query:
            queryset = queryset.filter(
                Q(mobile__contains=query)
                | Q(username__contains=query)
                | Q(email__contains=query)
            )
        queryset = queryset.order_by("-id")

        page = LargeResultsSetPagination()
        page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
        ret["data"]["users"] = UserSerializer(instance=page_user, many=True).data
        return page.get_paginated_response(ret)
-------------------------------------------TokenAuthtication.py-------------------------------------------
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
import time
from PetHome import settings
import jwt
from .models import User
class TokenAuthtication(BaseAuthentication):
    """DRF authentication class that validates the JWT sent in the ``token`` header."""

    def authenticate(self, request):
        """
        Read the token from the request header, verify it and load its user.

        :param request: incoming request; the token is read from
            ``request.META["HTTP_TOKEN"]``.
        :return: ``(user, token)`` so DRF records the authenticated user.
        :raises AuthenticationFailed: when the header is missing, the token
            is invalid or expired, or the user named in it does not exist.
        """
        # NOTE(review): ``User`` here is the project's own model, not Django's
        # auth user - confirm nothing downstream expects ``is_authenticated``.
        token = request.META.get("HTTP_TOKEN")
        if not token:
            raise exceptions.AuthenticationFailed("用户未登录,请登录")

        try:
            # Pin the accepted algorithm explicitly; passed positionally the
            # string does not reach ``algorithms`` on every PyJWT version.
            data = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        except jwt.ExpiredSignatureError:
            raise exceptions.AuthenticationFailed("登录超时,请重新登录")
        except Exception as error:
            print(error)
            raise exceptions.AuthenticationFailed("用户未登录,请登录")

        # Also reject tokens that carry no ``exp`` claim at all.
        if data.get("exp", 0) < int(time.time()):
            raise exceptions.AuthenticationFailed("登录超时,请重新登录")

        user = User.objects.filter(username=data.get("username")).first()
        if user is None:
            raise exceptions.AuthenticationFailed("认证用户不存在")
        return user, token
2.5 测试
http://127.0.0.1:8089/admin/users
1、用户未登录时 (请求headers未携带token)
2、用户已经登录 (请求headers携带token)
3、全局配置
- settings.py 添加配置
-------------------------------------------settings.py-------------------------------------------
# Django REST framework settings: authenticate every view with
# TokenAuthtication unless the view overrides authentication_classes.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'adminuser.TokenAuthtication.TokenAuthtication',
    ],
}
1、全局配置用户认证后,所有的接口类都不需要添加 authentication_classes = [TokenAuthtication, ] 属性,默认所有接口都会进行登录校验
------------------------------------------例子1:views.py-------------------------------------------
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User
class UserView(APIView):
    """User query endpoint; declares no ``authentication_classes`` of its own."""

    def get(self, request, *args, **kwargs):
        """
        Return a page of users, or a single user when ``pk`` is supplied.

        :param request: DRF request; an optional ``query`` GET parameter
            filters the list by substring match on mobile, username or email.
        :param args: unused positional arguments.
        :param kwargs: URL keyword arguments; ``pk`` selects a single record.
        :return: a paginated response for the list case, otherwise a plain
            ``Response`` (``meta.status`` is 500 when ``pk`` matches no user).
        """
        # Imported here so the filter below does not depend on the wildcard
        # serializer import happening to re-export ``Q``.
        from django.db.models import Q

        pk = kwargs.get('pk')  # id of the single record to fetch, if any
        query = request.GET.get('query')
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }

        if pk:
            # One query instead of an existence check followed by a re-fetch.
            user = User.objects.filter(pk=pk).first()
            if user is not None:
                ret["data"]["users"] = UserSerializer(instance=user, many=False).data
            else:
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "系统报错"
            return Response(ret)

        queryset = User.objects.all()
        if query:
            queryset = queryset.filter(
                Q(mobile__contains=query)
                | Q(username__contains=query)
                | Q(email__contains=query)
            )
        queryset = queryset.order_by("-id")

        page = LargeResultsSetPagination()
        page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
        ret["data"]["users"] = UserSerializer(instance=page_user, many=True).data
        return page.get_paginated_response(ret)
2、为解决部分接口不需要登录验证问题,只需要在该类中设置 authentication_classes = [] ,[] 表示不需要认证
------------------------------------------例子2:views.py-------------------------------------------
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User
class UserView(APIView):
    """User query endpoint that opts out of authentication."""

    # An empty list means no authentication class runs for this view.
    authentication_classes = []

    def get(self, request, *args, **kwargs):
        """
        Return a page of users, or a single user when ``pk`` is supplied.

        :param request: DRF request; an optional ``query`` GET parameter
            filters the list by substring match on mobile, username or email.
        :param args: unused positional arguments.
        :param kwargs: URL keyword arguments; ``pk`` selects a single record.
        :return: a paginated response for the list case, otherwise a plain
            ``Response`` (``meta.status`` is 500 when ``pk`` matches no user).
        """
        # Imported here so the filter below does not depend on the wildcard
        # serializer import happening to re-export ``Q``.
        from django.db.models import Q

        pk = kwargs.get('pk')  # id of the single record to fetch, if any
        query = request.GET.get('query')
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }

        if pk:
            # One query instead of an existence check followed by a re-fetch.
            user = User.objects.filter(pk=pk).first()
            if user is not None:
                ret["data"]["users"] = UserSerializer(instance=user, many=False).data
            else:
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "系统报错"
            return Response(ret)

        queryset = User.objects.all()
        if query:
            queryset = queryset.filter(
                Q(mobile__contains=query)
                | Q(username__contains=query)
                | Q(email__contains=query)
            )
        queryset = queryset.order_by("-id")

        page = LargeResultsSetPagination()
        page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
        ret["data"]["users"] = UserSerializer(instance=page_user, many=True).data
        return page.get_paginated_response(ret)
不传 token 也能返回数据啦