Python开发【Django】:日志记录、API认证

日志记录:

  调用同一个对象,分别记录错误日志和运行日志

自定义日志类:

class Logger(object):
    """Singleton that writes run messages and error messages to separate files.

    The file paths are read from ``settings.RUN_LOG_FILE`` and
    ``settings.ERROR_LOG_FILE`` the first time the object is constructed.
    Every later ``Logger()`` call returns the same, already configured object.
    """

    __instance = None
    __initialized = False

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not cls.__instance:
            # object.__new__ raises TypeError when given extra arguments on
            # Python 3, so the constructor arguments are not forwarded.
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        # __init__ runs on every ``Logger()`` call even though __new__ hands
        # back the same object. Without this guard each call would attach one
        # more FileHandler to the shared 'run_log' logger and every message
        # would be written several times.
        if self.__initialized:
            return
        self.run_log_file = settings.RUN_LOG_FILE
        self.error_log_file = settings.ERROR_LOG_FILE
        self.run_logger = None
        self.error_logger = None
        self.initialize_run_log()
        self.initialize_error_log()
        self.__initialized = True

    @staticmethod
    def check_path_exist(log_abs_file):
        """Create the directory that will contain ``log_abs_file`` if needed.

        :param log_abs_file: path of the log file
        """
        log_path = os.path.split(log_abs_file)[0]
        # An empty directory part means the current directory: nothing to do.
        if log_path:
            # makedirs also creates missing parents and tolerates a directory
            # that already exists (or is created concurrently).
            os.makedirs(log_path, exist_ok=True)

    def _build_file_handler(self, log_file):
        """Return an appending UTF-8 FileHandler using the shared line format."""
        self.check_path_exist(log_file)
        handler = logging.FileHandler(log_file, 'a', encoding='utf-8')
        handler.setFormatter(
            logging.Formatter(fmt="%(asctime)s - %(levelname)s : %(message)s")
        )
        return handler

    def initialize_run_log(self):
        """Configure the INFO-level logger that writes to ``run_log_file``."""
        run_logger = logging.getLogger('run_log')  # the name is arbitrary
        run_logger.setLevel(logging.INFO)
        run_logger.addHandler(self._build_file_handler(self.run_log_file))
        self.run_logger = run_logger

    def initialize_error_log(self):
        """Configure the ERROR-level logger that writes to ``error_log_file``."""
        # Instantiated directly, so it is not registered in the logging
        # module's logger registry and has no parent to propagate to.
        error_logger = logging.Logger('error_log', level=logging.ERROR)
        error_logger.addHandler(self._build_file_handler(self.error_log_file))
        self.error_logger = error_logger

    def log(self, message, mode=True):
        """Write a log record.

        :param message: text to log
        :param mode: truthy for a run (INFO) record, falsy for an error record
        :return: None
        """
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)

调用方式: 

# Record the outcome in the run log on success, in the error log otherwise.
succeeded = bool(ret['code'] == 1000)
if succeeded:
    print(IP, '更新成功')
Logger().log(ret['message'], succeeded)

  

API认证: 

  客户端用密钥和时间戳生成 MD5 签名,并将"签名|时间戳"放在请求头中发送给服务端认证;时间戳超过超时时间则认证失败,同一签名在有效期内只能使用一次,已认证的签名记录过期后会被清除

客户端:

import hashlib
import time
import requests


class Client(object):
    """Minimal client that calls the asset API with a signed auth header."""

    def __init__(self):
        # NOTE(review): the shared secret is hard-coded here; in real code it
        # should come from configuration rather than from source.
        self.key = '299095cc-1330-11e5-b06a-a45e60bec08b'
        self.key_name = 'auth-key'
        self.asset_api = 'http://127.0.0.1:8000/api/'

    def auth_key(self):
        """Build the authentication header.

        The header value is ``"<md5 hex digest>|<timestamp>"``. The digest is
        computed over the key followed by ``"<key>|<timestamp>"``, with the
        timestamp formatted by ``%f``.

        :return: dict mapping the header name to the signed value
        """
        ha = hashlib.md5(self.key.encode('utf-8'))  # the key doubles as the salt
        time_span = time.time()
        ha.update(bytes("%s|%f" % (self.key, time_span), encoding='utf-8'))
        encryption = ha.hexdigest()
        result = "%s|%f" % (encryption, time_span)
        return {self.key_name: result}

    def get_asset(self, timeout=5):
        """Send an authenticated GET request to the asset API and print the reply.

        :param timeout: seconds to wait for the server before requests raises
            a timeout error; without it the call could block indefinitely
        """
        headers = {}
        headers.update(self.auth_key())
        response = requests.get(
            url=self.asset_api,
            headers=headers,
            timeout=timeout,
        )
        print(response.text)


Client().get_asset()

服务端:  

from django.http import HttpResponse
from django.views import View
import time
import hashlib
import hmac

ASSET_AUTH_KEY = '299095cc-1330-11e5-b06a-a45e60bec08b'  # shared secret
ASSET_AUTH_HEADER_NAME = 'HTTP_AUTH_KEY'  # request.META key of the auth header
ASSET_AUTH_TIME = 2  # seconds a signed timestamp stays valid
ENCRYPT_LIST = []  # signatures accepted so far: {'encrypt': ..., 'time': ...}


def api_auth(request):
    """Validate the signed auth header of ``request``.

    The header value must look like ``"<md5 hex digest>|<timestamp>"``.
    Authentication fails when the header is missing or malformed, when the
    timestamp is older than ``ASSET_AUTH_TIME`` seconds, when the digest does
    not match, or when the same digest was already accepted and its record
    has not expired yet.

    :param request: object whose ``META`` mapping holds the request headers
    :return: True when the request is authenticated, otherwise False
    """
    auth_key = request.META.get(ASSET_AUTH_HEADER_NAME)
    if not auth_key:  # header missing or empty
        return False
    parts = auth_key.split('|')
    if len(parts) != 2:  # wrong format
        return False
    encrypt, raw_timestamp = parts
    try:
        timestamp = float(raw_timestamp)
    except ValueError:
        # A non-numeric timestamp is an authentication failure, not a crash.
        return False

    limit_timestamp = time.time() - ASSET_AUTH_TIME
    if limit_timestamp > timestamp:  # the signature has expired
        return False

    ha = hashlib.md5(ASSET_AUTH_KEY.encode('utf-8'))
    ha.update(bytes("%s|%f" % (ASSET_AUTH_KEY, timestamp), encoding='utf-8'))
    expected = ha.hexdigest()
    # Constant-time comparison; bytes are used because compare_digest only
    # accepts str arguments that are pure ASCII.
    if not hmac.compare_digest(encrypt.encode('utf-8'), expected.encode('utf-8')):
        return False

    # Drop expired records. The list is rebuilt in place: deleting by index
    # while the list shrinks would remove the wrong entries.
    ENCRYPT_LIST[:] = [
        record for record in ENCRYPT_LIST if record['time'] >= limit_timestamp
    ]
    # A digest that is still on record has already been used once: reject it.
    if any(record['encrypt'] == encrypt for record in ENCRYPT_LIST):
        return False
    ENCRYPT_LIST.append({'encrypt': encrypt, 'time': timestamp})
    return True


class Api(View):
    """View that reports whether the request passed ``api_auth``."""

    def get(self, request):
        """Return a response whose body tells the caller the auth result."""
        if api_auth(request):
            return HttpResponse('认证成功')
        return HttpResponse('去你的吧')

  

上一篇:pyoj61 双线DP


下一篇:Django REST framework 之 API认证