drf源码系列

过滤器

对查询出来的数据进行筛选可写可不写

from rest_framework.filters import BaseFilterBackend
源码
'''
def filter_queryset(self, request, queryset, view):
#继承了这个类必须写这个方法不然报错
raise NotImplementedError(".filter_queryset() must be overridden.")
'''
view是当前视图self
queryset是筛选之后的数据
request 就是request
def filter_queryset(self,request,queryset,view):
pass

使用方法

对查询出来的数据进行筛选可写可不写
#第一部分
from rest_framework.filters import BaseFilterBackend
#继承类
class MyFilterBack(BaseFilterBackend):
def filter_queryset(self,request,queryset,view):
val = request.query_params.get('cagetory')
return queryset.filter(category_id=val) #先实例化一个
class Indexview(APIview):
def get(self,request,*arg,**kwargs):
#就是查询一个表不理他
queryset=models.News.objects.all()
#实例化一个类对象
obj=MyFilterBack()
#传值顺序request,queryset,self
result=obj.filter_queryset(request,queryset,self)
return Response('...')

视图类

F查询

 def get(self,request,*args,**kwargs):
# 1.获取单挑数据再做序列化
result = super().get(request,*args,**kwargs)
# 2.对浏览器进行自加1
pk = kwargs.get('pk')
models.Article.objects.filter(pk=pk).update(read_count=F('read_count')+1)
# 3.对浏览器进行自加1
# instance = self.get_object()
# instance.read_count += 1
# instance.save()
return result

apiview

提供了公共方法
request.data 是request.post的封装 json序列化
dispatch()分发
self封装了request属性
self.request.method

源码剖析

 @classmethod
def as_view(cls, **initkwargs):
if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):
def force_evaluation():
raise RuntimeError(
'Do not evaluate the `.queryset` attribute directly, '
'as the result will be cached and reused between requests. '
'Use `.all()` or call `.get_queryset()` instead.'
)
cls.queryset._fetch_all = force_evaluation
#执行父类的as_view方法
view = super().as_view(**initkwargs)
#执行父类的super().as_view(**initkwargs)
view.cls = cls
view.initkwargs = initkwargs
#闭包 csrf_exempt免除csrftoken的认证
return csrf_exempt(view) -------------------------------------- @classonlymethod
def as_view(cls, **initkwargs):
"""
Main entry point for a request-response process.
"""
for key in initkwargs:
if key in cls.http_method_names:
raise TypeError("You tried to pass in the %s method name as a "
"keyword argument to %s(). Don't do that."
% (key, cls.__name__))
if not hasattr(cls, key):
raise TypeError("%s() received an invalid keyword %r. as_view "
"only accepts arguments that are already "
"attributes of the class." % (cls.__name__, key)) def view(request, *args, **kwargs):
self = cls(**initkwargs)
if hasattr(self, 'get') and not hasattr(self, 'head'):
self.head = self.get
self.request = request
self.args = args
self.kwargs = kwargs
#执行本类的self.dispatch
return self.dispatch(request, *args,**kwargs)
#执行view
return view -------------apiview的dispatch--------
def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
#封装老值request
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate? try:
self.initial(request, *args, **kwargs) # Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc:
response = self.handle_exception(exc) self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response

GenericAPIView

#一 初识
from rest_framework.generics import GenericAPIView
GenericAPIview继承了APIview
class NewView(GenericAPIview):
querset=model.News.objects.all()
def get(self,request,*arg,**kwargs):
#self对应NewView的对象
self.filter_queryset(self.queryset)
1.执行对应的filter_queryset方法
2.本类没有去寻找父类GenericAPIview
'''
def filter_queryset(self, queryset):
for backend in list(self.filter_backends):
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset '''#所以默认返回原来的queryset
3.寻找filter_backends
'''
filter_backends = api_settings.DEFAULT_FILTER_BACKENDS
filter_backends是空的
'''

1. 本类重写filter_backends

from rest_framework.generics import GenericAPIView
#GenericAPIview继承了APIview
class NewFiltrBackend(BaseFilterBackend):
def filter_queryset(self,request,queryset,view):
val = request.query_params.get('cagetory')
return queryset.filter(category_id=val)
class NewView(GenericAPIview):
querset=model.News.objects.all()
filter_backends=[NewFiltrBackend ,]
def get(self,request,*arg,**kwargs):
#self对应NewView的对象
v=self.get_queryset()
queryset=self.filter_queryset(v)
1.执行对应的filter_queryset方法
2.本类没有去寻找父类GenericAPIview
'''
源码 继承类不写self.queryset会报错
assert self.queryset is not None, (
"'%s' should either include a `queryset` attribute, "
"or override the `get_queryset()` method."
% self.__class__.__name__
)
''' '''
def filter_queryset(self, queryset):
for backend in list(self.filter_backends):
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset '''4.backend等于对应NewFiltrBackend类名() 实例化对象
·· 执行NewFiltrBackend里面的filter_queryset方法
3.寻找filter_backends,本类的filter_backends
#filter_backends=[NewFiltrBackend ,]
5.queryset=self.filter_queryset(self.queryset)是筛选之后的结果
#v=self.get_queryset()
queryset=self.filter_queryset(v)
6.寻找对应get_queryset
'''
源码
queryset = self.queryset
if isinstance(queryset, QuerySet):
# Ensure queryset is re-evaluated on each request.
queryset = queryset.all()
return queryset '''#返回等于筛选之后的queryset
#queryset=self.filter_queryset(queryset)
#queryset=get_queryset()
·········self.get_serializer()···············
class Newserializers(serializers.ModelSerializer):
class Meta:
model=models.News
fields="__all__"
class NewFiltrBackend(BaseFilterBackend):
def filter_queryset(self,request,queryset,view):
val = request.query_params.get('cagetory')
return queryset.filter(category_id=val)
class NewView(GenericAPIview):
querset=model.News.objects.all()
filter_backends=[NewFiltrBackend ,]
def get(self,request,*arg,**kwargs):
#self对应NewView的对象
v=self.get_queryset()
queryset=self.filter_queryset(v)
self.get_serializer()#本类没有去父类找
#代替了 ser=Newserializers(instance=queryset,many=True)
#ser.data 1.寻找get_serializer()
2.
'''
源码
def get_serializer(self, *args, **kwargs):
serializer_class = self.get_serializer_class()
kwargs['context'] = self.get_serializer_context()
return serializer_class(*args, **kwargs)
'''
3.进行寻找get_serializer_class()本类没有去父类找
#serializer_class = self.get_serializer_class()
4.
'''
def get_serializer_class(self):
assert self.serializer_class is not None, (
"'%s' should either include a `serializer_class` attribute, "
"or override the `get_serializer_class()` method."
% self.__class__.__name__
) return self.serializer_class
''' #return self.serializer_class 返回serializer_class
#get_serializer()=serializer_class
在本类定义seializer_class
seializer_class=Newserializers 相当于 #ser=Newserializers(instance=queryset,many=True)
#ser.data
ser=self.get_serializer(instance=queryset,many=True)
ser.data
​``````````````````````````````
分页
querset=model.News.objects.all()
pagination_class =PageNumberPagination
self.paginate_queryset(queryset)
'''
源码
self.paginate_queryset() self._paginator = self.pagination_class()
需要定义
def paginator(self):
if not hasattr(self, '_paginator'):
if self.pagination_class is None:
self._paginator = None
else:
self._paginator = self.pagination_class()
return self._paginator '''pagination_class()需要本地定义
pagination_class =PageNumberPagination

GenericAPIView源码总结

    源码的剖析
def get_queryset(self):
"""
Get the list of items for this view.
This must be an iterable, and may be a queryset.
Defaults to using `self.queryset`. This method should always be used rather than accessing `self.queryset`
directly, as `self.queryset` gets evaluated only once, and those results
are cached for all subsequent requests. You may want to override this if you need to provide different
querysets depending on the incoming request. (Eg. return a list of items that is specific to the user)
"""
assert self.queryset is not None, (
"'%s' should either include a `queryset` attribute, "
"or override the `get_queryset()` method."
% self.__class__.__name__
) queryset = self.queryset
if isinstance(queryset, QuerySet):
# Ensure queryset is re-evaluated on each request.
queryset = queryset.all()
return queryset def get_object(self):
"""
Returns the object the view is displaying. You may want to override this if you need to provide non-standard
queryset lookups. Eg if objects are referenced using multiple
keyword arguments in the url conf.
"""
queryset = self.filter_queryset(self.get_queryset()) # Perform the lookup filtering.
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg)
) filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
obj = get_object_or_404(queryset, **filter_kwargs) # May raise a permission denied
self.check_object_permissions(self.request, obj) return obj def get_serializer(self, *args, **kwargs):
"""
Return the serializer instance that should be used for validating and
deserializing input, and for serializing output.
"""
serializer_class = self.get_serializer_class()
kwargs['context'] = self.get_serializer_context()
return serializer_class(*args, **kwargs) def get_serializer_class(self):
"""
Return the class to use for the serializer.
Defaults to using `self.serializer_class`. You may want to override this if you need to provide different
serializations depending on the incoming request. (Eg. admins get full serialization, others get basic serialization)
"""
assert self.serializer_class is not None, (
"'%s' should either include a `serializer_class` attribute, "
"or override the `get_serializer_class()` method."
% self.__class__.__name__
) return self.serializer_class def get_serializer_context(self):
"""
Extra context provided to the serializer class.
"""
return {
'request': self.request,
'format': self.format_kwarg,
'view': self
} def filter_queryset(self, queryset):
"""
Given a queryset, filter it with whichever filter backend is in use. You are unlikely to want to override this method, although you may need
to call it either from a list view, or from a custom `get_object`
method if you want to apply the configured filtering backend to the
default queryset.
"""
for backend in list(self.filter_backends):
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset
源码解释
from rest_framework.generics import GenericAPIView
from rest_framework.filters import BaseFilterBackend
from rest_framework import serializers
GenericAPIview继承了APIview
class Newserializers(serializers.ModelSerializer):
class Meta:
model=models.News
fields="__all__"
class NewFiltrBackend(BaseFilterBackend):
def filter_queryset(self,request,queryset,view):
val = request.query_params.get('cagetory')
return queryset.filter(category_id=val)
class NewView(GenericAPIview):
querset=model.News.objects.all()
filter_backends=[NewFiltrBackend ,]
seializer_class=Newserializers
def get(self,request,*arg,**kwargs):
#self对应NewView的对象
v=self.get_queryset()
queryset=self.filter_queryset(v)
self.get_serializer(instance==queryset,many=True)
retutn Response(ser.data)
'''
1.querset=model.News.objects.all()
2.filter_backends=[NewFiltrBackend ,]
3.seializer_class=Newserializers
4.pagination_class =PageNumberPagination
1.查询
self.get_queryset()#等于querset=model.News.objects.all()
2.序列化
self.get_serializer()等于seializer_class=Newserializers()
3.筛选
self.filter_queryset()
等于filter_backends=[NewFiltrBackend ,]
内部有一个for循环列表 同等与NewFiltrBackend()
4.分页
self.paginate_queryset(queryset)
等于page_obj=PageNumberPagination()
'''

基于GenericAPIView

listapiview查看

基于GenericAPIView

模拟get请求

1.queryset

2.分页

#settings的配置
paginator.paginate_queryset(queryset, self.request, view=self)
"DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination", "PAGE_SIZE":2, 3.序列化 4.返回序列化数据

源码剖析

 #源码
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
1.执行list
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
#self.get_queryset()执行自定的
#queryset=model.News.objects.all()
#self.filter_queryset()筛选执行
#filter_backends=[NewFiltrBackend ,]
#执行对应本类的filter_queryset
2. page = self.paginate_queryset(queryset)
#分页
#执行 pagination_class =PageNumberPagination
if page is not None:
3.serializer = self.get_serializer(page, many=True)
#序列化 执行seializer_class=Newserializers
return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
#最后返回序列化的数据

RetrieveAPIView单条查看

模拟get请求

执行流程

1. #对应获得对象 自己要定义单条数据 queryset
2. #序列化
3. #返回数据

源码剖析

class RetrieveAPIView(mixins.RetrieveModelMixin,
GenericAPIView):
#从左往右继承
def get(self, request, *args, **kwargs):
return self.retrieve(request, *args, **kwargs)
#去找retrieve这个方法
class RetrieveModelMixin:
"""
Retrieve a model instance.
"""
def retrieve(self, request, *args, **kwargs):
#对应获得对象
instance = self.get_object()
#序列化
serializer = self.get_serializer(instance)
#返回数据
return Response(serializer.data) 执行 def get_object(self):
"""
Returns the object the view is displaying. You may want to override this if you need to provide non-standard
queryset lookups. Eg if objects are referenced using multiple
keyword arguments in the url conf.
"""
queryset = self.filter_queryset(self.get_queryset()) # Perform the lookup filtering.
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg)
) return obj

基于GenericAPIView

createapiview创建

基于GenericAPIView

封装了post请求

执行流程

1.序列化
2.序列化验证
3.保存数据

源码剖析

    def post(self, request, *args, **kwargs):
return self.create(request, *args, **kwargs) 1.执行create
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
#序列化
serializer.is_valid(raise_exception=True)
#序列化验证
self.perform_create(serializer)
'''
保存数据
def perform_create(self, serializer):
serializer.save()
''' headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

示例使用

#第一种写法重写create书写 多条序列化器
def create(self, request, *args, **kwargs): article = articleseralizer(data=request.data)
articleDetail = atricleDetaliser(data=request.data)
if article.is_valid() and articleDetail.is_valid():
article_obj = article.save(author=request.user)
print(">>>>>>>>>", request.user, type(request.user))
articleDetail.save(article=article_obj)
return Response('修改成功')
else:
return Response(f"{article.errors},{articleDetail.errors}") #第二种方式
重写序列化器分发
def get_serializer_class(self):
self.request.method=="POST":
return 对应序列化器
self.request.method=="GET":
return 对应序列化器
def perform_create(self,serializer):
Aritlel=serializer.save(author)
serializer_second=AricleSerializer(data=request.data)
serializer.save(aritle=Aritlel)
字段对象=对象

UpdateAPIView全局局部

封装了局部更新和全局更新

基于GenericAPIView

执行流程

1.获取更新那条数据的对象
2.序列化
2.序列化校验
3.校验成功更新

源码剖析


def put(self, request, *args, **kwargs):
return self.update(request, *args, **kwargs)
'''
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
'''
#执行queryset查询处理的model对象
#相当于
#queryset=model.表名.object.filter(pk=pk).frist()
'''
serializer = self.get_serializer(instance, data=request.data, partial=partial)默认不为Ture
#执行对应的序列化
serializer.is_valid(raise_exception=True)
#序列化验证
self.perform_update(serializer)
'''
# def perform_update(self, serializer):
#serializer.save() def patch(self, request, *args, **kwargs):
return self.partial_update(request, *args, **kwargs)
'''
def partial_update(self, request, *args, **kwargs):
kwargs['partial'] = True
return self.update(request, *args, **kwargs)
'''
执行
'''
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
'''
#执行queryset查询处理的model对象
#相当于
#queryset=model.表名.object.filter(pk=pk).frist()
'''
serializer = self.get_serializer(instance, data=request.data, partial=partial)
kwargs['partial'] = True 局部更新
#执行对应的序列化
serializer.is_valid(raise_exception=True)
#序列化验证
self.perform_update(serializer)
'''
# def perform_update(self, serializer):
#serializer.save()

删除DestroyAPIView

执行流程

1.传入对象
2.执行def perform_destroy(self, instance):
3.删除

源码剖析

 def delete(self, request, *args, **kwargs):
return self.destroy(request, *args, **kwargs)
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
#c传入对象删除
self.perform_destroy(instance)
return Response(status=status.HTTP_204_NO_CONTENT) def perform_destroy(self, instance):
instance.delete()#删除

Viewset视图类

是APIview视图类的继承对象

createapiview
UpdateAPIview。。。。等等

使用方法

1.直接继承即可
2.因为没有封装对应get post等请求方法 一点点去找父类的方法
3.GenericViewSet(ViewSetMixin)它继承的ViewSetMixin
4.重写了asview()需要传参
5.使用两个类进行id和没有id的区分
url(r'^article/$',article.AtricleView.as_view({"get":"list"}))
url(r'^article/(?P<pk>\d+)/$',article.AtricleView.as_view({"get":"retrieve"}))

知识点

from rest_framework.viewsets import GenericVIewSet
class GenericViewSet(ViewSetMixin,generics.GenericAPIView
):
默认继承GenericAPIView
def get_serializer_class(self):
pk = self.kwargs.get('pk')
if pk:
return ArticleDetailSerializer
return ArticleSerializer

ListModelMixin添加

源码

 def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)

RetrieveModelMixin单条查询

源码


def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)

UpdateModelMixin更新

源码

    def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer) if getattr(instance, '_prefetched_objects_cache', None):
# If 'prefetch_related' has been applied to a queryset, we need to
# forcibly invalidate the prefetch cache on the instance.
instance._prefetched_objects_cache = {} return Response(serializer.data) def perform_update(self, serializer):
serializer.save() #
def partial_update(self, request, *args, **kwargs):
kwargs['partial'] = True
return self.update(request, *args, **kwargs)
class

DestroyModelMixin:

删除

源码

class DestroyModelMixin:
"""
Destroy a model instance.
"""
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
self.perform_destroy(instance)
return Response(status=status.HTTP_204_NO_CONTENT) def perform_destroy(self, instance):
instance.delete()

版本

setting配置

    default_version = api_settings.DEFAULT_VERSION
#默认版本
allowed_versions = api_settings.ALLOWED_VERSIONS
#指定版本
version_param = api_settings.VERSION_PARAM
#url版本传输关键字
#指定版本类类
"DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",

知识点

dispath 老的request 封装了很多功能

request.version#版本
scheme#版本对象
#执行返回一个版本元组 第一个是版本第二个是版本对象
def determine_version(self, request, *args, **kwargs):
"""
If versioning is being used, then determine any API version for the
incoming request. Returns a two-tuple of (version, versioning_scheme)
"""
if self.versioning_class is None:
return (None, None)
scheme = self.versioning_class()
return (scheme.determine_version(request, *args, **kwargs), scheme)
request.version, request.versioning_scheme = version, scheme

源码执行流程


class APIView(View):
versioning_class = api_settings.DEFAULT_VERSIONING_CLASS def dispatch(self, request, *args, **kwargs): # ###################### 第一步 ###########################
"""
request,是django的request,它的内部有:request.GET/request.POST/request.method
args,kwargs是在路由中匹配到的参数,如:
url(r'^order/(\d+)/(?P<version>\w+)/$', views.OrderView.as_view()),
http://www.xxx.com/order/1/v2/
"""
self.args = args
self.kwargs = kwargs """
request = 生成了一个新的request对象,此对象的内部封装了一些值。
request = Request(request)
- 内部封装了 _request = 老的request
"""
request = self.initialize_request(request, *args, **kwargs)
self.request = request self.headers = self.default_response_headers # deprecate? try:
# ###################### 第二步 ###########################
self.initial(request, *args, **kwargs) 执行视图函数。。 def initial(self, request, *args, **kwargs): # ############### 2.1 处理drf的版本 ##############
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme
... def determine_version(self, request, *args, **kwargs):
if self.versioning_class is None:
return (None, None)
#是版本类的实例化对象
scheme = self.versioning_class()
# obj = XXXXXXXXXXXX()
return (scheme.determine_version(request, *args, **kwargs), scheme) class OrderView(APIView):
versioning_class = URLPathVersioning
def get(self,request,*args,**kwargs):
print(request.version)
print(request.versioning_scheme)
return Response('...') def post(self,request,*args,**kwargs):
return Response('post')

1.局部配置

视图写法

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.versioning import URLPathVersioning class OrderView(APIView): versioning_class = URLPathVersioning
def get(self,request,*args,**kwargs):
print(request.version)
print(request.versioning_scheme)
return Response('...') def post(self,request,*args,**kwargs):
return Response('post')

setting配置

 REST_FRAMEWORK = {
"ALLOWED_VERSIONS":['v1','v2'],
'VERSION_PARAM':'version'
}

2.全局配置

不用手动一个个加全部都配置上

url

url(r'^(?P<version>[v1|v2]+)/users/$', users_list, name='users-list'),

url(r'^(?P<version>\w+)/users/$', users_list, name='users-list'),

settings配置

REST_FRAMEWORK = {
指定版本类 必须要写要些路径 "DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",
#限制版本范围
"ALLOWED_VERSIONS":['v1','v2'],
#指定url有名分组的关键字
'VERSION_PARAM':'version'
}

认证

自定义认证token
把数据库的user拿出来比较
如果想使用**request.data 就要把他变为字典 所以前端要返回json数据类型
源码写法
#配置
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

局部配置

和登陆配合使用
import uuid
from django.shortcuts import render
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from rest_framework.versioning import URLPathVersioning
from rest_framework.views import APIView
from rest_framework.response import Response
class Loginview(APIView):
versioning_class = None
#认证写法
authentication_classes = [Myauthentication, ] def post(self,request,*args,**kwargs):
user_object = models.UserInfo.objects.filter(**request.data).first()
if not user_object:
return Response('登录失败')
random_string = str(uuid.uuid4())
user_object.token = random_string
user_object.save()
return Response(random_string) class MyAuthentication:
def authenticate(self, request):
"""
Authenticate the request and return a two-tuple of (user, token).
"""
token = request.query_params.get('token')
user_object = models.UserInfo.objects.filter(token=token).first()
if user_object:
return (user_object,token)
return (None,None) class OrderView(APIView):
authentication_classes = [MyAuthentication, ]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
return Response('order') class UserView(APIView):
authentication_classes = [MyAuthentication,]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
return Response('user') ···············手写认证类···························· class Myauthentication:
# 认证
# versioning_class = None
#手写认证
def authenticate(self, request):
token = request.query_params.get('token')
print(token)
user_object = models.UserInfo.objects.filter(token=token).first()
print(user_object)
if user_object:
print(1)
#必须返回一个元组的形式
#返回对应的user_object和token值 return (user_object, token)
else:
print(2)
return (None, None) ----------------------
为什么要返回元组
----------------------
#为什么要返回元组
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
for authenticator in self.authenticators:
try:
user_auth_tuple =
#到这返回为什么可以执行对应的方法
authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise if user_auth_tuple is not None:
#把查询出来的元组
self._authenticator = authenticator
#给user和auth进行赋值
self.user, self.auth = user_auth_tuple#必须返回一个元组的形式
return#结束函数 self._not_authenticated() def _not_authenticated(self):
"""
Set authenticator, user & authtoken representing an unauthenticated request. Defaults are None, AnonymousUser & None.
"""
self._authenticator = None if api_settings.UNAUTHENTICATED_USER:
#user 没有设置返回none
self.user =
#匿名用户
api_settings.UNAUTHENTICATED_USER()
else:
self.user = None if api_settings.UNAUTHENTICATED_TOKEN:
self.auth =
#对应author没有设置返回none
api_settings.UNAUTHENTICATED_TOKEN()
else:
self.auth = None

全局配置

DEFAULT_AUTHENTICATION_CLASSES=['写一个认证的类手写的']
class Myauthentication:
# 认证
# versioning_class = None
#手写认证
def authenticate(self, request):
token = request.query_params.get('token')
print(token)
user_object = models.UserInfo.objects.filter(token=token).first()
print(user_object)
if user_object:
print(1)
#必须返回一个元组的形式
#返回对应的user_object和token值 return (user_object, token)
else:
print(2)
return (None, None)
'''
# raise Exception(), 不在继续往下执行,直接返回给用户。
# return None ,本次认证完成,执行下一个认证
# return ('x',"x"),认证成功,不需要再继续执行其他认证了,继续往后权限、节流、视图函数
'''

view视图写法

class LoginView(APIView):
authentication_classes = []
def post(self,request,*args,**kwargs):
user_object = models.UserInfo.objects.filter(**request.data).first()
if not user_object:
return Response('登录失败')
random_string = str(uuid.uuid4())
user_object.token = random_string
user_object.save()
return Response(random_string) class OrderView(APIView):
# authentication_classes = [TokenAuthentication, ]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
if request.user:
return Response('order')
return Response('滚') class UserView(APIView):
同上

源码执行流程

'''
当请求发过来的实话会先执行 apiview里面的dispath方法
1.执行认证的dispatch的方法
2.dispatch方法会执行initialize_request方法封装旧的request对象 3.在封装的过程中会执行
get_authenticators(self): 并实例化列表中的每一个认证类
如果自己写的类有对应的认证类 会把认证类的实例化对象封装到新的request当中
继续执行initial
会执行认证里面的perform_authentication 执行request.user
会执行循环每一个对象执行对应的authenticate方法
会有三种返回值
# raise Exception(), 不在继续往下执行,直接返回给用户。
# return None ,本次认证完成,执行下一个认证
# return ('x',"x"),认证成功,不需要再继续执行其他认证了,继续往后权限、节流、视图函数 里面会执行四件事
版本
认证
权限
节流
'''
def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
#封装旧的request
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
循环自定义的认证
#request封装旧的request执行initialize_request def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request) return Request(
request,
parsers=self.get_parsers(),
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
) def get_authenticators(self):
"""
Instantiates and returns the list of authenticators that this view can use.
""" return [auth() for auth in self.authentication_classes] def initial(self, request, *args, **kwargs):
"""
Runs anything that needs to occur prior to calling the method handler.
"""
self.format_kwarg = self.get_format_suffix(**kwargs) # Perform content negotiation and store the accepted info on the request
neg = self.perform_content_negotiation(request)
request.accepted_renderer, request.accepted_media_type = neg # Determine the API version, if versioning is in use.
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request) #执行完四件事在执行视图

权限

message=消息 自定义错误消息
可以自定义错误信息
'''
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
''' 本质上自定义和
return Flase
单条验证
验证两次
不写url_kwarg默认封装识别pk关键字

重写错误信息的两种方式

第一种

message={"code":"恭喜你报错了"}

第二种

from rest_framework.permissions import BasePermission
from rest_framework import exceptions class MyPermission(BasePermission):
#第一种
message = {'code': 10001, 'error': '你没权限'}
def has_permission(self, request, view):
#request.user执行认证
if request.user:
return True
#自定义错误信息
raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'})
return False def has_object_permission(self, request, view, obj):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return False #对应源码
相当于重写了错误信息
#执行顺序
dispatch---》intital--》check_permissions--》permission_denied
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated()
raise
#等于message 或者自定义错误函数
#raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'})
exceptions.PermissionDenied(detail=message) ················源码··················
def check_permissions(self, request):
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)

源码剖析

执行流程
apiview
---
dispatch
---
initial
---
执行权限判断
如果自己写了权限类 会先循环权限类并实例化存在列表当中
然后循环列表对象,如果权限判断返回不为Ture机会进行主动抛出异常
可以自定义错误信息
具体代码
class APIView(View):
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES def dispatch(self, request, *args, **kwargs):
封装request对象
self.initial(request, *args, **kwargs)
通过反射执行视图中的方法 def initial(self, request, *args, **kwargs):
版本的处理
# 认证
self.perform_authentication(request) # 权限判断
self.check_permissions(request) self.check_throttles(request)
#执行认证
def perform_authentication(self, request):
request.user def check_permissions(self, request):
# [对象,对象,]
self.执行
'''
def get_permissions(self):
return [permission() for permission in self.permission_classes]
'''
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(request, message=getattr(permission, 'message', None))
def permission_denied(self, request, message=None):
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message) # class UserView(APIView):
permission_classes = [MyPermission, ] def get(self,request,*args**kwargs):
return Response('user')

使用

class MyPermission(BasePermission):
message = {'code': 10001, 'error': '你没权限'}
def has_permission(self, request, view):
#request.user执行认证
if request.user:
return True raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'})
return False def has_object_permission(self, request, view, obj):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return False class UserView(APIView):
先执行上面的数据
permission_classes = [MyPermission, ]
#再执行
def get(self,request,*args**kwargs):
return Response('user')

源码流程

    def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate? try:
#走这
self.initial(request, *args, **kwargs)
​`````````````````def initial`````````````````````````` def initial(self, request, *args, **kwargs):
"""
Runs anything that needs to occur prior to calling the method handler.
"""
self.format_kwarg = self.get_format_suffix(**kwargs) # Perform content negotiation and store the accepted info on the request
neg = self.perform_content_negotiation(request)
request.accepted_renderer, request.accepted_media_type = neg # Determine the API version, if versioning is in use.
version, scheme =
'''
第三步
'''
self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request) ---------------------------------------
# 第二步
def check_permissions(self, request):
# [对象,对象,]
self.执行
'''
def get_permissions(self):
return [permission() for permission in self.permission_classes]
'''
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(request, message=getattr(permission, 'message', None))
def permission_denied(self, request, message=None):
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message)

第三步

    def determine_version(self, request, *args, **kwargs):
if self.versioning_class is None:
return (None, None)
scheme = self.versioning_class()
return (scheme.determine_version(request, *args, **kwargs), scheme)

单挑数据的验证

单挑数据

def has_object_permission(self, request, view, obj):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return False
#在执行RetrieveAPIView 会执行单挑数据的验证 def get_object(self): queryset = self.filter_queryset(self.get_queryset()) # Perform the lookup filtering.
#不写url_kwarg默认封装识别pk关键字
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg)
) filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
obj = get_object_or_404(queryset, **filter_kwargs) # May raise a permission denied
#检验单挑数据的权限
self.check_object_permissions(self.request, obj) return obj ···················check_object_permissions··········
def check_object_permissions(self, request, obj):
"""
Check if the request should be permitted for a given object.
Raises an appropriate exception if the request is not permitted.
"""
for permission in self.get_permissions():
if not permission.has_object_permission(request, self, obj):
self.permission_denied(
request, message=getattr(permission, 'message', None)
) 执行has_object_permission
def has_object_permission(self, request, view, obj):
return True

版本认证权限执行流程

1.首先所有方法都执行父类的apiview方法
2.执行dispatch方法 3.进行
def get_authenticators(self):
return [auth() for auth in self.authentication_classes]
4.
intiail request.user

drf源码系列

如果代码中出现了CSRF使用装饰器即可

from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator def csrf_exempt(view_func):
"""
Marks a view function as being exempt from the CSRF view protection.
"""
# We could just do view_func.csrf_exempt = True, but decorators
# are nicer if they don't have side-effects, so we return a new
# function.
def wrapped_view(*args, **kwargs):
return view_func(*args, **kwargs)
wrapped_view.csrf_exempt = True
return wraps(view_func, assigned=available_attrs(view_func))(wrapped_view)

drf访问频率的限制

- 匿名用户,用IP作为用户唯一标记,但如果用户换代理IP,无法做到真正的限制。
- 登录用户,用用户名或用户ID做标识。可以做到真正的限制

知识点

当前这个字典存放在django的缓存中
{
throttle_anon_1.1.1.1:[100121340,],
1.1.1.2:[100121251,100120450,]
} 限制:60s能访问3次
来访问时:
1.获取当前时间 100121280
2.100121280-60 = 100121220,小于100121220所有记录删除全部剔除
3.判断1分钟以内已经访问多少次了? 4
判断访问次数
4.无法访问
停一会
来访问时:
1.获取当前时间 100121340
2.100121340-60 = 100121280,小于100121280所有记录删除
3.判断1分钟以内已经访问多少次了? 0
4.可以访问

执行流程

在视图类中配置 throttle_classes= [] 这是一个列表
实现 allow_request 方法 wait 是一个提示方法
返回 True/False。
True 可以继续访问,
False 表示限制
全局配置
DEFAULT_THROTTLE_CLASSE,节流限制类
DEFAULT_THROTTLE_RATES 表示频率限制,比如 10/m 表示 每分钟 10 次
源码在 initial 中实现 check_throttles 方法 在进行节流之前已经做了版本认证权限
#dispatch分发
def initial(self, request, *args, **kwargs):
#版本
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted
#认证
self.perform_authentication(request)
#权限
self.check_permissions(request)
#频率限制
self.check_throttles(request)
1.先找allow_request本类没有去父类找 同dispatch
2.执行对应的rate获取对应scope 节流的频率
2.key是获取ip值
3.history是获取ip地址 获取对应的访问时间记录
'''
self.history = self.cache.get(self.key, [])
'''
4.对应的访问记录等于ture获取到值,
把当前的时间减去一个自己设定的时间戳
'''
while self.history and self.history[-1] <= self.now - self.duration:
#条件满足 把最后一个值删除
#再去循环判断
self.history.pop()
'''
5.然后判断访问次数有没有大于自己设定的值
'''
if len(self.history) >= self.num_requests:
满足条件走这
return self.throttle_failure()
不满足条件走这
return self.throttle_success()
'''
6.成功 def throttle_success(self):
"""
Inserts the current request's timestamp along with the key
into the cache.
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
6.失败
def throttle_failure(self):
"""
Called when a request to the API has failed due to throttling.
"""
return False#不可以访问
7如果返回false进入等待时间
def wait(self):
"""
Returns the recommended next request time in seconds.
"""
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None return remaining_duration / float(available_requests)

源码

#dispatch分发
def initial(self, request, *args, **kwargs):
#版本
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted
#认证
self.perform_authentication(request)
#权限
self.check_permissions(request)
#频率限制
self.check_throttles(request) def check_throttles(self, request):
throttle_durations = []
for throttle in self.get_throttles():
#找到当前类的allow_request
if not throttle.allow_request(request, self):
throttle_durations.append(throttle.wait()) if throttle_durations:
durations = [
duration for duration in throttle_durations
if duration is not None
] duration = max(durations, default=None)
self.throttled(request, duration) #执行类的allow_request
def allow_request(self, request, view):
"""
Implement the check to see if the request should be throttled. On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
if self.rate is None:
return True
获取
'''
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
当前这个类有一个获取rate的方法
''' #执行get_rate
读取settings设置的配置文件
'''
def get_rate(self):
#如果不设置会进行报错
try:
#设置了就进行键取值
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
'''
#通过键去取值/进行分割获取值
'''
def parse_rate(self, rate):
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
'''
#这步继续往下执行继承类的get_cache_key
self.key = self.get_cache_key(request, view) #匿名
#继承了class AnonRateThrottle(SimpleRateThrottle):
''' scope = 'anon'
def get_cache_key(self, request, view):
if request.user.is_authenticated:
return None # Only throttle unauthenticated requests. return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
}
}
#返回字符串格式化
throttle_(匿名)anon_(ip地址的拼接)1.1.1.1:[100121340,],
'''
#根据用户进行判断 重写get_cache_key
''' ''' if self.key is None:
return True self.history = self.cache.get(self.key, [])
self.now = self.timer() # Drop any requests from the history which have now passed the
# throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
#元组返回的值num_requests
if len(self.history) >= self.num_requests:
#不可以访问
return self.throttle_failure()
#可以访问
return self.throttle_success()
'''
6.成功 def throttle_success(self):
#成功把访问的当前时间插入history
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
6.失败
def throttle_failure(self):
"""
Called when a request to the API has failed due to throttling.
"""
return False#不可以访问 '''

默认配置文件写法settings

REST_FRAMEWORK = {
throttle_classes = [AnonRateThrottle,]
"DEFAULT_THROTTLE_RATES":{"anon":"3/m"}
#这样写的原因 源码
#通过匿名
scope = 'anon'
def get_cache_key(self, request, view):
if request.user.is_authenticated:
return None # Only throttle unauthenticated requests. return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
}
} #获取全局设置的步骤
def __init__(self):
if not getattr(self, 'rate', None):
#第一步 self.rate = self.get_rate()
'''
以设置的键进行取值获取时间
def get_rate(self):
如果不设置就进行报错
try:
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
''' self.num_requests, self.duration = self.parse_rate(self.rate)
#后面设置值格式
#例 "DEFAULT_THROTTLE_RATES":{"anon":"3/m"}
以/分割 前面是限制的次数 后面的是访问限制的时间 '''
def parse_rate(self, rate):
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration) '''
之后执行对应字典的取值
key=throttle_(匿名)anon_(ip地址的拼接)1.1.1.1:
[100121340,],#值
self.key = self.get_cache_key(request, view)
if self.key is None:
return True self.history = self.cache.get(self.key, [])
self.now = self.timer() # Drop any requests from the history which have now passed the
# throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()

根据匿名用户和id进行判断

全局配置

REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'api.utils.throttles.throttles.LuffyAnonRateThrottle',
'api.utils.throttles.throttles.LuffyUserRateThrottle',
],
'DEFAULT_THROTTLE_RATES': {
#不重写的默认走这
'anon': '10/day',
'user': '10/day',
'luffy_anon': '10/m',
'luffy_user': '20/m',
},
} settings

url写法

from django.conf.urls import url, include
from web.views.s3_throttling import TestView urlpatterns = [
url(r'^test/', TestView.as_view()),
] urls.py

settings写法局部

REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
'DEFAULT_THROTTLE_RATES': {
'luffy_anon': '10/m',
'luffy_user': '20/m',
},
} settings.py

根据匿名ip或者user 进行判断

视图写法

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework.throttling import SimpleRateThrottle class LuffyAnonRateThrottle(SimpleRateThrottle):
"""
匿名用户,根据IP进行限制
"""
scope = "luffy_anon" def get_cache_key(self, request, view):
# 用户已登录,则跳过 匿名频率限制
if request.user:
return None return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
} class LuffyUserRateThrottle(SimpleRateThrottle):
"""
登录用户,根据用户token限制
"""
#重写scope
scope = "luffy_user" def get_ident(self, request):
"""
认证成功时:request.user是用户对象;request.auth是token对象
:param request:
:return:
"""
# return request.auth.token
return "user_token" def get_cache_key(self, request, view):
"""
获取缓存key
:param request:
:param view:
:return:
"""
# 未登录用户,则跳过 Token限制
if not request.user:
return None return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
} class TestView(APIView):
throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ] def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET请求,响应内容') def post(self, request, *args, **kwargs):
return Response('POST请求,响应内容') def put(self, request, *args, **kwargs):
return Response('PUT请求,响应内容') ​```````````````````````````````````````````
#源码
class AnonRateThrottle(SimpleRateThrottle):
scope = 'anon'
def get_cache_key(self, request, view):
if request.user.is_authenticated:
return None
# Only throttle unauthenticated requests. return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
} #
def get_cache_key(self, request, view):
return self.get_ident(request)#获取对应的ip值

settings

# settings.py
'DEFAULT_THROTTLE_RATES': {
'Vistor': '3/m',
'User': '10/m'
},

drf总结

django'中可以免除csrftoken的认证

from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import HttpResponse
@csrf_exempt def index(request):    
return HttpResponse('...')
# index = csrf_exempt(index)
urlpatterns = [    
url(r'^index/$',index),
]

drf中view进行csrftoken的认证

urlpatterns = [    url(r'^login/$',account.LoginView.as_view()),
]
class APIView(View):    
@classmethod    
def as_view(cls, **initkwargs):        
view = super().as_view(**initkwargs)    
view.cls = cls        
view.initkwargs = initkwargs
       # Note: session based authentication is explicitly CSRF validated,        
# all other authentication is CSRF exempt.        
return csrf_exempt(view)

1.写视图的方法

  • 第一种:原始APIView

    url(r'^login/$',account.LoginView.as_view()),
    from rest_framework.views import APIView
    from rest_framework.response import Response
    from rest_framework_jwt.settings import api_settings
    from rest_framework.throttling import AnonRateThrottle
    from api import models class LoginView(APIView):
    authentication_classes = []
    def post(self,request,*args,**kwargs):
    # 1.根据用户名和密码检测用户是否可以登录
    user = models.UserInfo.objects.filter(username=request.data.get('username'),password=request.data.get('password')).first()
    if not user:
    return Response({'code':10001,'error':'用户名或密码错误'}) # 2. 根据user对象生成payload(中间值的数据)
    jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
    payload = jwt_payload_handler(user) # 3. 构造前面数据,base64加密;中间数据base64加密;前两段拼接然后做hs256加密(加盐),再做base64加密。生成token
    jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
    token = jwt_encode_handler(payload)
    return Response({'code': 10000, 'data': token})
  • 第二种:ListApiView等

    url(r'^article/$',article.ArticleView.as_view()),
    url(r'^article/(?P<pk>\d+)/$',article.ArticleDetailView.as_view()),
    from rest_framework.throttling import AnonRateThrottle
    from rest_framework.response import Response
    from rest_framework.generics import ListAPIView,RetrieveAPIView
    from api import models
    from api.serializer.article import ArticleSerializer,ArticleDetailSerializer class ArticleView(ListAPIView):
    authentication_classes = []
    # throttle_classes = [AnonRateThrottle,] queryset = models.Article.objects.all()
    serializer_class = ArticleSerializer class ArticleDetailView(RetrieveAPIView):
    authentication_classes = []
    queryset = models.Article.objects.all()
    serializer_class = ArticleDetailSerializer
  • 第三种:

    url(r'^article/$',article.ArticleView.as_view({"get":'list','post':'create'})),
    url(r'^article/(?P<pk>\d+)/$',article.ArticleView.as_view({'get':'retrieve','put':'update','patch':'partial_update','delete':'destroy'}))
    from rest_framework.viewsets import GenericViewSet
    from rest_framework.mixins import ListModelMixin,RetrieveModelMixin,CreateModelMixin,UpdateModelMixin,DestroyModelMixin
    from api.serializer.article import ArticleSerializer,ArticleDetailSerializer class ArticleView(GenericViewSet,ListModelMixin,RetrieveModelMixin,CreateModelMixin,UpdateModelMixin,DestroyModelMixin):
    authentication_classes = []
    throttle_classes = [AnonRateThrottle,] queryset = models.Article.objects.all()
    serializer_class = None def get_serializer_class(self):
    pk = self.kwargs.get('pk')
    if pk:
    return ArticleDetailSerializer
    return ArticleSerializer

drf 相关知识点梳理

  1. 装饰器


    def outer(func):
    def inner(*args,**kwargs):
    return func(*args,**kwargs)
    return inner @outer
    def index(a1):
    pass index()
    def outer(func):
    def inner(*args,**kwargs):
    return func(*args,**kwargs)
    return inner def index(a1):
    pass index = outer(index) index()
  2. django中可以免除csrftoken认证

    from django.views.decorators.csrf import csrf_exempt
    from django.shortcuts import HttpResponse @csrf_exempt
    def index(request):
    return HttpResponse('...') # index = csrf_exempt(index) urlpatterns = [
    url(r'^index/$',index),
    ]
    urlpatterns = [
    url(r'^login/$',account.LoginView.as_view()),
    ] class APIView(View):
    @classmethod
    def as_view(cls, **initkwargs):
    view = super().as_view(**initkwargs)
    view.cls = cls
    view.initkwargs = initkwargs # Note: session based authentication is explicitly CSRF validated,
    # all other authentication is CSRF exempt.
    return csrf_exempt(view)
  3. 面向对象中基于继承+异常处理来做的约束

    class BaseVersioning:
    def determine_version(self, request, *args, **kwargs):
    raise NotImplementedError("must be implemented") class URLPathVersioning(BaseVersioning):
    def determine_version(self, request, *args, **kwargs):
    version = kwargs.get(self.version_param, self.default_version)
    if version is None:
    version = self.default_version if not self.is_allowed_version(version):
    raise exceptions.NotFound(self.invalid_version_message)
    return version
  4. 面向对象封装

    class Foo(object):
    def __init__(self,name,age):
    self.name = name
    self.age = age obj = Foo('汪洋',18)
    class APIView(View):
    def dispatch(self, request, *args, **kwargs): self.args = args
    self.kwargs = kwargs
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    ... def initialize_request(self, request, *args, **kwargs):
    """
    Returns the initial request object.
    """
    parser_context = self.get_parser_context(request) return Request(
    request,
    parsers=self.get_parsers(),
    authenticators=self.get_authenticators(), # [MyAuthentication(),]
    negotiator=self.get_content_negotiator(),
    parser_context=parser_context
    )
  5. 面向对象继承

    class View(object):
    pass class APIView(View):
    def dispatch(self):
    method = getattr(self,'get')
    method() class GenericAPIView(APIView):
    serilizer_class = None def get_seriliser_class(self):
    return self.serilizer_class class ListModelMixin(object):
    def get(self):
    ser_class = self.get_seriliser_class()
    print(ser_class) class ListAPIView(ListModelMixin,GenericAPIView):
    pass class UserInfoView(ListAPIView):
    pass view = UserInfoView()
    view.dispatch()
    class View(object):
    pass class APIView(View):
    def dispatch(self):
    method = getattr(self,'get')
    method() class GenericAPIView(APIView):
    serilizer_class = None def get_seriliser_class(self):
    return self.serilizer_class class ListModelMixin(object):
    def get(self):
    ser_class = self.get_seriliser_class()
    print(ser_class) class ListAPIView(ListModelMixin,GenericAPIView):
    pass class UserInfoView(ListAPIView):
    serilizer_class = "汪洋" view = UserInfoView()
    view.dispatch()
    class View(object):
    pass class APIView(View):
    def dispatch(self):
    method = getattr(self,'get')
    method() class GenericAPIView(APIView):
    serilizer_class = None def get_seriliser_class(self):
    return self.serilizer_class class ListModelMixin(object):
    def get(self):
    ser_class = self.get_seriliser_class()
    print(ser_class) class ListAPIView(ListModelMixin,GenericAPIView):
    pass class UserInfoView(ListAPIView): def get_seriliser_class(self):
    return "咩咩" view = UserInfoView()
    view.dispatch()
  6. 反射

    class View(object):
    def dispatch(self, request, *args, **kwargs):
    # Try to dispatch to the right method; if a method doesn't exist,
    # defer to the error handler. Also defer to the error handler if the
    # request method isn't on the approved list.
    if request.method.lower() in self.http_method_names:
    handler = getattr(self, request.method.lower(), self.http_method_not_allowed)
    else:
    handler = self.http_method_not_allowed
    return handler(request, *args, **kwargs)
  7. 发送ajax请求


    $.ajax({
    url:'地址',
    type:'GET',
    data:{...},
    success:function(arg){
    console.log(arg);
    }
    })
  8. 浏览器具有 "同源策略的限制",导致 发送ajax请求 + 跨域 存在无法获取数据。

    • 简单请求,发送一次请求。
    • 复杂请求,先options请求做预检,然后再发送真正请求

    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <title>Title</title>
    </head>
    <body>
    <h1>常鑫的网站</h1>
    <p>
    <input type="button" value="点我" onclick="sendMsg()">
    </p>
    <p>
    <input type="button" value="点他" onclick="sendRemoteMsg()">
    </p> <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
    <script>
    function sendMsg() {
    $.ajax({
    url:'/msg/',
    type:'GET',
    success:function (arg) {
    console.log(arg);
    }
    })
    }
    function sendRemoteMsg() {
    $.ajax({
    url:'http://127.0.0.1:8002/json/',
    type:'GET',
    success:function (arg) {
    console.log(arg);
    }
    }) }
    </script>
    </body>
    </html>
  9. 如何解决ajax+跨域?

    CORS,跨站资源共享,本质:设置响应头。
  10. 常见的Http请求方法

    get
    post
    put
    patch
    delete
    options
  11. http请求中Content-type请起头

    情况一:
    content-type:x-www-form-urlencode
    name=alex&age=19&xx=10 request.POST和request.body中均有值。 情况二:
    content-type:application/json
    {"name":"ALex","Age":19} request.POST没值
    request.body有值。
  12. django中F查询

  13. django中获取空Queryset

    models.User.object.all().none()
  14. 基于django的fbv和cbv都能实现遵循restful规范的接口


    def user(request):
    if request.metho == 'GET':
    pass class UserView(View):
    def get()... def post...
  15. 基于django rest framework框架实现restful api的开发。

    - 免除csrf认证
    - 视图(APIView、ListAPIView、ListModelMinx)
    - 版本
    - 认证
    - 权限
    - 节流
    - 解析器
    - 筛选器
    - 分页
    - 序列化
    - 渲染器
  16. 简述drf中认证流程?

    1.用户发来请求优先执行dispatch方法
    2.内部会封装reqeustd1
  17. 简述drf中节流的实现原理以及过程?匿名用户/非匿名用户 如何实现频率限制?


  18. GenericAPIView视图类的作用?

    他提供了一些规则,例如:
    
    class GenericAPIView(APIView):
    serializer_class = None
    queryset = None
    lookup_field = 'pk' filter_backends = api_settings.DEFAULT_FILTER_BACKENDS
    pagination_class = api_settings.DEFAULT_PAGINATION_CLASS def get_queryset(self):
    return self.queryset def get_serializer_class(self):
    return self.serializer_class def filter_queryset(self, queryset):
    for backend in list(self.filter_backends):
    queryset = backend().filter_queryset(self.request, queryset, self)
    return queryset @property
    def paginator(self):
    if not hasattr(self, '_paginator'):
    if self.pagination_class is None:
    self._paginator = None
    else:
    self._paginator = self.pagination_class()
    return self._paginator 他相当于提供了一些规则,建议子类中使用固定的方式获取数据,例如:
    class ArticleView(GenericAPIView):
    queryset = models.User.objects.all() def get(self,request,*args,**kwargs):
    query = self.get_queryset() 我们可以自己继承GenericAPIView来实现具体操作,但是一般不会,因为更加麻烦。
    而GenericAPIView主要是提供给drf内部的 ListAPIView、Create....
    class ListModelMixin:
    def list(self, request, *args, **kwargs):
    queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset)
    if page is not None:
    serializer = self.get_serializer(page, many=True)
    return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True)
    return Response(serializer.data) class ListAPIView(mixins.ListModelMixin,GenericAPIView):
    def get(self, request, *args, **kwargs):
    return self.list(request, *args, **kwargs) class MyView(ListAPIView):
    queryset = xxxx
    ser...
    总结:GenericAPIView主要为drf内部帮助我们提供增删改查的类LIstAPIView、CreateAPIView、UpdateAPIView、提供了执行流程和功能,我们在使用drf内置类做CURD时,就可以通过自定义 静态字段(类变量)或重写方法(get_queryset、get_serializer_class)来进行更高级的定制。
  19. jwt以及其优势。

    jwt前后端分离 用于用户认证
    jwt的实现原理:
    -用户登陆成功,会给前端返回一个tokon值。
    此token值只在前端保存
    token值分为
    一段类型和算法信息
    第二段用户信息和超时时间
    第三段前两段数据拼接之后进行has256再次加密+base64url
  20. 序列化时many=True和many=False的区别?

  21. 应用DRF中的功能进行项目开发

    *****
    解析器:request.query_parmas/request.data
    视图
    序列化
    渲染器:Response ****
    request对象封装
    版本处理
    分页处理
    ***
    认证
    权限
    节流
    • 基于APIView实现呼啦圈
    • 继承ListAPIView+ GenericViewSet,ListModelMixin实现呼啦圈

跨域

  • 域相同,永远不会存在跨域。
    • crm,非前后端分离,没有跨域。
    • 路飞学城,前后端分离,没有跨域(之前有,现在没有)。
  • 域不同时,才会存在跨域。
    • l拉勾网,前后端分离,存在跨域(设置响应头解决跨域)
由于浏览器具有同源策略的限制
对ajax请求的限制 javascript的src对应src不进行限制
同源同端口
不同源就是跨域
写了/api 不跨域 api访问django
api.xx.com跨域了访问django

简单请求

发一次请求

设置响应头就可以解决
from django.shortcuts import render,HttpResponse def json(request):
response = HttpResponse("JSONasdfasdf")
response['Access-Control-Allow-Origin'] = "*"
return response

复杂请求

预检option

请求

html写法


写法

from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
@csrf_exempt
def put_json(request):
response = HttpResponse("JSON复杂请求")
if request.method == 'OPTIONS':
# 处理预检
response['Access-Control-Allow-Origin'] = "*"
response['Access-Control-Allow-Methods'] = "PUT"
return response
elif request.method == "PUT":
return response

解决跨域:CORS

本质在数据返回值设置响应头

from django.shortcuts import render,HttpResponse

def json(request):
response = HttpResponse("JSONasdfasdf")
response['Access-Control-Allow-Origin'] = "*"
return response

通过jsonp

<!DOCTYPE html>
<html>
<head lang="en">
<meta charset="UTF-8">
<title></title>
</head>
<body> <p>
<input type="button" onclick="Jsonp1();" value='提交'/>
</p> <p>
<input type="button" onclick="Jsonp2();" value='提交'/>
</p> <script type="text/javascript" src="jquery-1.12.4.js"></script>
<script>
function Jsonp1(){
var tag = document.createElement('script');
tag.src = "http://c2.com:8000/test/";
document.head.appendChild(tag);
document.head.removeChild(tag); } function Jsonp2(){
$.ajax({
url: "http://c2.com:8000/test/",
type: 'GET',
dataType: 'JSONP',
success: function(data, statusText, xmlHttpRequest){
console.log(data);
}
})
} </script>
</body>
</html>

总结

条件:
1、请求方式:HEAD、GET、POST
2、请求头信息:
Accept
Accept-Language
Content-Language
Last-Event-ID
Content-Type 对应的值是以下三个中的任意一个
application/x-www-form-urlencoded
multipart/form-data
text/plain 注意:同时满足以上两个条件时,则是简单请求,否则为复杂请求

总结

  1. 由于浏览器具有“同源策略”的限制,所以在浏览器上跨域发送Ajax请求时,会被浏览器阻止。
  2. 解决跨域
    • 不跨域
    • CORS(跨站资源共享,本质是设置响应头来解决)。
      • 简单请求:发送一次请求
      • 复杂请求:发送两次请求

部署 collectstaic 收集静态文件

jwt

drf源码系列

用于在前后端分离时,实现用户登录相关。

1.知识点

1.2jwt代替token 进行优化

用户登录成功之后,生成一个随机字符串,给前端。
- 生成随机字符串
加密信息
#{typ:"jwt","alg":'HS256'}
# 加密手段segments.append(base64url_encode(json_header))
98qow39df0lj980945lkdjflo.
#第二部分的信息 {id:1,username:'alx','exp':10}
#加密手段segments.append(base64url_encode(payload))
saueoja8979284sdfsdf.
#两个密文拼接加盐
asiuokjd978928374
- 类型信息通过base64加密
- 数据通过base64加密
- 两个密文拼接在h256加密+加盐
- 给前端返回token值只在前端
token是由。分割的三段组成
- 第一段:类型和算法信息
- 第二段。 用户的信息和超时时间
- 第三段:hs256(前两段拼接)加密 + base64url
- 以后前端再次发来信息时
- 超时验证
- token合法性校验 前端获取随机字符串之后,保留起来。
以后再来发送请求时,携带98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928375。 后端接收到之后
1.取出第二部分进行时间的判断
2. 把前面两个进行加密对第三个值进行加密
- token只在前端保存,后端只负责校验。
- 内部集成了超时时间,后端可以根据时间进行校验是否超时。 - 由于内部存在hash256加密,所以用户不可以修改token,只要一修改就认证失败。 一般在前后端分离时,用于做用户认证(登录)使用的技术。
jwt的实现原理:
- 用户登录成功之后,会给前端返回一段token。
- token是由.分割的三段组成。
- 第一段:类型和算法信心
- 第二段:用户信息+超时时间
- 第三段:hs256(前两段拼接)加密 + base64url
- 以后前端再次发来信息时
- 超时验证
- token合法性校验
优势:
- token只在前端保存,后端只负责校验。
- 内部集成了超时时间,后端可以根据时间进行校验是否超时。
- 由于内部存在hash256加密,所以用户不可以修改token,只要一修改就认证失败。

1.2token

用户登陆成功之后,生成一个随机字符串,自己保留一份,给前端返回一份。
以后前端再来发请求时,需要携带字符串
后端对字符串进行校验

流程

1.安装

pip3 install djangorestframework-jwt

2.注册

INSTALLED_APPS = [  
'django.contrib.admin',  
'django.contrib.auth',  
'django.contrib.contenttypes',  
'django.contrib.sessions',  
'django.contrib.messages',  
'django.contrib.staticfiles',  
'api.apps.ApiConfig',  
'rest_framework',  
'rest_framework_jwt']

3.源码剖析和实现流程

from rest_framework_jwt.views import obtain_jwt_token

具体流程

用户信息加密
jwt_payload_hander = api_settings.JWT_PAYLOAD_HANDLER
第三段信息的加密
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
#解密
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER def jwt_payload_handler(user):
username_field = get_username_field()
username = get_username(user) warnings.warn(
'The following fields will be removed in the future: '
'`email` and `user_id`. ',
DeprecationWarning
) payload = {
'user_id': user.pk,
'username': username,
'exp': datetime.utcnow() +
#默认5分钟
api_settings.JWT_EXPIRATION_DELTA
#如果有email会把email配置上
if hasattr(user, 'email'):
payload['email'] = user.email
if isinstance(user.pk, uuid.UUID):
#如果有user_id会把pk值配置上
payload['user_id'] = str(user.pk) payload[username_field] = username #源码
settings= 'JWT_EXPIRATION_DELTA':datetime.timedelta(seconds=300)
, }

加密的具体实现流程


api_settings.JWT_ENCODE_HANDLER --> 加密的具体实现 def jwt_encode_handler(payload):
#payload传入
key = api_settings.JWT_PRIVATE_KEY or jwt_get_secret_key(payload)
#加盐
#SECRET_KEY = '+gr4bbq8e$yqbd%n_h)2(osz=bmk1x2+o6+w5g@a4r1#3%q1n*'
return jwt.encode(
payload,#类型信息头信息
key,#加盐
#默认封装传入了hs256
api_settings.JWT_ALGORITHM
# 'JWT_ALGORITHM': 'HS256',
).decode('utf-8')
encode默认继承父类的
父类的encode方法
# Header
header = {'typ': self.header_typ, 'alg': algorithm}
#self.header_typ #-- 》 header_typ = 'JWT'
signing_input = b'.'.join(segments)#把类型信息和数据用.的形式拼接到了一起
try:
alg_obj = self._algorithms[algorithm]#执行算法
key = alg_obj.prepare_key(key)
signature = alg_obj.sign(signing_input, key)#把拼接起来的值进行二次加密 成为第三个信息
'''
except KeyError:
if not has_crypto and algorithm in requires_cryptography:
raise NotImplementedError(
"Algorithm '%s' could not be found. Do you have cryptography "
"installed?" % algorithm
)
else:
raise NotImplementedError('Algorithm not supported')
'''
segments.append(base64url_encode(signature))#再把第三个信息放入列表 对第三个信息进行base64进行加密 return b'.'.join(segments)#用.的形式再把第三个数据拼接起来 进行返回
98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928375

解密

将token分割成 header_segment、payload_segment、crypto_segment 三部分
对第一部分header_segment进行base64url解密,得到header
对第二部分payload_segment进行base64url解密,得到payload
对第三部分crypto_segment进行base64url解密,得到signature
对第三部分signature部分数据进行合法性校验
拼接前两段密文,即:signing_input
从第一段明文中获取加密算法,默认:HS256
使用 算法+盐 对signing_input 进行加密,将得到的结果和signature密文进行比较。

jwt的原理和优势

一般在前后端分离时,用于做用户认证(登录)使用的技术。
jwt的实现原理:
- 用户登录成功之后,会给前端返回一段token。
- token是由.分割的三段组成。
- 第一段:类型和算法信心
- 第二段:用户信息+超时时间
- 第三段:hs256(前两段拼接)加密 + base64url
- 以后前端再次发来信息时
- 超时验证
- token合法性校验
#优势:
- token只在前端保存,后端只负责校验。
- 内部集成了超时时间,后端可以根据时间进行校验是否超时。
- 由于内部存在hash256加密,所以用户不可以修改token,只要一修改就认证失败。

具体使用

用户登陆

import uuid
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.versioning import URLPathVersioning
from rest_framework import status from api import models class LoginView(APIView):
"""
登录接口
"""
def post(self,request,*args,**kwargs): # 基于jwt的认证
# 1.去数据库获取用户信息
from rest_framework_jwt.settings import api_settings
#头信息
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
#第三个数据的加密
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER user = models.UserInfo.objects.filter(**request.data).first()
if not user:
return Response({'code':1000,'error':'用户名或密码错误'}) '''
'user_id': user.pk,
'username': username,
'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA
'''
#头信息的处理
payload = jwt_payload_handler(user)
#对三段信息的编码 加密
token = jwt_encode_handler(payload)
return Response({'code':1001,'data':token}) #第二种方式
class LoginView(APIView):
authentication_classes = []
def post(self,request,*args,**kwargs):
# 1.根据用户名和密码检测用户是否可以登录
user = models.UserInfo.objects.filter(username=request.data.get('username'),password=request.data.get('password')).first()
if not user:
return Response({'code':10001,'error':'用户名或密码错误'}) # 2. 根据user对象生成payload(中间值的数据)
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
payload = jwt_payload_handler(user) # 3. 构造前面数据,base64加密;中间数据base64加密;前两段拼接然后做hs256加密(加盐),再做base64加密。生成token
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
token = jwt_encode_handler(payload)
return Response({'code': 10000, 'data': token})

用户认证

开始解码

from rest_framework.views import APIView
from rest_framework.response import Response # from rest_framework.throttling import AnonRateThrottle,BaseThrottle class ArticleView(APIView):
# throttle_classes = [AnonRateThrottle,] def get(self,request,*args,**kwargs):
# 获取用户提交的token,进行一步一步校验
import jwt
from rest_framework import exceptions
from rest_framework_jwt.settings import api_settings
#进行解码
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
#获取到加密之后的字符串
jwt_value = request.query_params.get('token')
try:
#对token进行解密
payload = jwt_decode_handler(jwt_value)
#判断签名是否过期
except jwt.ExpiredSignature:
msg = '签名已过期'
raise exceptions.AuthenticationFailed(msg)
#判断是否被篡改
except jwt.DecodeError:
msg = '认证失败'
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
print(payload) return Response('文章列表')

settings设置

'JWT_EXPIRATION_DELTA':datetime.timedelta(seconds=300)
#默认五分钟有效
#自定义
import datetime
JWT_AUTH = {
"JWT_EXPIRATION_DELTA":datetime.timedelta(minutes=10)
}

视图的mixins写法

ListModelMixin

执行list方法

上一篇:Hadoop 学习笔记 (十一) MapReduce 求平均成绩


下一篇:Java集合源码分析(三)——LinkedList