日志分析
概述
分析的前提
半结构化数据
文本分析
提取数据(信息提取)
一、空格分隔
# Simplest form of extraction: break every line of the log file on whitespace
# and print each resulting token on its own line.
with open('xxx.log') as log_file:
    for record in log_file:
        # split() without arguments splits on any run of whitespace.
        tokens = record.split()
        for token in tokens:
            print(token)
# Whitespace splitting also cuts through the bracketed timestamp and the quoted
# request / user-agent strings, so those pieces must be stitched back together.
logs = '''138.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /020/media.html?menu\
=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou\
.com/search/spider.html)"'''

fields = []
flag = False  # True while inside a [...] or "..." group that has not been closed yet
tmp = ''      # pieces of the group currently being reassembled

for field in logs.split():
    if not flag and field.startswith(('[', '"')):
        if field.endswith((']', '"')):
            # The token opens and closes on its own, e.g. "-": just drop the delimiters.
            fields.append(field.strip('[]"'))
        else:
            # Only the opening delimiter is present, e.g. [19/Feb/2013:10:23:29;
            # remember the text and keep collecting until the closing delimiter.
            tmp += field[1:]
            flag = True
        continue

    if flag:
        if field.endswith((']', '"')):
            # Closing piece of the group, e.g. the +0800] of the timestamp.
            tmp += " " + field[:-1]
            fields.append(tmp)
            tmp = ''
            flag = False
        else:
            # Middle piece of the group: re-insert the space that split() removed.
            tmp += " " + field
        continue

    # Plain token without any [] or "" delimiters.
    fields.append(field)
类型转换
import datetime


def convert_time(timestr):
    """Parse an access-log timestamp into a timezone-aware datetime.

    :param timestr: text such as ``19/Feb/2013:10:23:29 +0800``
    :return: ``datetime.datetime`` carrying the UTC offset from the text
    :raises ValueError: if the text does not match the expected layout
    """
    return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')


# The same conversion written as an anonymous function:
lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')
请求信息的解析
def get_request(request: str) -> dict:
    """Split a request line into its method, url and protocol parts.

    :param request: text such as ``GET /index.html HTTP/1.1``
    :return: dict with the keys ``method``, ``url`` and ``protocol``; zip()
        stops at the shorter input, so missing parts simply produce no key
    """
    return dict(zip(['method', 'url', 'protocol'], request.split()))


# The same function written as an anonymous function:
lambda request: dict(zip(['method', 'url', 'protocol'], request.split()))
映射
import datetime
logs = '''138.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /020/media.html?menu\
=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou\
.com/search/spider.html)"'''


def convert_time(timestr):
    """Parse a timestamp such as ``19/Feb/2013:10:23:29 +0800`` into a datetime."""
    return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')


def get_request(request: str):
    """Split a request line into a dict with method, url and protocol."""
    return dict(zip(['method', 'url', 'protocol'], request.split()))


# Field name for each position; an empty name marks a column that is discarded.
names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')
# Converter for each position; None means the raw string is kept unchanged.
ops = (None, None, None, convert_time, get_request, int, int, None, None)


def _split_fields(line):
    """Split a log line on whitespace, re-joining [...] and "..." groups."""
    fields = []
    flag = False  # True while inside a group whose closing delimiter is pending
    tmp = ''
    for field in line.split():
        if not flag and field.startswith(('[', '"')):
            if field.endswith((']', '"')):
                # Self-contained token such as "-".
                fields.append(field.strip('[]"'))
            else:
                # Opening piece of a group such as [19/Feb/2013:10:23:29
                tmp += field[1:]
                flag = True
            continue
        if flag:
            if field.endswith((']', '"')):
                # Closing piece of a group such as +0800]
                tmp += " " + field[:-1]
                fields.append(tmp)
                tmp = ''
                flag = False
            else:
                tmp += " " + field
            continue
        # Token without [] or "" delimiters.
        fields.append(field)
    return fields


def extract(line):
    """Turn one access-log line into a dict of named, converted values.

    The line passed in is parsed (the earlier version read the module-level
    sample instead). Every position with a non-empty name is returned; a
    converter from ``ops`` is applied when one exists, otherwise the raw
    string is kept.

    :param line: one line of the access log
    :return: dict mapping field name to value
    """
    info = {}
    # zip() stops at the shortest sequence, so unexpected extra fields are ignored.
    for name, op, field in zip(names, ops, _split_fields(line)):
        if not name:
            continue
        info[name] = op(field) if op else field
    return info


print(extract(logs))
二、正则表达式提取
# Variant 1: positional groups. names and ops line up one-to-one with the
# eight capture groups of the pattern.
pattern = r'''([\d.]{7,}) - - \[([/\w +:]+)\] "(\w+) (\S+) ([\w/\d.]+)" (\d+) (\d+) .+ "(.+)"'''
names = ('remote', 'datetime', 'method', 'url', 'protocol', 'status', 'length', 'useragent')
ops = (
    None,
    lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    None,
    None,
    None,
    int,
    int,
    None,
)

# Variant 2: named groups, so the field names live inside the pattern itself.
# Adjacent raw strings are concatenated; each piece ends with the single space
# that separates it from the next field in the log line.
pattern = (
    r'(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] '
    r'"(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" '
    r'(?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'
)
# Converters keyed by group name; groups without an entry stay plain strings.
ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
}
import datetime
import re
logs = '''138.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /020/media.html?menu=3 HTTP/1.1" 200 16997 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
pattern = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[\w/ +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

# Converters keyed by group name; groups without an entry stay plain strings.
ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
}

regex = re.compile(pattern)


def extract(line):
    """Parse one access-log line with the named-group pattern.

    :param line: one line of the access log
    :return: dict of group name to converted value, or None when the line
        does not match the pattern
    """
    matcher = regex.match(line)
    if matcher is None:
        # A malformed line is reported as None instead of raising AttributeError.
        return None
    # groupdict() returns every named group of the match as a dict.
    return {
        key: ops[key](value) if key in ops else value
        for key, value in matcher.groupdict().items()
    }


print(extract(logs))
异常处理
滑动窗口
数据载入
时间窗口分析
概念
当width>interval(数据求值时会有重叠)
当width=interval(数据求值时没有重叠)
当width<interval(一般不采纳这种方案,会有数据缺失)
时序数据
数据分析基本程序结构
import random
import datetime


def source():
    """Endless generator of fake samples: the current time plus a random value in 1..10."""
    while True:
        yield {'datetime': datetime.datetime.now(), 'value': random.randint(1, 10)}


# Fetch a few samples
src = source()
items = [next(src) for _ in range(3)]
# print(items)


# Processing function
def handler(iterable):
    """Return the arithmetic mean of the ``value`` entries.

    :param iterable: iterable of dicts that each carry a numeric ``value``
    :raises ZeroDivisionError: if the iterable is empty
    """
    vals = [x['value'] for x in iterable]
    return sum(vals) / len(vals)


print(handler(items))
# The code above simulates data produced over a period of time: after a fixed
# wait the collected samples are taken and their average is computed.
窗口函数实现
将上面的获取数据的程序扩展为windows函数,使用重叠的方案!
#代码实现:
import random
import datetime
import time def source():
while True:
yield {'value':random.randint(1,100),'datetime':datetime.datetime.now()}
time.sleep(1)
def windows(src,handler,width:int,interval:int):
"""
:param src:数据源、生成器、用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度,秒
:param interval: 处理时间间隔,秒
:return:None
"""
start = datetime.datetime.strptime('19710101 00:00:00 +0800','%Y/%m/%d %H:%M:%S %z')
current = datetime.datetime.strptime('19710101 00:00:01 +0800','%Y/%m/%d %H:%M:%S %z')
buffer = [] #窗口中待计算的数据
delta = datetime.timedelta(seconds=width-interval) for data in src:
if data:#存入临时缓存区
buffer.append(x)
current =data['datetime'] if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print("{:.2f}".format(ret))
start = current
#更新buffer,current - delta表示需要重叠的数据
buffer = [x for x in buffer if x['datetime'] > current - delta] #处理函数
def handler(iterable):
vals = [x['value'] for x in iterable]
return sum(vals) / len(vals) windows(source(),handler,10,5)
分发
生产者消费者模型
queue模块--队列
from queue import Queue
import random

q = Queue()

# put() returns None, so each of these two lines prints "None".
for _ in range(2):
    print(q.put(random.randint(1, 100)))

# Take both numbers out again in FIFO order.
for _ in range(2):
    print(q.get())

# The queue is empty now: get() blocks for two seconds and then raises queue.Empty.
print(q.get(timeout=2))
分发器的实现
import threading

# Define the thread.
# target is the function executed inside the thread; args holds the positional
# arguments that function is called with.
window_args = (src, handler, width, interval)
t = threading.Thread(target=windows, args=window_args)

# Start the thread.
t.start()
分发器代码实现
# -*- coding: utf-8 -*-
# Author: Baozi
#日志分析项目
'''
1.新建一个python文件test.py
2.从日志文件中复制一条日志信息用于测试。logline存储这个日志字符串
'''
import threading
from queue import Queue
import datetime
import re
import random
import time # logs = '''138.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /020/media.html?menu=3 HTTP/1.1" 200 16997 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
pattern = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[\w/ +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

# Converters keyed by group name; groups without an entry stay plain strings.
ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
}

regex = re.compile(pattern)


def extract(line):
    """Parse one access-log line.

    :param line: one line of the access log
    :return: dict of group name to converted value, or None when the line
        does not match (load() relies on that falsy result to skip bad lines)
    """
    matcher = regex.match(line)
    if matcher is None:
        return None
    print(matcher.groupdict())
    # groupdict() returns every named group of the match as a dict.
    return {k: ops[k](v) if k in ops else v for k, v in matcher.groupdict().items()}


def load(path: str):
    """Yield the parsed records of a single log file, skipping unparsable lines."""
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            # TODO: lines that fail to parse are silently dropped for now.


############################ sliding window ############################
def windows(src: Queue, handler, width: int, interval: int):
    """Consume records from a queue and evaluate them in sliding time windows.

    Runs forever: ``src.get()`` blocks until the next record arrives.

    :param src: queue the records are taken from; each record is a dict with
        a timezone-aware ``datetime`` entry
    :param handler: function called with the list of buffered records
    :param width: width of the time window in seconds
    :param interval: seconds between two evaluations
    :return: never returns
    """
    # Start far in the past so that the first record triggers an evaluation.
    start = datetime.datetime.strptime('1971/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1971/01/01 00:00:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    buffer = []  # records waiting to be evaluated in the current window
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        data = src.get()
        if data:
            buffer.append(data)
            current = data['datetime']

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print(ret)
            start = current
            # Keep only the overlapping part for the next window.
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# Processing functions
def handler(iterable):
    """Return the arithmetic mean of the ``value`` entries."""
    vals = [x['value'] for x in iterable]
    return sum(vals) / len(vals)


def donothing_handler(iterable: list):
    """Print the window content and hand it back unchanged."""
    print(iterable)
    return iterable


############################ dispatcher ############################
# A simple one-to-many fan-out: every record that passes through the
# dispatcher is copied to each of the n registered consumers.
def dispatcher(src):
    """Build a dispatcher around ``src``.

    :param src: iterable of records to distribute
    :return: tuple ``(req, run)``; ``req(handler, width, interval)`` registers
        a window consumer, ``run()`` starts all consumers and feeds them
    """
    queues = []
    threads = []

    def req(handler, width, interval):
        # Each consumer gets its own queue and its own windows() thread.
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=windows, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        # Send every record to the queue of each consumer.
        for x in src:
            for q in queues:
                q.put(x)

    return req, run


if __name__ == '__main__':
    req, run = dispatcher(load('test.log'))
    # Register a window
    req(donothing_handler, 1, 1)
    # Start
    run()
完成分析功能
状态码分析
def status_handler(iterable):
    """Compute the share of each HTTP status code within one time window.

    :param iterable: the records of one window; each is a dict with a ``status`` entry
    :return: dict mapping status code to its percentage (0-100) of all
        records; empty when the window is empty
    """
    status = {}
    for item in iterable:
        key = item['status']
        # Count every occurrence (a plain assignment would always leave 1 here).
        status[key] = status.get(key, 0) + 1
    total = sum(status.values())
    return {k: v / total * 100 for k, v in status.items()}
日志文件的加载
from pathlib import Path


def openfile(path: str):
    """Yield the parsed records of a single log file, skipping unparsable lines."""
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            # TODO: lines that fail to parse are silently dropped for now.


def load(*path: str):
    """Yield the records of every given log file or directory.

    Paths that do not exist are skipped. For a directory, only the regular
    files directly inside it are read (no recursion into sub-directories).

    :param path: any number of file or directory paths
    """
    for file in path:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():
                if x.is_file():
                    yield from openfile(str(x))
        elif p.is_file():
            yield from openfile(str(p))
完整代码如下:
#日志分析项目
'''
1.新建一个python文件test.py
2.从日志文件中复制一条日志信息用于测试。logline存储这个日志字符串
'''
import threading
from queue import Queue
import datetime
import re
import random
import time
from pathlib import Path
# logline = '''138.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /020/media.html?menu=3 HTTP/1.1" 200 16997 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
pattern = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[\w/ +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

# Converters keyed by group name; groups without an entry stay plain strings.
ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
}
regex = re.compile(pattern)


def extract(line):
    """Parse one access-log line.

    :param line: one line of the access log
    :return: dict of group name to converted value, or None when the line
        does not match (openfile() relies on that falsy result to skip bad lines)
    """
    matcher = regex.match(line)
    if matcher is None:
        return None
    print(matcher.groupdict())
    # groupdict() returns every named group of the match as a dict.
    return {k: ops[k](v) if k in ops else v for k, v in matcher.groupdict().items()}


def openfile(path: str):
    """Yield the parsed records of a single log file, skipping unparsable lines."""
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            # TODO: lines that fail to parse are silently dropped for now.


def load(*path: str):
    """Yield the records of every given log file or directory.

    Paths that do not exist are skipped. For a directory, only the regular
    files directly inside it are read (no recursion into sub-directories).
    """
    for file in path:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():
                if x.is_file():
                    yield from openfile(str(x))
        elif p.is_file():
            yield from openfile(str(p))


############################ sliding window ############################
def windows(src: Queue, handler, width: int, interval: int):
    """Consume records from a queue and evaluate them in sliding time windows.

    Runs forever: ``src.get()`` blocks until the next record arrives.

    :param src: queue the records are taken from; each record is a dict with
        a timezone-aware ``datetime`` entry
    :param handler: function called with the list of buffered records
    :param width: width of the time window in seconds
    :param interval: seconds between two evaluations
    """
    # Start far in the past so that the first record triggers an evaluation.
    start = datetime.datetime.strptime('1971/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1971/01/01 00:00:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    buffer = []  # records waiting to be evaluated in the current window
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        data = src.get()
        if data:
            buffer.append(data)
            current = data['datetime']

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print(ret)
            start = current
            # Keep only the overlapping part for the next window.
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# Processing functions
def status_handler(iterable):
    """Return the percentage (0-100) of each status code among the records of one window."""
    status = {}
    for item in iterable:
        key = item['status']
        # Count every occurrence (a plain assignment would always leave 1 here).
        status[key] = status.get(key, 0) + 1
    total = sum(status.values())
    return {k: v / total * 100 for k, v in status.items()}


def handler(iterable):
    """Return the arithmetic mean of the ``value`` entries."""
    vals = [x['value'] for x in iterable]
    return sum(vals) / len(vals)


def donothing_handler(iterable: list):
    """Print the window content and hand it back unchanged."""
    print(iterable)
    return iterable


############################ dispatcher ############################
# A simple one-to-many fan-out: every record that passes through the
# dispatcher is copied to each of the n registered consumers.
def dispatcher(src):
    """Build a dispatcher around ``src``.

    :param src: iterable of records to distribute
    :return: tuple ``(req, run)``; ``req(handler, width, interval)`` registers
        a window consumer, ``run()`` starts all consumers and feeds them
    """
    queues = []
    threads = []

    def req(handler, width, interval):
        # Each consumer gets its own queue and its own windows() thread.
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=windows, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        # Send every record to the queue of each consumer.
        for x in src:
            for q in queues:
                q.put(x)

    return req, run


if __name__ == '__main__':
    req, run = dispatcher(load('test.log'))
    # Register a window
    req(donothing_handler, 1, 1)
    # req(status_handler, 2, 2)
    # Start
    run()
浏览器分析
useragent
信息提取
from user_agents import parse

useragent = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36"
# parse() turns the raw User-Agent header into an object with structured fields.
uaobj = parse(useragent)

# First the whole browser record, then its family and version separately.
print(uaobj.browser)
print(
    uaobj.browser.family,
    uaobj.browser.version,
)
#输出如下:
Browser(family='Chrome', version=(67, 0, 3396), version_string='67.0.3396')
Chrome (67, 0, 3396)
#日志分析完整代码(新增几个小模块)
# Author: Baozi
# -*- coding: utf-8 -*-
#日志分析项目
'''
1.新建一个python文件test.py
2.从日志文件中复制一条日志信息用于测试。logline存储这个日志字符串
'''
import threading
from queue import Queue
import datetime
import re
import random
import time
from pathlib import Path
from user_agents import parse
from collections import defaultdict # logline = '''138.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /020/media.html?menu=3 HTTP/1.1" 200 16997 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
# pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[\w/ +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
pattern = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[\w/ +:]+)\] "(?P<request>[^"]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

# Converters keyed by group name; groups without an entry stay plain strings.
ops = {
    'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'length': int,
    'request': lambda request: dict(zip(('method', 'url', 'protocol'), request.split())),
    'useragent': lambda useragent: parse(useragent),
}
regex = re.compile(pattern)


def extract(line):
    """Parse one access-log line.

    :param line: one line of the access log
    :return: dict of group name to converted value, or None when the line
        does not match (openfile() relies on that falsy result to skip bad lines)
    """
    matcher = regex.match(line)
    if matcher is None:
        return None
    print(matcher.groupdict())
    # groupdict() returns every named group of the match as a dict.
    return {k: ops[k](v) if k in ops else v for k, v in matcher.groupdict().items()}


def openfile(path: str):
    """Yield the parsed records of a single log file, skipping unparsable lines."""
    with open(path) as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            # TODO: lines that fail to parse are silently dropped for now.


def load(*path: str):
    """Yield the records of every given log file or directory.

    Paths that do not exist are skipped. For a directory, only the regular
    files directly inside it are read (no recursion into sub-directories).
    """
    for file in path:
        p = Path(file)
        if not p.exists():
            continue
        if p.is_dir():
            for x in p.iterdir():
                if x.is_file():
                    yield from openfile(str(x))
        elif p.is_file():
            yield from openfile(str(p))


############################ sliding window ############################
def windows(src: Queue, handler, width: int, interval: int):
    """Consume records from a queue and evaluate them in sliding time windows.

    Runs forever: ``src.get()`` blocks until the next record arrives.

    :param src: queue the records are taken from; each record is a dict with
        a timezone-aware ``datetime`` entry
    :param handler: function called with the list of buffered records
    :param width: width of the time window in seconds
    :param interval: seconds between two evaluations
    """
    # Start far in the past so that the first record triggers an evaluation.
    start = datetime.datetime.strptime('1971/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1971/01/01 00:00:01 +0800', '%Y/%m/%d %H:%M:%S %z')
    buffer = []  # records waiting to be evaluated in the current window
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        data = src.get()
        if data:
            buffer.append(data)
            current = data['datetime']

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print(ret)
            start = current
            # Keep only the overlapping part for the next window.
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# Processing functions
# Status code analysis
def status_handler(iterable):
    """Return the percentage (0-100) of each status code among the records of one window."""
    status = {}
    for item in iterable:
        key = item['status']
        # Count every occurrence (a plain assignment would always leave 1 here).
        status[key] = status.get(key, 0) + 1
    total = sum(status.values())
    return {k: v / total * 100 for k, v in status.items()}


# Browser analysis
# Module-level counter: totals keep accumulating across all windows.
ua_dict = defaultdict(int)


def browser_handler(iterable: list):
    """Count the records per (browser family, version string).

    :param iterable: records of one window; each ``useragent`` entry must
        expose ``browser.family`` and ``browser.version_string``
    :return: the shared ``ua_dict`` counter, updated with this window
    """
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        ua_dict[key] += 1
    return ua_dict


def handler(iterable):
    """Return the arithmetic mean of the ``value`` entries."""
    vals = [x['value'] for x in iterable]
    return sum(vals) / len(vals)


def donothing_handler(iterable: list):
    """Print the window content and hand it back unchanged."""
    print(iterable)
    return iterable


############################ dispatcher ############################
# A simple one-to-many fan-out: every record that passes through the
# dispatcher is copied to each of the n registered consumers.
def dispatcher(src):
    """Build a dispatcher around ``src``.

    :param src: iterable of records to distribute
    :return: tuple ``(req, run)``; ``req(handler, width, interval)`` registers
        a window consumer, ``run()`` starts all consumers and feeds them
    """
    queues = []
    threads = []

    def req(handler, width, interval):
        # Each consumer gets its own queue and its own windows() thread.
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=windows, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        # Send every record to the queue of each consumer.
        for x in src:
            for q in queues:
                q.put(x)

    return req, run


if __name__ == '__main__':
    req, run = dispatcher(load('test.log'))
    # Register a window
    # req(donothing_handler, 1, 1)
    # req(status_handler, 2, 2)
    req(browser_handler, 2, 2)
    # Start
    run()