# Thread-safe log handling module for Python
import logging
import os
import time
from logging import handlers
from queue import Queue
from threading import Thread
# Module-wide queue: the producer functions in this file put LogMsg items on
# it and cop_log_queue takes them off.
logs_queue = Queue()
class LogMsg(object):
    """A log entry waiting in the queue: a level name plus the message text."""

    def __init__(self, type, msg):
        """Store the level name and the message.

        Args:
            type: Level name of the entry. The queue consumer in this file
                compares it against 'debug', 'info', 'warning', 'error'
                and 'critical'.
            msg: Message text handed to the logger.
        """
        # NOTE: the parameter shadows the builtin ``type``; the name is kept
        # so existing keyword callers (``LogMsg(type=..., msg=...)``) work.
        self._type = type
        self._msg = msg

    def __repr__(self):
        """Return a readable representation for debugging queued entries."""
        return f'{self.__class__.__name__}(type={self._type!r}, msg={self._msg!r})'
class Logger(object):
    """Configure a named logger that writes to the console and to a
    time-rotated file.

    The configured ``logging.Logger`` is available as the ``logger``
    attribute.
    """

    # Maps the level names accepted by ``__init__`` to ``logging`` constants.
    # 'crit' is kept for backward compatibility; 'critical' matches the
    # message type name used by the queue consumer in this file.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL,
        'critical': logging.CRITICAL,
    }

    def __init__(self, filename, level='info', when='D', backCount=3,
                 fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        """Create (or reuse) the logger named after ``filename``.

        Args:
            filename: Path of the log file; also used as the logger name.
                Missing parent directories are created.
            level: One of the keys of ``level_relations``.
            when: Rotation interval unit for the file handler: 'S' seconds,
                'M' minutes, 'H' hours, 'D' days, 'W0'-'W6' weekday
                (0 = Monday), 'midnight' roll over at midnight.
            backCount: Number of rotated backup files to keep; older ones
                are deleted by the handler.
            fmt: Format string applied to both console and file output.

        Raises:
            ValueError: If ``level`` is not a key of ``level_relations``.
        """
        # Fail early with a readable message instead of passing None to
        # setLevel(), which raises an obscure TypeError.
        if level not in self.level_relations:
            raise ValueError(
                f'unknown log level {level!r}; '
                f'expected one of {sorted(self.level_relations)}'
            )

        self.logger = logging.getLogger(filename)
        self.logger.setLevel(self.level_relations[level])

        # getLogger() returns the same object for the same name. If it was
        # configured by an earlier Logger(filename, ...) call, adding the
        # handlers again would emit every record twice, so keep the
        # existing handlers (and their format) and only update the level.
        if self.logger.handlers:
            return

        format_str = logging.Formatter(fmt)

        # Console output.
        sh = logging.StreamHandler()
        sh.setFormatter(format_str)

        # The file handler opens the file immediately and does not create
        # missing directories itself.
        log_dir = os.path.dirname(filename)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # File output, rotated automatically at the given interval.
        th = handlers.TimedRotatingFileHandler(
            filename=filename, when=when, backupCount=backCount, encoding='utf-8')
        th.setFormatter(format_str)

        self.logger.addHandler(sh)
        self.logger.addHandler(th)
def cop_log_queue(logger, queue=None):
    """Take queued entries one by one and emit each through ``logger``.

    Each entry must expose ``_type`` (level name) and ``_msg`` (text). The
    function blocks while the queue is empty and returns only after a
    ``None`` sentinel is dequeued.

    Args:
        logger: Object providing ``debug``/``info``/``warning``/``error``/
            ``critical`` methods, e.g. a ``logging.Logger``.
        queue: Queue to read from; defaults to the module-level
            ``logs_queue``.
    """
    source = logs_queue if queue is None else queue

    # Entry type -> logger method name. 'crit' is accepted as an alias so
    # entries using the short name from Logger.level_relations are not lost.
    method_for_type = {
        'debug': 'debug',
        'info': 'info',
        'warning': 'warning',
        'error': 'error',
        'critical': 'critical',
        'crit': 'critical',
    }

    while True:
        # get() already blocks until an item is available, so no
        # empty()/sleep() polling is needed.
        log_msg = source.get()
        if log_msg is None:
            # Sentinel: lets the owner stop this worker.
            return

        method_name = method_for_type.get(log_msg._type)
        if method_name is None:
            # Do not drop the message silently; surface it together with
            # the unexpected type.
            logger.warning('unknown log type %r: %s', log_msg._type, log_msg._msg)
            continue
        getattr(logger, method_name)(log_msg._msg)
def error_thread1():
    """Demo producer: enqueue a 'warning' entry once per second, forever.

    Also prints '1' after every enqueue. Never returns.
    """
    pause_seconds = 1
    while True:
        entry = LogMsg('warning', 'warning_msg')
        logs_queue.put(entry)
        print('1')
        time.sleep(pause_seconds)
def error_thread2():
    """Demo producer: enqueue an 'info' entry every two seconds, forever.

    Never returns.
    """
    pause_seconds = 2
    while True:
        entry = LogMsg('info', 'info_msg')
        logs_queue.put(entry)
        time.sleep(pause_seconds)
if __name__ == '__main__':
    # Demo: one consumer thread drains the queue into the logger while two
    # producer threads keep feeding it.
    filename = './log/l2.log'
    # The rotating file handler does not create missing directories.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    logger = Logger(filename, level='info', when='D').logger

    # All three targets loop forever. Daemon threads let the process exit
    # when the main thread stops instead of hanging at interpreter shutdown.
    t1 = Thread(target=cop_log_queue, args=(logger,), daemon=True)
    t2 = Thread(target=error_thread1, args=(), daemon=True)
    t3 = Thread(target=error_thread2, args=(), daemon=True)
    tList = [t1, t2, t3]
    for t in tList:
        t.start()
    try:
        for t in tList:
            t.join()
    except KeyboardInterrupt:
        # Ctrl+C ends the demo; the daemon threads stop with the process.
        pass