Logstash搜集Nginx日志介绍

正则表达式基础

发送整行日志存在的问题

  1. 整行message一般我们并不关心
  2. 需要对message进行段拆分,需要用到正则表达式

正则表达式

  1. 使用给定好的符号去表示某个含义
  2. 例如.代表任意字符
  3. 正则符号当普通符号使用需要加反斜杠

正则的发展

  1. 普通正则表达式
  2. 扩展正则表达式

普通正则表达式

.        任意一个字符
*        前面一个字符出现0次或者多次
[abc]  中括号内任意一个字符
[^abc] 非中括号内的字符
[0-9]   表示一个数字
[a-z]    小写字母
[A-Z]   大写字母
[a-zA-Z]   所有字母
[a-zA-Z0-9] 所有字母+数字
[^0-9]  非数字
^xx    以xx开头
xx$    以xx结尾
\d   任何一个数字
\s   任何一个空白字符

扩展正则表达式,在普通正则符号再进行了扩展

? 前面字符出现0或者1次
+ 前面字符出现1或者多次
{n}  前面字符匹配n次
{a,b}  前面字符匹配a到b次
{,b}   前面字符匹配0次到b次
{a,}   前面字符匹配a或a+次
(string1|string2) 匹配string1或string2

示例:简单提取IP

1.1.1.1
114.114.114.114
255.277.277.277
(\d+\.){1,3}(\d+)

二、Logstash正则分析Nginx日志

Nginx日志说明

192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" 
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
  • 访问IP地址
  • 访问时间
  • 请求方式(GET/POST)
  • 请求URL
  • 状态码
  • 响应body大小
  • Referer
  • User Agent

Logstash正则提取日志

  1. 需要懂得正则,Logstash支持普通正则和扩展正则
  2. 需要了解Grok,利用Kibana的Grok学习Logstash正则提取日志

Grok提取Nginx日志

  1. Grok使用(?提取内容)来提取xxx字段
  2. 提取客户端IP
(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})
  1. 提取时间:
\[(?<requesttime>[^ ]+ \+[0-9]+)\]
  1. Grok提取Nginx日志(提取Tomcat等日志使用类似的方法)
(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] \ 

"(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"

Json的好处

  1. 原生日志需要做正则匹配,比较麻烦
  2. Json格式的日志不需要正则能直接分段采集

Nginx使用Json格式日志

log_format json '{"@timestamp":"$time_iso8601",'
                 '"clientip":"$remote_addr",'
                 '"status":$status,'
                 '"bodysize":$body_bytes_sent,'
                 '"referer":"$http_referer",'
                 '"ua":"$http_user_agent",'
                 '"handletime":$request_time,'
                 '"url":"$uri"}';
access_log  logs/access.log;
access_log  logs/access.json.log  json;

Filebeat采集Json格式的日志

filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
      - /usr/local/nginx/logs/access.json.log
output:
  logstash:
    hosts: ["192.168.237.51:5044"]

Logstash解析Json日志

input {
  beats {
    host => '0.0.0.0'
    port => 5044
  }
}
filter {
   json {     
       source => "message"
      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.237.50:9200"]
  }
}

Logstash正则提取Nginx日志

input {
  file {
    path => "/usr/local/nginx/logs/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
    grok {
        match => {
            "message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] \
            "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
        }
        #注意,只能去除_source里的,非_source里的去除不了
        remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
    }
    # ELK时间轴自定义,默认ELK时间轴,以发送日志的时间为准,而Nginx上本身记录着用户的访问时间
    # 分析Nginx上的日志以用户的访问时间为准,而不以发送日志的时间  
    # 不同的时间格式,覆盖的时候格式要对应
    # 20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss
    # 2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS
    date {
        match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
        # 覆盖@timestamp
        target => "@timestamp"
    }
}
# Logstash正则提取出错就不输出到ES
output{
    if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
        elasticsearch {
            hosts => ["http://192.168.237.50:9200"]
        }
    }
}
上一篇:logstash安装和基本使用


下一篇:(五)史上最强ELK集群搭建系列教程——logstash搭建