为了在 Django 下使用 token 认证功能,搜索后发现 djangorestframework(DRF)可以解决这个问题,下面记录一下操作步骤。
我的ubuntu上已经安装了python+django+mysql环境,只需再安装djangorestframework
1,安装djangorestframework
pip install djangorestframework
2,创建django工程test_token
django-admin startproject test_token    # 创建工程test_token
cd test_token                           # 进入工程目录(manage.py 所在目录)
python manage.py startapp testtoken     # 创建应用testtoken
3,创建数据库
mysql -u root -p 登录到数据库
create schema test_token_db default character set utf8 collate utf8_general_ci;创建数据库test_token_db
4,添加数据库test_token_db
修改settings.py
注释掉默认的sqlite数据库
#DATABASES = {
# 'default': {
# 'ENGINE': 'django.db.backends.sqlite3',
# 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
# }
#}
添加我们定义的MySQL数据库
# Connection parameters for the local MySQL schema "test_token_db".
_mysql_default = dict(
    ENGINE='django.db.backends.mysql',
    NAME='test_token_db',
    USER='root',
    PASSWORD='*********',
    HOST='127.0.0.1',
    PORT='3306',
)

# The project uses a single database, registered under the "default" alias.
DATABASES = dict(default=_mysql_default)
5, 导入token模块
修改settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # DRF core app; the DRF installation guide asks for it to be listed
    # alongside any of its sub-apps.
    'rest_framework',
    # Token module: supplies the Token model used by the login view.
    'rest_framework.authtoken',
    # This project's own application.
    'testtoken',
]
6,创建数据库表
修改工程包下的__init__.py(与settings.py同目录,即test_token/__init__.py),让Django使用pymysql代替MySQLdb
# Register PyMySQL under the MySQLdb name so that the
# 'django.db.backends.mysql' engine selected in settings.py can load it
# (presumably because the MySQLdb/mysqlclient driver is not installed).
import pymysql
pymysql.install_as_MySQLdb()
然后执行:
python manage.py makemigrations
python manage.py migrate 生成数据库表
python manage.py createsuperuser 创建管理员
7,url 过滤
修改urls.py
from django.contrib import admin
from django.urls import path
from testtoken import views as testtoken_views
# URL routing table for the project.
urlpatterns = [
    path('admin/', admin.site.urls),
    # Route testtoken/user/<operation> (e.g. "login") to the user-management
    # view; the captured string is passed as the ``operation`` argument.
    path('testtoken/user/<str:operation>', testtoken_views.user_mgr),
]
8,登录生成token代码
import json

from django.contrib import auth
from django.http import JsonResponse
from rest_framework.authtoken.models import Token
def user_login(request, name, passwd):
    """Authenticate a user and issue a fresh token.

    Args:
        request: The current Django request; used to open the login session.
        name: Username supplied by the client.
        passwd: Password supplied by the client.

    Returns:
        A dict. On success it holds ``status == 'sucess'`` and the new
        ``token`` key; on failure it holds ``status == 'authenticate fail'``.
        (The spelling of ``'sucess'`` is kept as-is because clients compare
        against it.)
    """
    result = {}
    user = auth.authenticate(username=name, password=passwd)
    if user is None or not user.is_active:
        print("authenticate fail")
        result['status'] = 'authenticate fail'
        return result

    print("authenticate sucess")
    # Rotate the token: drop any existing one, then create a new key, so the
    # same user gets a different token on every login. Using filter().delete()
    # is a no-op when no token exists, which removes the need for the bare
    # ``except:`` that previously swallowed unrelated errors too.
    Token.objects.filter(user=user).delete()
    token = Token.objects.create(user=user)
    print("token.key:", token.key)
    auth.login(request, user)
    result['status'] = 'sucess'
    result['token'] = token.key  # returned to the client for later requests
    return result
def user_mgr(request, operation):
    """Dispatch a user-management request.

    Only ``POST`` requests with a JSON object body are accepted, and the only
    supported ``operation`` is ``"login"``, whose body must carry ``name`` and
    ``passwd``.

    Args:
        request: The incoming Django request.
        operation: Path component captured by the URL pattern.

    Returns:
        A ``JsonResponse``. For a login it wraps the dict produced by
        ``user_login``; for anything else it is
        ``{'status': 'illegal request'}``.
    """
    print("upgrade operation=", operation)
    print("request.method=", request.method)
    print("request.content_type=", request.content_type)
    print("user=", request.user.username)
    print("request.path=", request.path)

    illegal = {'status': 'illegal request'}

    if request.method != 'POST' or not request.body:
        return JsonResponse(illegal)

    # SECURITY: this used to be ``eval(request.body)``, which executes
    # whatever Python expression the client sends. The client posts JSON, so
    # parse it strictly as JSON instead.
    try:
        body = json.loads(request.body)
    except ValueError:
        # Covers malformed JSON as well as undecodable bytes.
        return JsonResponse(illegal)

    if operation == "login" and isinstance(body, dict):
        name = body.get('name')
        passwd = body.get('passwd')
        if name is None or passwd is None:
            return JsonResponse(illegal)
        return JsonResponse(user_login(request, name, passwd))

    # Every path must return an HttpResponse; a bare dict or None makes
    # Django raise instead of answering the client.
    return JsonResponse(illegal)
9,客户端登录测试代码
import requests
from http import cookiejar
def login_simulate():
    """POST the configured credentials to the login endpoint and print the reply.

    Reads the module-level ``httprequest`` session, ``url_path``,
    ``admin_list`` and ``header``. Cookies held by the session after the
    request are saved to ``wangliang.txt``.
    """
    # File-backed cookie jar so the session cookies can be saved afterwards.
    httprequest.cookies = cookiejar.LWPCookieJar(filename="wangliang.txt")
    data = {
        'name': admin_list[0],
        'passwd': admin_list[1],
    }
    response = httprequest.post(
        url_path + "testtoken/user/login",
        json=data,
        headers=header,
        timeout=10,  # without a timeout the call can block indefinitely
    )
    print("statusCode:", response.status_code)
    print("text = ", response.text)
    try:
        print("response", response.json())
    except ValueError:
        # The server answered with something other than JSON (for example an
        # HTML error page); the raw text has already been printed above.
        print("response is not valid JSON")
    httprequest.cookies.save()
# Base URL of the server under test; endpoint paths are appended directly.
url_path = "http://114.115.223.53:8000/"

# [username, password] sent by login_simulate().
# NOTE(review): credentials are hard-coded here - consider loading them from
# the environment before sharing this script.
admin_list = ["zhangjun", "zhj10231982"]

# One session object shared by all requests of this script.
httprequest = requests.session()

userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36"

# Headers sent with every request.
header = dict([
    ("origin", url_path),
    ("Referer", url_path),
    ("User-Agent", userAgent),
])

if __name__ == '__main__':
    login_simulate()
10,token 过期检测
代码如下(注意:Django 4.0 起已移除 ugettext_lazy,新版本请改用 from django.utils.translation import gettext_lazy as _)
from django.utils.translation import ugettext_lazy as _
from rest_framework.authentication import BaseAuthentication, TokenAuthentication
from rest_framework.authtoken.models import Token
from rest_framework import HTTP_HEADER_ENCODING
from rest_framework import exceptions
from django.utils import timezone
from datetime import timedelta
from django.conf import settings
# 获取请求头里的token信息
def get_authorization_header(request):
    """
    Return request's 'Authorization:' header, as a bytestring.

    The value is read from ``request.META['HTTP_AUTHORIZATION']`` and
    defaults to ``b''`` when the header is absent. A text value is encoded
    with ``HTTP_HEADER_ENCODING`` so callers always receive bytes.
    """
    auth = request.META.get('HTTP_AUTHORIZATION', b'')
    if isinstance(auth, str):
        # Work around django test client oddness: it may pass the header
        # as text rather than bytes.
        auth = auth.encode(HTTP_HEADER_ENCODING)
    return auth
# 自定义的ExpiringTokenAuthentication认证方式
class ExpiringTokenAuthentication(TokenAuthentication):
    """Token authentication that rejects tokens older than a configured age.

    The whole ``Authorization`` header value is decoded and used as the token
    key (no keyword prefix is parsed). The maximum age, in days, is read from
    ``settings.TOKEN_LIFETIME``.
    """

    model = Token

    def authenticate(self, request):
        """Authenticate ``request`` from its ``Authorization`` header.

        Returns:
            ``None`` when the header is missing or empty, otherwise the
            ``(user, token)`` pair from ``authenticate_credentials``.

        Raises:
            exceptions.AuthenticationFailed: If the header cannot be decoded
                or the token is rejected.
        """
        print("authenticate")
        auth = get_authorization_header(request)
        if not auth:
            return None
        try:
            token = auth.decode()
        except UnicodeError:
            msg = _('Invalid token header. Token string should not contain invalid characters.')
            raise exceptions.AuthenticationFailed(msg)
        # The token value is deliberately not printed: it is a credential and
        # must not end up in stdout or log files.
        return self.authenticate_credentials(token)

    def authenticate_credentials(self, key):
        """Look up ``key`` and validate the token it belongs to.

        Returns:
            A ``(user, token)`` tuple for a valid, unexpired token.

        Raises:
            exceptions.AuthenticationFailed: If the key is unknown, the user
                is inactive, or the token is past its lifetime.
        """
        model = self.get_model()
        try:
            token = model.objects.select_related('user').get(key=key)
        except model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))
        if not token.user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
        print("token.created", token.created)
        # Fall back to one day when the setting is absent, so a missing
        # TOKEN_LIFETIME does not surface as an AttributeError mid-request.
        lifetime = timedelta(days=getattr(settings, 'TOKEN_LIFETIME', 1))
        if timezone.now() > token.created + lifetime:  # token expiry check
            raise exceptions.AuthenticationFailed(_('Token has expired'))
        return (token.user, token)
TOKEN_LIFETIME=1 ,在settings.py 中定义,表示该token能维持一天
过期token检测调用如下:
# Named ``token_auth`` rather than ``auth`` so it cannot shadow the
# ``django.contrib.auth`` module when pasted into a views module.
token_auth = ExpiringTokenAuthentication()
try:
    token_auth.authenticate(request)
except Exception as e:
    # Apply the format explicitly; passing repr(e) as a second print()
    # argument would output the literal "%s" followed by the value.
    print("error=%s" % repr(e))
11,客户端token发送格式
# Send the bare token key (the value returned by the login call) as the
# whole Authorization header; the example key below is a placeholder.
header['Authorization']='e4998367ea41c3b60f8fb6451025303e1f5a414e'
添加到请求头中,发送请求时传入 headers=header