一、什么是RESTful
- REST与技术无关,代表的是一种软件架构风格,REST是Representational State Transfer的简称,中文翻译为“表征状态转移”
- REST从资源的角度类审视整个网络,它将分布在网络中某个节点的资源通过URL进行标识,客户端应用通过URL来获取资源的表征,获得这些表征致使这些应用转变状态
- REST与技术无关,代表的是一种软件架构风格,REST是Representational State Transfer的简称,中文翻译为“表征状态转移”
- 所有的数据,不过是通过网络获取的还是操作(增删改查)的数据,都是资源,将一切数据视为资源是REST区别与其他架构风格的最本质属性
- 对于REST这种面向资源的架构风格,有人提出一种全新的结构理念,即:面向资源架构(ROA:Resource Oriented Architecture)
二、认证补充:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission from rest_framework.request import Request
from rest_framework import exceptions token_list = [
'sfsfss123kuf3j123',
'asijnfowerkkf9812',
] class TestAuthentication(BaseAuthentication):
def authenticate(self, request):
"""
用户认证,如果验证成功后返回元组: (用户,用户Token)
:param request:
:return:
None,表示跳过该验证;
如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
self._authenticator = None
if api_settings.UNAUTHENTICATED_USER:
self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
else:
self.user = None if api_settings.UNAUTHENTICATED_TOKEN:
self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
else:
self.auth = None
(user,token)表示验证通过并设置用户名和Token;
AuthenticationFailed异常
"""
val = request.query_params.get('token')
if val not in token_list:
raise exceptions.AuthenticationFailed("用户认证失败") return ('登录用户', '用户token') def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
pass class TestPermission(BasePermission):
message = "权限验证失败" def has_permission(self, request, view):
"""
判断是否有权限访问当前请求
Return `True` if permission is granted, `False` otherwise.
:param request:
:param view:
:return: True有权限;False无权限
"""
if request.user == "管理员":
return True # GenericAPIView中get_object时调用
def has_object_permission(self, request, view, obj):
"""
视图继承GenericAPIView,并在其中使用get_object时获取对象时,触发单独对象权限验证
Return `True` if permission is granted, `False` otherwise.
:param request:
:param view:
:param obj:
:return: True有权限;False无权限
"""
if request.user == "管理员":
return True class TestView(APIView):
# 认证的动作是由request.user触发
authentication_classes = [TestAuthentication, ] # 权限
# 循环执行所有的权限
permission_classes = [TestPermission, ] def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET请求,响应内容') def post(self, request, *args, **kwargs):
return Response('POST请求,响应内容') def put(self, request, *args, **kwargs):
return Response('PUT请求,响应内容') views.py
views.py
#
2 class MyAuthtication(BasicAuthentication):
3 def authenticate(self, request):
4 token = request.query_params.get('token') #注意是没有GET的,用query_params表示
5 if token == 'zxxzzxzc':
6 return ('uuuuuu','afsdsgdf') #返回user,auth
7 # raise AuthenticationFailed('认证错误') #只要抛出认证错误这样的异常就会去执行下面的函数
8 raise APIException('认证错误')
9 def authenticate_header(self, request): #认证不成功的时候执行
10 return 'Basic reala="api"'
11
12 class UserView(APIView):
13 authentication_classes = [MyAuthtication,]
14 def get(self,request,*args,**kwargs):
15 print(request.user)
16 print(request.auth)
17 return Response('用户列表')
自定义功能
django的中间件比rest_framework执行的早
认证的功能放到中间件也是可以做的
认证一般做,检查用户是否存在,如果存在request.user/request.auth;不存在request.user/request.auth=None
认证小总结:
---类:authenticate/authenticate_header
---返回值:None,元组(user,auth),异常
---配置:
---视图:
class IndexView(APIView):
authentication_classes = [MyAuthentication,]
---全局
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
三、权限
权限的应用:
from django.conf.urls import url
from django.contrib import admin
from app01 import views
from app02 import views as app02_view
from app03 import views as app03_view
from app04 import views as app04_view urlpatterns = [ django rest framework
url(r'^auth/', app02_view.AuthView.as_view()),
url(r'^hosts/', app02_view.HostView.as_view()),
url(r'^users/', app02_view.UserView.as_view()),
url(r'^salary/', app02_view.SalaryView.as_view()), ]
url
from django.views import View from rest_framework.views import APIView
from rest_framework.authentication import SessionAuthentication
from rest_framework.authentication import BasicAuthentication
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import AllowAny from rest_framework.renderers import JSONRenderer,BrowsableAPIRenderer from rest_framework.request import Request
from rest_framework.exceptions import APIException,AuthenticationFailed
from rest_framework import exceptions
from rest_framework.response import Response from app02 import models
import hashlib
import time class AuthView(APIView):
authentication_classes=[]
def get(self,request):
"""
接收用户名和密码
:param request:
:return:
"""
ret = {'code':1000,'msg':None} user = request.query_params.get('user')
pwd = request.query_params.get('pwd') obj = models.UserInfo.objects.filter(username=user,password=pwd).first()
if not obj:
ret['code'] = 1001
ret['msg'] = "用户名或密码错误"
return Response(ret)
# 创建随机字符串
ctime = time.time()
key = "%s|%s" %(user,ctime)
m = hashlib.md5()
m.update(key.encode('utf-8'))
token = m.hexdigest() # 保存到数据
obj.token = token
obj.save() ret['token'] = token
return Response(ret) class MyAuthentication(BaseAuthentication): def authenticate(self, request):
token = request.query_params.get('token')
obj = models.UserInfo.objects.filter(token=token).first()
if obj:
return (obj.username,obj)
return None def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
# return 'Basic realm="api"'
pass class MyPermission(object):
message = "无权访问"
def has_permission(self,request,view):
if request.user:
return True
return False class AdminPermission(object):
message = "无权访问"
def has_permission(self,request,view):
if request.user == 'alex':
return True
return False class HostView(APIView):
"""
匿名用户和用户都能访问
"""
authentication_classes = [MyAuthentication,]
permission_classes = []
def get(self,request,*args,**kwargs):
# 原来request对象,django.core.handlers.wsgi.WSGIRequest
# 现在的request对象,rest_framework.request.Request\
self.dispatch
print(request.user)
# print(request.user)
# print(request.auth)
return Response('主机列表') class UserView(APIView):
"""
用户能访问
"""
authentication_classes = [MyAuthentication, ]
permission_classes = [MyPermission,]
def get(self,request,*args,**kwargs):
return Response('用户列表') class SalaryView(APIView):
"""
用户能访问
"""
authentication_classes = [MyAuthentication, ]
permission_classes = [MyPermission,AdminPermission,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('薪资列表') def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated(detail='xxxxxxxx')
raise exceptions.PermissionDenied(detail=message)
Views.py
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
'DEFAULT_PERMISSION_CLASSES':[ ],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute', }
}
settings
权限才真正的做request.user/request.auth拿到它们做是否访问的判断
权限小总结:
---类: has_permission/has_object_permission
---返回值:True、False、exceptions.PermissionDenied(detail="错误信息")
---配置:
---视图:
class IndexView(APIView):
permission_classes = [MyPermission,]
---全局:
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
四、限制访问的频率
限制访问频率的应用:
a、对匿名用户进行限制,每个用户1分钟允许访问10次
在这里用唯一标识:self.get_ident()
from django.conf.urls import url
from django.contrib import admin
from app01 import views
from app02 import views as app02_view
from app03 import views as app03_view
from app04 import views as app04_view urlpatterns = [ # url(r'^limit/', app03_view.LimitView.as_view()), ]
url.py
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework import exceptions RECORD = { }
class MyThrottle(BaseThrottle):
def allow_request(self,request,view):
"""
# 返回False,限制
# 返回True,通行
:param request:
:param view:
:return:
"""
"""
a. 对匿名用户进行限制:每个用户1分钟允许访问10次
- 获取用户IP request 1.1.1.1
"""
import time
ctime = time.time()
ip = self.get_ident()
if ip not in RECORD:
RECORD[ip] = [ctime,]
else:
# [4507862389234,3507862389234,2507862389234,1507862389234,]
time_list = RECORD[ip]
while True:
val = time_list[-1]
if (ctime-60) > val:
time_list.pop()
else:
break
if len(time_list) > 10:
return False
time_list.insert(0,ctime)
return True
def wait(self):
import time
ctime = time.time()
first_in_time = RECORD[self.get_ident()][-1]
wt = 60 - (ctime - first_in_time)
return wt class MySimpleRateThrottle(SimpleRateThrottle):
scope = "wdp" def get_cache_key(self, request, view):
return self.get_ident(request) class LimitView(APIView):
authentication_classes = []
permission_classes = []
throttle_classes=[MySimpleRateThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('控制访问频率示例') def throttled(self, request, wait):
"""
If request is throttled, determine what kind of exception to raise.
""" class MyThrottled(exceptions.Throttled):
default_detail = '请求被限制.'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = '还需要再等待{wait}' raise MyThrottled(wait)
Views.py
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
'DEFAULT_PERMISSION_CLASSES':[ ],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute', }
}
settings.py
b、对匿名用户进行限制:每个用户1分钟 允许访问5次,登录用户1分钟允许访问10次,VIP1分钟允许访问20次
from django.conf.urls import url
from django.contrib import admin
from app01 import views
from app02 import views as app02_view
from app03 import views as app03_view
from app04 import views as app04_view urlpatterns = [ # url(r'^limit/', app03_view.LimitView.as_view()),
url(r'^index/', app04_view.IndexView.as_view()),
url(r'^manage/', app04_view.ManageView.as_view()), ]
url.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
from rest_framework.authentication import BaseAuthentication
from app02 import models class MyAuthentication(BaseAuthentication):
def authenticate(self, request):
token = request.query_params.get('token')
obj = models.UserInfo.objects.filter(token=token).first()
if obj:
return (obj.username,obj)
return None def authenticate_header(self, request):
pass class MyPermission(object):
message = "无权访问"
def has_permission(self,request,view):
if request.user:
return True
return False class AdminPermission(object):
message = "无权访问"
def has_permission(self,request,view):
if request.user == 'alex':
return True
return False class AnonThrottle(SimpleRateThrottle):
scope = "wdp_anon" def get_cache_key(self, request, view):
# 返回None,表示我不限制
# 登录用户我不管
if request.user:
return None
# 匿名用户
return self.get_ident(request) class UserThrottle(SimpleRateThrottle):
scope = "wdp_user" def get_cache_key(self, request, view):
# 登录用户
if request.user:
return request.user
# 匿名用户我不管
return None # 无需登录就可以访问
class IndexView(APIView):
authentication_classes = [MyAuthentication,]
permission_classes = []
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('访问首页') # 需登录就可以访问
class ManageView(APIView):
authentication_classes = [MyAuthentication,]
permission_classes = [MyPermission,]
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('访问首页')
views.py
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
'DEFAULT_PERMISSION_CLASSES':[ ],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute', }
} CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
'LOCATION': 'cache',
}
}
settings.py
c、基于用户IP限制访问频率
from django.conf.urls import url, include
from web.views import TestView urlpatterns = [
url(r'^test/', TestView.as_view()),
]
url.py
import time
from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle
from rest_framework.settings import api_settings # 保存访问记录
RECORD = {
'用户IP': [12312139, 12312135, 12312133, ]
} class TestThrottle(BaseThrottle):
ctime = time.time def get_ident(self, request):
"""
根据用户IP和代理IP,当做请求者的唯一IP
Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
if present and number of proxies is > 0. If not use all of
HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
remote_addr = request.META.get('REMOTE_ADDR')
num_proxies = api_settings.NUM_PROXIES if num_proxies is not None:
if num_proxies == 0 or xff is None:
return remote_addr
addrs = xff.split(',')
client_addr = addrs[-min(num_proxies, len(addrs))]
return client_addr.strip() return ''.join(xff.split()) if xff else remote_addr def allow_request(self, request, view):
"""
是否仍然在允许范围内
Return `True` if the request should be allowed, `False` otherwise.
:param request:
:param view:
:return: True,表示可以通过;False表示已超过限制,不允许访问
"""
# 获取用户唯一标识(如:IP) # 允许一分钟访问10次
num_request = 10
time_request = 60 now = self.ctime()
ident = self.get_ident(request)
self.ident = ident
if ident not in RECORD:
RECORD[ident] = [now, ]
return True
history = RECORD[ident]
while history and history[-1] <= now - time_request:
history.pop()
if len(history) < num_request:
history.insert(0, now)
return True def wait(self):
"""
多少秒后可以允许继续访问
Optionally, return a recommended number of seconds to wait before
the next request.
"""
last_time = RECORD[self.ident][0]
now = self.ctime()
return int(60 + last_time - now) class TestView(APIView):
throttle_classes = [TestThrottle, ] def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET请求,响应内容') def post(self, request, *args, **kwargs):
return Response('POST请求,响应内容') def put(self, request, *args, **kwargs):
return Response('PUT请求,响应内容') def throttled(self, request, wait):
"""
访问次数被限制时,定制错误信息
""" class Throttled(exceptions.Throttled):
default_detail = '请求被限制.'
extra_detail_singular = '请 {wait} 秒之后再重试.'
extra_detail_plural = '请 {wait} 秒之后再重试.' raise Throttled(wait)
views.py
d. 基于用户IP显示访问频率(利于Django缓存)
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES': {
'test_scope': '10/m',
},
}
setting.py
from django.conf.urls import url, include
from web.views import TestView urlpatterns = [
url(r'^test/', TestView.as_view()),
]
url.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework import exceptions
from rest_framework.throttling import SimpleRateThrottle class TestThrottle(SimpleRateThrottle): # 配置文件定义的显示频率的Key
scope = "test_scope" def get_cache_key(self, request, view):
"""
Should return a unique cache-key which can be used for throttling.
Must be overridden. May return `None` if the request should not be throttled.
"""
if not request.user:
ident = self.get_ident(request)
else:
ident = request.user return self.cache_format % {
'scope': self.scope,
'ident': ident
} class TestView(APIView):
throttle_classes = [TestThrottle, ] def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET请求,响应内容') def post(self, request, *args, **kwargs):
return Response('POST请求,响应内容') def put(self, request, *args, **kwargs):
return Response('PUT请求,响应内容') def throttled(self, request, wait):
"""
访问次数被限制时,定制错误信息
""" class Throttled(exceptions.Throttled):
default_detail = '请求被限制.'
extra_detail_singular = '请 {wait} 秒之后再重试.'
extra_detail_plural = '请 {wait} 秒之后再重试.' raise Throttled(wait)
views.py
e. view中限制请求频率
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES': {
'xxxxxx': '10/m',
},
}
setting
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES': {
'xxxxxx': '10/m',
},
}
url.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework import exceptions
from rest_framework.throttling import ScopedRateThrottle # 继承 ScopedRateThrottle
class TestThrottle(ScopedRateThrottle): def get_cache_key(self, request, view):
"""
Should return a unique cache-key which can be used for throttling.
Must be overridden. May return `None` if the request should not be throttled.
"""
if not request.user:
ident = self.get_ident(request)
else:
ident = request.user return self.cache_format % {
'scope': self.scope,
'ident': ident
} class TestView(APIView):
throttle_classes = [TestThrottle, ] # 在settings中获取 xxxxxx 对应的频率限制值
throttle_scope = "xxxxxx" def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET请求,响应内容') def post(self, request, *args, **kwargs):
return Response('POST请求,响应内容') def put(self, request, *args, **kwargs):
return Response('PUT请求,响应内容') def throttled(self, request, wait):
"""
访问次数被限制时,定制错误信息
""" class Throttled(exceptions.Throttled):
default_detail = '请求被限制.'
extra_detail_singular = '请 {wait} 秒之后再重试.'
extra_detail_plural = '请 {wait} 秒之后再重试.' raise Throttled(wait)
views.py
f. 匿名时用IP限制+登录时用Token限制
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
'DEFAULT_THROTTLE_RATES': {
'luffy_anon': '10/m',
'luffy_user': '20/m',
},
}
setting.py
from django.conf.urls import url, include
from web.views.s3_throttling import TestView urlpatterns = [
url(r'^test/', TestView.as_view()),
]
url.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework.throttling import SimpleRateThrottle class LuffyAnonRateThrottle(SimpleRateThrottle):
"""
匿名用户,根据IP进行限制
"""
scope = "luffy_anon" def get_cache_key(self, request, view):
# 用户已登录,则跳过 匿名频率限制
if request.user:
return None return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
} class LuffyUserRateThrottle(SimpleRateThrottle):
"""
登录用户,根据用户token限制
"""
scope = "luffy_user" def get_ident(self, request):
"""
认证成功时:request.user是用户对象;request.auth是token对象
:param request:
:return:
"""
# return request.auth.token
return "user_token" def get_cache_key(self, request, view):
"""
获取缓存key
:param request:
:param view:
:return:
"""
# 未登录用户,则跳过 Token限制
if not request.user:
return None return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
} class TestView(APIView):
throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ] def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET请求,响应内容') def post(self, request, *args, **kwargs):
return Response('POST请求,响应内容') def put(self, request, *args, **kwargs):
return Response('PUT请求,响应内容')
views.py
限制访问的频率小总结:
---类: allow_request/wait PS: scope = "wdp_user"
---返回值:True、False
---配置:
---视图:
class IndexView(APIView):
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('访问首页')
---全局:
REST_FRAMEWORK = {
"DEFAULT_THROTTLE_CLASSES":[
],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute',
}
}
五、认证、权限、限制访问频率返回结果的比较:
认证、权限、限制访问频率这三个返回的结果:
认证返回的三种结果:
a、None
b、元组(user,auth)
c、异常 raise APIException(...)
权限的返回值三种结果:
a、True 有权限
b、False 没权限
c、异常
限制访问的频率返回值的两种结果:
a、True
b、False