Python API 接口权限控制思路

需求:权限模块对各子系统拥有动态配置权限的能力,发现接口变化能力,权限认证能力等,因为各子系统相互独立,

所以需要使用统一的认证 JWT。

 

1、使用Django DRF权限模块,进行修改

 1 import requests
 2 from config import AUTHENTICATE_URL
 3 from rest_framework import permissions
 4 from rest_framework.exceptions import PermissionDenied, AuthenticationFailed
 5 
 6 # URL(ViewSet) 权限配置
 7 URL_PERMISSION = {
 8     "pipelines": {
 9         "test_pipeline": "account.dev_audit",
10         "list": "account.test_audit",
11         "create": "account.yw_audit",
12         "retrieve": "",
13     },
14     # "sub_pipelines": {
15     #     "test_pipeline": "account.dev_audit",
16     #     "list": "account.test_audit",
17     #     "create": "account.yw_audit",
18     # }
19 }
20 
21 
22 class LoginPermission(permissions.BasePermission):
23     """
24     登录验证
25     """
26 
27     def has_permission(self, request, view):
28         # Read permissions are allowed to any request,
29         # so we'll always allow GET, HEAD or OPTIONS requests.
30         # print(request.method)
31         # if request.method == "POST":
32         #     return False
33         #
34         #     # 只有该snippet的所有者才允许写权限。
35         # return False
36 
37         token = request.META.get("HTTP_AUTHORIZATION", "").replace("JWT ", '')
38         data = requests.post(AUTHENTICATE_URL + "/api/account/verify/", json={"token": token})
39         if data.status_code != 404:
40             raise AuthenticationFailed("登录验证失败")
41         return True
42 
43 
44 class UrlPermission(permissions.BasePermission):
45     """
46     URL 权限验证
47     """
48 
49     def has_permission(self, request, view):
50         # Read permissions are allowed to any request,
51         # so we'll always allow GET, HEAD or OPTIONS requests.
52         # print(request.method)
53         # if request.method == "POST":
54         #     return False
55         #
56         #     # 只有该snippet的所有者才允许写权限。
57         # return False
58         token = request.META.get("HTTP_AUTHORIZATION", "").replace("JWT ", '')
59         print("==========通过视图的basename,action进行权限认证==========")
60         print(view.basename, view.action)
61         print("==========")
62         if view.basename in URL_PERMISSION.keys():
63             if view.action in URL_PERMISSION[view.basename].keys():
64                 print(URL_PERMISSION[view.basename][view.action])
65                 data = requests.post(AUTHENTICATE_URL + "/api/account/verify/",
66                                      json={"token": token, "perm_code": URL_PERMISSION[view.basename][view.action]})
67                 if data.status_code != 404:
68                     raise PermissionDenied("访问权限不合法")
69 
70         return True

 

2、设置权限全局生效

 

REST_FRAMEWORK = {
    # "DEFAULT_AUTHENTICATION_CLASSES": ["apps.permissions.Loginpermission", ]
    "DEFAULT_PERMISSION_CLASSES": [
        "apps.permissions.LoginPermission",
        "apps.permissions.UrlPermission",
    ]
}

 

如果需要局部权限,可以在相关视图集(ViewSet)下增加自己的权限模块

 

class PipelinesViewSet(BaseView):
    """
    流水线视图
    """
    queryset = Pipelines.objects.filter(is_del=0).order_by("update_time")
    serializer_class = PipelinesSerializer
    filter_backends = (DjangoFilterBackend,)
    filter_fields = ('pipeline_name', 'pipeline_type')

    # permission_classes = ()  # 这里配置视图权限

    @action(methods=['post'], detail=True, permission_classes=[])  #这里配置接口权限
    def test_pipeline(self, request, pk=None):
        """
        测试流水线接口
        """
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return Response(self.object_return(serializer.data))

 

3、权限系统统一获取子系统接口,后期可以进行动态权限赋予

import coreapi

# Initialize a client & load the schema document
client = coreapi.Client()
schema = client.get("http://127.0.0.1:8000/docs/")

URL = []
# print(schema.data)
for basename, actions in schema.data.items():
    for action, atr in actions.links.items():
        URL_dict = {}
        URL_dict["basename"] = basename
        URL_dict["action"] = action
        URL_dict["method"] = atr.action
        URL_dict["url"] = atr.url
        URL_dict["description"] = atr.description
        URL.append(URL_dict)

for url in URL:
    print(url)

运行结果:这里收集视图集中的base_name、action以及method,

对接口权限进行精确定位,不受url变化影响, 这里的url只做相关描述展示:

{'basename': 'pipelines', 'action': 'list', 'method': 'get', 'url': 'http://127.0.0.1:8000/apps/pipelines/', 'description': '流水线视图'}
{'basename': 'pipelines', 'action': 'create', 'method': 'post', 'url': 'http://127.0.0.1:8000/apps/pipelines/', 'description': '流水线视图'}
{'basename': 'pipelines', 'action': 'read', 'method': 'get', 'url': 'http://127.0.0.1:8000/apps/pipelines/{id}/', 'description': '流水线视图'}
{'basename': 'pipelines', 'action': 'update', 'method': 'put', 'url': 'http://127.0.0.1:8000/apps/pipelines/{id}/', 'description': '流水线视图'}
{'basename': 'pipelines', 'action': 'partial_update', 'method': 'patch', 'url': 'http://127.0.0.1:8000/apps/pipelines/{id}/', 'description': '流水线视图'}
{'basename': 'pipelines', 'action': 'delete', 'method': 'delete', 'url': 'http://127.0.0.1:8000/apps/pipelines/{id}/', 'description': '流水线视图'}
{'basename': 'pipelines', 'action': 'test_pipeline', 'method': 'post', 'url': 'http://127.0.0.1:8000/apps/pipelines/{id}/test_pipeline/', 'description': '测试流水线接口'}
{'basename': 'urlcollections', 'action': 'list', 'method': 'get', 'url': 'http://127.0.0.1:8000/apps/urlcollections/', 'description': ''}
{'basename': 'urlcollections', 'action': 'create', 'method': 'post', 'url': 'http://127.0.0.1:8000/apps/urlcollections/', 'description': ''}

 

前提:这里需要子系统开启API接口文档,通过接口文档获取接口详情

 

from django.urls import path, include
from rest_framework.documentation import include_docs_urls

urlpatterns = [
    # path('admin/', admin.site.urls),
    path('docs/', include_docs_urls(title='API文档', authentication_classes=[], permission_classes=[])),
    path('apps/', include('apps.urls')),
]

 

 

以上,大概可以符合需求,URL可以读取接口导入权限系统,动态分配

java 也有比较好的接口权限细化管理的方案,如apache shiro.

上一篇:[虚拟化]认识虚拟化


下一篇:.NET(C#) Linq中join、into、let和group by的使用