所有步骤中的账号密码仅供参考,千万不要在自己的生产环境中使用,否则产生的安全问题由您自己承担。
1.配置Django-OAuth-toolkit
因为我们要实现的是管理员注册/登录,所以这里选用密码模式(参见2.5 Restful API 身份验证(6))。接下来配置Django-OAuth-toolkit:在settings.py文件同级目录下新建一个python包保存与DjangoRestFramework和Django-OAuth-toolkit 有关的设置。Django-OAuth-toolkit 在项目中扮演了认证服务器的角色。
restful_api_settings.py
REST_FRAMEWORK = {
# 身份验证类
'DEFAULT_AUTHENTICATION_CLASSES': (
'oauth2_provider.contrib.rest_framework.OAuth2Authentication',
),
# 权限验证类
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
# 数据分页
'PAGE_SIZE': 10
}
# 允许权限范围
OAUTH2_PROVIDER = {
# this is the list of available scopes
'SCOPES': {'read': 'Read scope', 'write': 'Write scope', 'groups': 'Access to your groups'}
}
然后不要忘记在settings.py中导入设置:
2.创建测试用户
我们还需要创建一些用户来测试我们的API,这里我们创建两个用户:webmanager,站点管理员,拥有最高权限,可以登录网站管理后端;productmanager,产品管理员,可以通过安卓APP管理我们的产品,只能操作产品数据模型。在Pycharm Terminal中运行创建webmanager命令,输入每个值然后回车即可,不输入直接回车代表使用默认值。
运行django项目,使用浏览器打开管理后端网站:http://127.0.0.1:8000/admin/。后端网站默认是英文的,如果要换为中文,在settings.py中设置以下字段:
为了方便进行授权管理,添加一个产品管理员组productmanagers并授予以下权限:
然后添加productmanager用户:
记得将这个用户加入我们创建的productmanagers组,这样就不用单独定义权限:
3.创建客户端应用
接下需要创建一个OAuth Application 给我们的客户端使用。在Django 管理站点添加一个客户端应用。
选择授权类型为基于资源所有者密码( Resource owner password-based),注意这里的Client secret 千万不能泄露,在认证请求中需要携带client id 和client secret进行验证,User就选择为productmanager,为了安全,一般不会把API的用户选择为站点管理员(使用命令行创建的Django superuser),重定向(redirect uris)可以暂时不定义,因为我们的资源API还未开发。重定向uri一般设置为首页数据加载API的url,即登录完成后加载首页数据,也可以在API认证时作为参数传递过来。
4.传输数据加密
现在我们确定了OAuth 验证方法,也创建了客户端应用,相当于架设了授权服务器,但是OAuth 2.0 原来的认证流程(参见2.5 (6))并未考虑数据加密,因此我们需要增加数据加密的流程。这里使用对称加密算法AES加密数据,然后用非对称加密算法RSA加密AES密钥进行传输。对称算法加密解密较快,数据容量大,但是安全性较差;非对称算法安全性好,但是加密解密较慢,数据容量较小。请求数据签名使用md5算法,用来保证请求内容一致性和完整性。
RSA 原理参见:
http://www.ruanyifeng.com/blog/2013/06/rsa_algorithm_part_one.html
http://www.ruanyifeng.com/blog/2013/07/rsa_algorithm_part_two.html
AES 原理参见
AES 关键概念
1.密钥
加密和解密使用同一个密钥。AES支持三种长度的密钥:128位,192位,256位。128位加、解密速度最快,256位安全性最好。
2.填充
AES算法在对明文加密的时候,并不是把整个明文加密成一整段密文,而是把明文拆分成一个个独立的明文块,每一个明文块长度128bit。AES对每一个明文块使用相同的密钥加密,生成一个个独立的密文块,然后这些密文块拼接在一起,就是最终的AES加密结果。如果按每128bit一个明文块来拆分,明文长度不够划分成整数个块,就需要进行填充。
(1).NoPadding:
不做任何填充,但是要求明文必须是16字节的整数倍。
(2).PKCS5Padding(默认):
如果明文块少于16个字节(128bit),在明文块末尾补足相应数量的字符,且每个字节的值等于缺少的字符数。
比如明文:{1,2,3,4,5,a,b,c,d,e},缺少6个字节,则补全为{1,2,3,4,5,a,b,c,d,e,6,6,6,6,6,6}
(3).ISO10126Padding:
如果明文块少于16个字节(128bit),在明文块末尾补足相应数量的字节,最后一个字符值等于缺少的字符数,其他字符填充随机数。
加密和解密的填充模式必须相同。
3.工作模式
(1).ECB模式(电子密码本模式:Electronic codebook,默认)
ECB是最简单的块密码加密模式,加密前根据加密块大小(如AES为128位)分成若干块,之后将每块使用相同的密钥单独加密,解密同理。
(2).CBC模式(密码分组链接:Cipher-block chaining)
CBC模式对于每个待加密的密码块在加密前会先与前一个密码块的密文异或然后再用加密器加密。第一个明文块与一个叫初始化向量的数据块异或。
(3).CFB模式(密文反馈:Cipher feedback)
与ECB和CBC模式只是加密块数据不同,CFB能够将块密文(Block Cipher)转换为流密文(Stream Cipher)。
(4).OFB模式(输出反馈:Output feedback)
OFB是先用块加密器生成密钥流(Keystream),然后再将密钥流与明文流异或得到密文流,解密是先用块加密器生成密钥流,再将密钥流与密文流异或得到明文,由于异或操作的对称性所以加密和解密的流程是完全一样的。
加密和解密的工作模式必须相同。
5.使用Serializer、Render和Response类提高API通用性
通常情况下解析请求数据和生成响应的代码会放在视图中,一旦数据内容发生变动,就需要改动视图,维护不方便。使用djangorestframework序列化器来解析下面各步骤中客户端请求和服务器响应数据,使用Response类和Render类自动渲染返回数据。这样的好处是视图代码可以保持简洁化,数据解析和验证任务(比如时间戳验证)由序列化器自动完成,响应数据生成和类型转换、内容协商和Http状态码由Response类自动完成。如果后期需要改变请求内容,只需要改动Response的data属性即可,或者替换Response类,不需要改动视图。此外,数据加密可以放在Response类初始化时进行,不用直接放置在视图中,方便更换加密手段。在settings.py同级目录中新建api_auth包,然后新建以下文件:
然后在restful_api_settings.py中设定渲染器:
REST_FRAMEWORK = {
......
# 响应渲染类
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
'rest_framework.renderers.BrowsableAPIRenderer',
],
......
}
然后编写每个请求的对应的Response类即可。
6.认证流程
(A)请求认证
客户端发送认证请求,请求中附带客户端RSA公钥(注意这一步中请求数据是未加密的,下面每一步都要注意验证请求的时间戳和数字签名,降低请求劫持和篡改可能性)。
POST /api_auth/require_auth/
{
"key":"客户端RSA加密公钥",
"timestamp":"时间戳",
"signature":"请求内容的数字签名"
}
为了能够使用RSA和AES 加密,需要安装rsa和Crypto包:
pip install rsa pycryptodome
python模拟客户端请求代码,需要同时在本地运行django应用。这里需要注意是python 单元测试运行时是按方法名的字符串顺序运行的,所以给第一个运行的测试方法取名为test_1_require_api_auth。
import base64
import hashlib
import json
import time
import unittest
from binascii import a2b_hex, b2a_hex
import requests
import rsa
from Crypto.Cipher import AES
# 缓存测试数据
cache_list = {}
class TestRequest(unittest.TestCase):
def signature(self, data):
"""
md5数字签名
:param data:
:return:
"""
md5_data = ''
sorted_keys = sorted(data)
for i in sorted_keys:
md5_data += i + str(data[i])
md5_data = md5_data.replace('\n', '')
signature = hashlib.md5(md5_data.encode(encoding='utf-8')).hexdigest()
return signature
def test_1_require_api_auth(self):
"""
测试require_api_auth
:return:
"""
# 生成rsa公钥
public_key, private_key = rsa.newkeys(1024)
public_key = public_key.save_pkcs1()
private_key = private_key.save_pkcs1()
r_data = {"key": public_key.decode(encoding='utf-8'), "timestamp": str(time.time()), }
r_data["signature"] = self.signature(r_data)
# 发起请求
r = requests.post('http://localhost:8000/api_auth/require_auth/', data=r_data)
print("响应", json.loads(r.content.decode(encoding='utf-8')))
res_data = base64.b64decode(json.loads(r.content.decode(encoding='utf-8'))['data'])
res_data = a2b_hex(res_data)
# 分段解密
text = []
for i in range(0, len(res_data), 128):
content = res_data[i:i + 128]
text.append(rsa.decrypt(content, rsa.PrivateKey.load_pkcs1(private_key)))
text = b''.join(text)
print("解密后数据", text.decode(encoding='utf-8'))
# 缓存aes密钥
res_data = eval(text.decode(encoding='utf-8'))
cache_list["aes_key"] = res_data['aes_key']
cache_list["aes_key_vector"] = a2b_hex(base64.b64decode(res_data['aes_key_vector']))
cache_list["state"] = res_data['state']
Tips
为了增强加密措施,在请求参数中公钥使用模糊性词汇key作为字段名,隐去使用的算法,增加破解难度;还可以在公钥原文中增加随机字符串(比如在某个位置插入一位字符),进一步增加破解难度。
(B) 服务端下发数据加密密钥
认证服务器接收到请求后,验证参数是否正确,没问题则将AES密钥和一个随机字符串(即state,客户端必须回复一个一模一样的)用客户端发送来的RSA公钥加密后发送给客户端,服务端缓存以state为键,缓存AES密钥供后续验证使用。
// 加密前服务端响应的数据内容
{
"aes_key":"服务端生成的AES密钥",
"aes_key_vector":"AES密钥向量"
"state":"随机字符串",
"timestamp":"时间戳",
"signature":"内容的数字签名"
}
// 加密后服务端响应的数据内容
{
"data":"数据密文",
}
首先在restful_api_settings.py中增加请求超时时间设定值:
# 自定义请求超时时间120秒
API_ReQUEST_TIME_OUT = 120
在cache_settings.py中增加AES密钥缓存键,然后通过django.conf.settings引入,这样做的目的是方便以后更改:
# aes加密公钥缓存键
AES_KEY_CACHE_KEY = 'aes_key'
AES_KEY_VECTOR_CACHE_KEY = 'aes_key_vector'
然后在请求序列化器基类中验证时间戳值和数字签名,所有序列化器都应该继承这个基类,这样不用重复编写时间戳和数字签名验证,新建一个该基类的子类验证rsa公钥。
api_auth_serializers.py中
import time
from hashlib import md5
from django.conf import settings
from rest_framework import serializers
"""
认证API序列化器
"""
# 请求序列化器基类
class KaopuAPIAuthRequestBaseSerializer(serializers.Serializer):
"""
认证请求数据序列化器基类
"""
timestamp = serializers.CharField()
signature = serializers.CharField()
def validate(self, raw_data):
# 验证时间戳
current_timestamp = time.time()
if float(current_timestamp) - float(raw_data['timestamp']) > settings.API_REQUEST_TIME_OUT:
message = 'Request timeout.'
raise serializers.ValidationError(message)
# 请求携带的数字签名
raw_data_signature = raw_data['signature']
# 删除请求中的签名值
raw_data.pop('signature')
# 生成请求内容md5
sorted_keys = sorted(raw_data)
md5_data = ''
for i in sorted_keys:
md5_data += i + str(raw_data[i])
md5_data = md5_data.replace('\n', '')
new_signature = md5(md5_data.encode(encoding='utf-8')).hexdigest()
# 验证md5
if raw_data_signature != str(new_signature):
message = 'Request data have been modified.'
raise serializers.ValidationError(message)
# 请求时间戳已经无用
raw_data.pop('timestamp')
return raw_data
class RequireAuthSerializer(KaopuAPIAuthRequestBaseSerializer):
"""
请求认证序列化器
"""
# rsa 公钥
key = serializers.CharField()
接着在Response类中定义返回数据和执行数据加密:
api_auth_responses.py中
import base64
import hashlib
import time
import uuid
from binascii import b2a_hex
from random import sample
from django.conf import settings
import rsa
from Crypto.Cipher import AES
from Cryptodome import Random
from django.core.cache import cache
from rest_framework.response import Response
class APIAuthBaseResponse(Response):
def __init__(self):
super().__init__()
self.data = {'timestamp': time.time()}
def generate_signature(self, data):
"""
请求内容数据签名
:param data:
:return: md5签名值
"""
sorted_keys = sorted(data)
# 键排序,保证签名时字典顺序一致
md5_data = ''
for i in sorted_keys:
md5_data += i + str(data[i])
signature = hashlib.md5(md5_data.encode(encoding='utf-8')).hexdigest()
return signature
class RequireAuthResponse(APIAuthBaseResponse):
def __init__(self, **kwargs):
super().__init__()
rsa_pub_key = rsa.PublicKey.load_pkcs1(str(kwargs['data']['key']).encode(encoding='utf-8'))
# 生成响应数据
response_data = self.generate_response_data()
# generate_response_data()后缓存aes密钥
cache.set(response_data['state'],
{settings.AES_KEY_CACHE_KEY: response_data['aes_key'],
settings.AES_KEY_VECTOR_CACHE_KEY: response_data['aes_key_vector']})
# 响应数据加密
self.response_data_encrypt(response_data, rsa_pub_key)
def generate_response_data(self):
"""
生成响应数据
:return:
"""
response_data = dict()
# 生成随机AES_key
base_aes_key = str(uuid.uuid4()).split('-')
aes_key = base_aes_key[0] + base_aes_key[-1][:8] + '5g&k3#'.join(sample("w>^qs)hk+l;,*UBHnmCD./)+", 2))
# 密钥key长度必须为 24(AES-192)
response_data['aes_key'] = aes_key
# 生成长度等于AES块大小的不可重复的密钥向量
response_data['aes_key_vector'] = base64.b64encode(b2a_hex(Random.new().read(AES.block_size))).decode(
encoding='utf-8')
# 生成state
state = ''.join(sample(str(uuid.uuid1()).replace('-', ''), 14)).join(
sample("b!c[d@o19ghlMvTEas_=.76/?&zmi89f", 2))
# base64编码避免后续请求中state与URL关键词冲突
response_data['state'] = base64.b64encode(b2a_hex(state.encode(encoding='utf-8'))).decode(encoding='utf-8')
# 时间戳
response_data['timestamp'] = self.data['timestamp']
return response_data
def response_data_encrypt(self, data, rsa_pub_key):
"""
响应数据加密
:param data: 待加密数据
:param rsa_pub_key: rsa公钥
:return: 密文
"""
# 先生成数字签名
signature = self.generate_signature(data)
data['signature'] = signature
# 数据分段长度
encrypt_split_size = 100
encrypt_list = []
data_text = str(data)
# 分段加密
for i in range(0, len(data_text), encrypt_split_size):
content = rsa.encrypt(data_text[i:i + encrypt_split_size].encode(encoding='utf-8'), rsa_pub_key)
encrypt_list.append(content)
crypto_data = b''.join(encrypt_list)
# 清空数据原文,必须在数据加密后执行
self.data.clear()
# 加密数据base64编码解决传输问题
self.data['data'] = base64.b64encode(b2a_hex(crypto_data)).decode(encoding='utf-8')
最后定义API对应的视图函数和注册URL路径:
api_auth_view.py中
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from KaopuBackendDoNotDelete.api_auth.api_auth_responses import RequireAuthResponse
from KaopuBackendDoNotDelete.api_auth.api_auth_serializers import RequireAuthSerializer
from KaopuBackendDoNotDelete.restful_api_settings.api_exception_handle import KaopuValueError
class KaopuAPIAuthViewSet(viewsets.ViewSet):
# 取消默认的身份验证
permission_classes = []
@action(detail=False, methods=['post'], url_name='api_oauth_require_auth',
url_path='require_auth')
def require_auth(self, request):
try:
serializer = RequireAuthSerializer(data=request.data, )
if serializer.is_valid(raise_exception=True):
return RequireAuthResponse(data=serializer.validated_data)
except KaopuValueError as e:
return Response(e.get_full_details(), status=e.status_code)
except KeyError:
error = KaopuValueError()
return Response(error.get_full_details(), status=error.status_code)
与settings.py同级目录下的urls.py中添加以下代码,后续增加的Viewswet中的方法不用再注册路由,只需要在装饰器中填上url_path参数即可。
from django.contrib import admin
from django.urls import path, include
from rest_framework import routers
from KaopuBackendDoNotDelete.api_auth.api_auth_view import KaopuAPIAuthViewSet
# 自动生成api 认证路由
api_oauth_router = routers.SimpleRouter()
api_oauth_router.register(r'api_auth', KaopuAPIAuthViewSet, basename='api_auth')
urlpatterns = api_oauth_router.urls
urlpatterns = [
path('admin/', admin.site.urls),
......,
path('', include(api_oauth_router.urls))
]
提示
使用git跟踪这个重大特性。
(C)客户端发送身份验证数据
客户端将用户名和密码等认证信息用AES加密后发送给认证服务器。
POST POST /api_auth/token/
//加密前
{
"user_name":"用户名",
"password":"用户密码",
"grant_type":"password",
"scope":"可选,表示授权范围",
"client_id":"客户端id",
"client_secret":"应用密钥",
"state":"服务端发来的state",
"timestamp":"时间戳",
"signature":"内容的数字签名"
}
//加密后
{
"data":"数据密文",
"state":"服务端发来的state",
}
客户端代码
# 缓存测试数据
cache_list = {}
class TestRequest(unittest.TestCase):
def signature(self, data):
......
def test_1_require_api_auth(self):
......
def test_2_get_access_token(self):
"""
测试获取access_token
:return:
"""
r_data = {"username": 'productmanager', "password": 'kaopu123', "grant_type": 'password',
"scope": 'read write',
"client_id": '你的client_id',
"client_secret": '你的client_secret',
"state": cache_list["state"], "timestamp": time.time()}
signature = self.signature(r_data)
r_data['signature'] = signature
# 发起请求
r = requests.post('http://localhost:8000/api_auth/token/',
data={"data": self.aes_encrypt(r_data), "state": cache_list['state']})
print("test_get_access_token响应解密前", r.content.decode(encoding='utf-8'))
res_data = self.aes_decrypt(json.loads(r.content.decode(encoding='utf-8'))['data'])
print("test_get_access_token响应原文", res_data)
def aes_encrypt(self, data):
"""
aes 加密
:param data:
:return:
"""
data = self.add_to_16(data)
encryptor = AES.new(str(cache_list["aes_key"]).encode(encoding='utf-8'), AES.MODE_CBC,
cache_list["aes_key_vector"])
cipher_text = encryptor.encrypt(data)
# 因为AES加密后的字符串不一定是ascii字符集的,输出保存可能存在问题,所以这里转为16进制字符串并用base64编码
return base64.b64encode(b2a_hex(cipher_text)).decode(encoding='utf-8')
def add_to_16(self, text):
"""
AES 加密补位,如果text不足16位的倍数就用空格补足为16位
:return:
"""
text = str(text)
if len(text.encode('utf-8')) % 16:
add = 16 - (len(text.encode('utf-8')) % 16)
else:
add = 0
text = text + ('\0' * add)
return text.encode('utf-8')
def aes_decrypt(self, data):
"""
aes 解密
:param key: aes密钥缓存键
:param data:
:return:
"""
encrypted_data = a2b_hex(base64.b64decode(data))
encryptor = AES.new(str(cache_list["aes_key"]).encode(encoding='utf-8'), AES.MODE_CBC,
cache_list["aes_key_vector"])
raw_data = encryptor.decrypt(encrypted_data)
raw_data = raw_data.decode(encoding='utf-8')
return eval(raw_data.replace('\0', ''))
(D) 服务端派发Token
认证服务器确认通过后用AES加密并下发Access Token和Refresh Token,客户端应缓存Access Token并定期刷新。
//响应头
Content-Type: application/json;charset=UTF-8//这一部分不用显式添加,因为restframework可以进行自动内容协商
Cache-Control: no-store
Pragma: no-cache
//数据
{
"access_token":"2YotnFZFEjr1zCsicMWpAA",
"token_type":"example",
"expires_in":3600,
"refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",
"example_parameter":"example_value",
"timestamp":"时间戳",
"signature":"内容的数字签名"
}
//加密后
{
"data":"数据密文"
}
首先新建一个aes加密模块,方便复用。新建文件api_data_aes_encrypt.py:
api_data_aes_encrypt.py
import base64
from binascii import a2b_hex, b2a_hex
from Crypto.Cipher import AES
from django.conf import settings
from django.core.cache import cache
from rest_framework import serializers
def aes_decrypt(data, key):
"""
aes 解密
:param key: aes密钥缓存键
:param data:
:return:
"""
aes_key, aes_key_vector = get_cached_aes_key(key)
encrypted_data = a2b_hex(base64.b64decode(data))
cryptos = AES.new(aes_key, AES.MODE_CBC, aes_key_vector)
raw_data = cryptos.decrypt(encrypted_data)
raw_data = raw_data.decode(encoding='utf-8')
return eval(raw_data.replace('\0', ''))
def get_cached_aes_key(key):
"""
获取缓存的aes密钥
:param key:
:return:
"""
# 获取缓存的aes密钥
cache_data = cache.get(key)
# 验证缓存是否过期
if not cache_data:
message = 'Data-encrypting key timeout.Please try require_auth again.'
raise serializers.ValidationError(message)
aes_key = str(cache_data[settings.AES_KEY_CACHE_KEY]).encode(encoding='utf-8')
aes_key_vector = a2b_hex(base64.b64decode(cache_data[settings.AES_KEY_VECTOR_CACHE_KEY]))
return aes_key, aes_key_vector
def aes_encrypt(data, key):
"""
aes 加密
:param key: aes 缓存键
:param data:
:return:
"""
data = add_to_16(data)
aes_key, aes_key_vector = get_cached_aes_key(key)
encryptor = AES.new(aes_key, AES.MODE_CBC,
aes_key_vector)
cipher_text = encryptor.encrypt(data)
# 因为AES加密后的字符串不一定是ascii字符集的,输出保存可能存在问题,所以这里转为16进制字符串并用base64编码
return base64.b64encode(b2a_hex(cipher_text)).decode(encoding='utf-8')
def add_to_16(text):
"""
AES 加密补位,如果text不足16位的倍数就用空格补足为16位
:return:
"""
text = str(text)
if len(text.encode('utf-8')) % 16:
add = 16 - (len(text.encode('utf-8')) % 16)
else:
add = 0
text = text + ('\0' * add)
return text.encode('utf-8')
密文使用base64编码,方便传输。 接着在api_auth_responses.py中添加响应:
class APIAuthBaseResponse(Response):
......
class RequireAuthResponse(APIAuthBaseResponse):
......
class GetAccessTokenResponse(APIAuthBaseResponse):
"""
获取token验证
"""
def __init__(self, **kwargs):
super().__init__()
request_data = kwargs['data']
# 获取state
state = request_data.pop('state')
# 转发请求到授权服务器获取access token
request_data = dict(request_data)
auth_data = (request_data['client_id'], request_data['client_secret'])
request_data.pop('client_id')
request_data.pop('client_secret')
token_response = requests.post(settings.GET_ACCESS_TOKEN_URL, data=request_data, auth=auth_data)
# 验证错误
if token_response.status_code == HTTP_400_BAD_REQUEST:
raise serializers.ValidationError('Username or password is invalid.')
# 加密数据并返回
response_data = eval(token_response.content.decode())
response_data['timestamp'] = self.data['timestamp']
signature = self.generate_signature(response_data)
response_data['signature'] = signature
encrypted_data = aes_encrypt(response_data, state)
self.data = {'data': encrypted_data}
为了方便修改获取token的url,在restful_api_settings.py中添加设置GET_ACCESS_TOKEN_URL设置变量:
REST_FRAMEWORK = {
......
}
# 允许权限
OAUTH2_PROVIDER = {
.....
}
# 获取access token的url
GET_ACCESS_TOKEN_URL = "http://localhost:8000/o/token/"
注意
1.state的作用是代替client_id作为客户端身份的区分(个人理解,不一定正确),避免暴露client_id。
2.实际上我们的授权服务器是独立的,这里应用了请求转发,即客户端将加密的验证信息解密并验证后传给授权服务器获取token,再返回token给客户端。
(E) 使用Token请求资源
客户端将Access Token用AES加密后向资源服务器请求受保护的资源。
POST/GET/PUT/PATCH/DELETE 例如:https://api.myhost.com/v1/source?state=<state>&<返回数据的过滤要求>
//请求头中设置
Authorization: Bearer <your_access_token>
(F) 资源服务器用AES加密并下发资源。
在应用文件夹kaopu_shop包下新建一个kaopu_shop_responses包,然后新建kaopu_shop_responses.py文件:
然后编写响应类:
import hashlib
import json
import time
from rest_framework.response import Response
from KaopuBackendDoNotDelete.restful_api_settings.api_data_aes_encrypt import aes_encrypt
class KaopuShopBaseResponse(Response):
"""
kaopu_shop资源API响应基类
"""
def __init__(self):
super().__init__()
self.data = {'timestamp': time.time()}
def generate_signature(self, data):
"""
请求内容数据签名
:param data:
:return: md5签名值
"""
sorted_keys = sorted(data)
# 键排序,保证签名时字典顺序一致
md5_data = ''
for i in sorted_keys:
md5_data += i + str(data[i])
signature = hashlib.md5(md5_data.encode(encoding='utf-8')).hexdigest()
return signature
class KaopuShopProductResponse(KaopuShopBaseResponse):
"""
kaopu shop产品数据响应
"""
def __init__(self, **kwargs):
super().__init__()
response_data = {'data': json.dumps(kwargs['data'], ensure_ascii=False),
'timestamp': self.data.pop('timestamp')}
state = kwargs['state']
signature = self.generate_signature(response_data)
# 返回数据aes加密
self.data['data'] = aes_encrypt(response_data, state)
self.data['signature'] = signature
接着在资源API视图函数中使用这些响应类即可,返回数据加密已经集成在响应类中了。
注:A.客户端与服务端通信存在两对密钥,一对是由客户端创建的RSA密钥对,公钥由服务端持有,私钥由客户端持有,私钥绝对不能泄露。服务端向客户端发送AES密钥时用这个公钥加密,客户端用对应的私钥解密得到AES密钥用于后续数据加密;另一对是服务端创建的AES密钥对,AES密钥绝对不能泄露。B.为了安全,每次认证使用的密钥对都不应该相同。
下一节:Django 架设 Restful API(五)API开发:资源API实现
https://blog.csdn.net/anbuqi/article/details/114491086