使用logstash过滤出特定格式的日志

1.一个例子

  • 项目日志生成在某个路径,如/var/log/project,里面有warn,info,error目录,分别对应不同级别的日志,需要采集这些日志。
  • 需要采集特定格式的日志,如:
[2018-11-24 08:33:43,253][ERROR][http-nio-8080-exec-4][com.hh.test.logs.LogsApplication][code:200,msg:测试录入错误日志,param:{}]
  • 筛选采集到的日志,生成自定义字段,如 date, level, thread, class, msg
filter {
  if "nova" in [tags]{
    grok {
      # 筛选过滤
      match => {
        "message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
      }
    mutate {
      remove_field => [
        "message",
      ]
    }
    # 不匹配正则则删除,匹配正则用=~
    if [level] !~ "(ERROR|WARN|INFO)" {
      # 删除日志
      drop {}
    }
  }
}

备注:grok里边有定义好的现场模板可以用,但是更多的是自定义模板,规则是这样的,小括号里边包含一个key和value,例子:(?value),比如以上的信息,第一个定义的key是date,表示方法为:? 前边一个问号,然后用<>把key包含在里边去。value就是纯正则了。这有个在线的调试库,以供参考:http://grokdebug.herokuapp.com/

2.多项匹配(提高性能)

grok {
  "match" => {
    "message => [
      '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{DATA:data}',
      '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',
      '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{DATA:data} | %{NUMBER:number}'
    ]
  }
}

logstash 会按照这个定义次序依次尝试匹配,直到匹配成功为止。虽说效果跟用 | 分割写个大大的正则是一样的,但是可阅读性好很多。

3.grok匹配失败(性能基准测试)

当grok匹配失败的时候,插件会为这个事件打个tag,默认是 _grokparsefailure。LogStash允许你把这些处理失败的事件路由到其他地方做后续的处理,例如:

input { # ... }
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip};%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}" }
  }
}
output {
  if "_grokparsefailure" in [tags] {
    # write events that didn't match to a file
    file { "path" => "/tmp/grok_failures.txt" }
  } else {
     elasticsearch { }
  }
}

4.快速失败,设置锚点(提高性能)

^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$

5.数据修改(Mutate)

filter {
  mutate {
    convert => ["request_time", "float"]
  }
}

6.大小写转换:uppercase 和 lowercase

filter {
  mutate {
    uppercase => [ "fieldname" ]
  }
}
上一篇:filebeat 提取获取massage字段 利用pipeline grok 7.12


下一篇:关于企业信息的正则表达式