正则表达式基础
发送整行日志存在的问题
- 整行message一般我们并不关心
- 需要对message进行段拆分,需要用到正则表达式
正则表达式
- 使用给定好的符号去表示某个含义
- 例如.代表任意字符
- 正则符号当普通符号使用需要加反斜杠
正则的发展
- 普通正则表达式
- 扩展正则表达式
普通正则表达式
. 任意一个字符
* 前面一个字符出现0次或者多次
[abc] 中括号内任意一个字符
[^abc] 非中括号内的字符
[0-9] 表示一个数字
[a-z] 小写字母
[A-Z] 大写字母
[a-zA-Z] 所有字母
[a-zA-Z0-9] 所有字母+数字
[^0-9] 非数字
^xx 以xx开头
xx$ 以xx结尾
\d 任何一个数字
\s 任何一个空白字符
扩展正则表达式,在普通正则符号再进行了扩展
? 前面字符出现0或者1次
+ 前面字符出现1或者多次
{n} 前面字符匹配n次
{a,b} 前面字符匹配a到b次
{,b} 前面字符匹配0次到b次
{a,} 前面字符匹配a或a+次
(string1|string2) 匹配string1或string2
示例:简单提取IP
1.1.1.1
114.114.114.114
255.277.277.277
(\d+\.){1,3}(\d+)
二、Logstash正则分析Nginx日志
Nginx日志说明
192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-"
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
- 访问IP地址
- 访问时间
- 请求方式(GET/POST)
- 请求URL
- 状态码
- 响应body大小
- Referer
- User Agent
Logstash正则提取日志
- 需要懂得正则,Logstash支持普通正则和扩展正则
- 需要了解Grok,利用Kibana的Grok学习Logstash正则提取日志
Grok提取Nginx日志
- Grok使用(?提取内容)来提取xxx字段
- 提取客户端IP
(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})
- 提取时间:
\[(?<requesttime>[^ ]+ \+[0-9]+)\]
- Grok提取Nginx日志(提取Tomcat等日志使用类似的方法)
(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] \
"(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"
Json的好处
- 原生日志需要做正则匹配,比较麻烦
- Json格式的日志不需要正则能直接分段采集
Nginx使用Json格式日志
log_format json '{"@timestamp":"$time_iso8601",'
'"clientip":"$remote_addr",'
'"status":$status,'
'"bodysize":$body_bytes_sent,'
'"referer":"$http_referer",'
'"ua":"$http_user_agent",'
'"handletime":$request_time,'
'"url":"$uri"}';
access_log logs/access.log;
access_log logs/access.json.log json;
Filebeat采集Json格式的日志
filebeat.inputs:
- type: log
tail_files: true
backoff: "1s"
paths:
- /usr/local/nginx/logs/access.json.log
output:
logstash:
hosts: ["192.168.237.51:5044"]
Logstash解析Json日志
input {
beats {
host => '0.0.0.0'
port => 5044
}
}
filter {
json {
source => "message"
remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
}
}
output {
elasticsearch {
hosts => ["http://192.168.237.50:9200"]
}
}
Logstash正则提取Nginx日志
input {
file {
path => "/usr/local/nginx/logs/access.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => {
"message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] \
"(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
}
#注意,只能去除_source里的,非_source里的去除不了
remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
}
# ELK时间轴自定义,默认ELK时间轴,以发送日志的时间为准,而Nginx上本身记录着用户的访问时间
# 分析Nginx上的日志以用户的访问时间为准,而不以发送日志的时间
# 不同的时间格式,覆盖的时候格式要对应
# 20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss
# 2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS
date {
match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
# 覆盖@timestamp
target => "@timestamp"
}
}
# Logstash正则提取出错就不输出到ES
output{
if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
elasticsearch {
hosts => ["http://192.168.237.50:9200"]
}
}
}