Logstash详解

下载安装

地址:https://www.elastic.co/cn/downloads/logstash

实例分析

下面是一条tomcat日志:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

从filebeat中输出到logstash,配置如下:

input {
    beats {
        port => "5043"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
    stdout { codec => rubydebug }
}

使配置文件生效:

bin/logstash -f logstash-simple.conf

检查配置并启动Logstash

bin/logstash -f first-pipeline.conf --config.test_and_exit

--config.test_and_exit: 解析配置文件并报告任何错误

bin/logstash -f first-pipeline.conf --config.reload.automatic

--config.reload.automatic: 启用自动配置加载,以至于每次你修改完配置文件以后无需停止然后重启Logstash

fileter中的message代表一条一条的日志,%{COMBINEDAPACHELOG}代表解析日志的正则表达式,COMBINEDAPACHELOG的具体内容见: https://github.com/logstash-plugins/logstash-patterns-core。解析后:

{
        "request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
         "offset" => 325,
           "auth" => "-",
          "ident" => "-",
     "input_type" => "log",
           "verb" => "GET",
         "source" => "/path/to/file/logstash-tutorial.log",
        "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
           "type" => "log",
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
     "@timestamp" => 2016-10-11T21:04:36.167Z,
       "response" => "200",
          "bytes" => "203023",
       "clientip" => "83.149.9.216",
       "@version" => "1",
           "beat" => {
        "hostname" => "My-MacBook-Pro.local",
            "name" => "My-MacBook-Pro.local"
    },
           "host" => "My-MacBook-Pro.local",
    "httpversion" => "1.1",
      "timestamp" => "04/Jan/2015:05:13:42 +0000"
}

再比如,下面这条日志:

55.3.244.1 GET /index.html 15824 0.043

这条日志可切分为5个部分,IP(55.3.244.1)方法(GET)请求文件路径(/index.html)字节数(15824)访问时长(0.043),对这条日志的解析模式(正则表达式匹配)如下:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

写到filter中:

filter {
    grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
    }
}

解析后:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

解析任意格式日志

解析任意格式日志的步骤:

  1. 先确定日志的切分原则,也就是一条日志切分成几个部分。
  2. 对每一块进行分析,如果Grok中正则满足需求,直接拿来用。如果Grok中没用现成的,采用自定义模式。
  3. 调试Grok:https://www.5axxw.com/tools/v2/grok.html。

下面给出例子,来两条日志:

2017-03-07 00:03:44,373 4191949560 [          CASFilter.java:330:DEBUG]  entering doFilter()

2017-03-16 00:00:01,641 133383049 [    UploadFileModel.java:234:INFO ]  上报内容准备写入文件

切分原则:

2017-03-16 00:00:01,641:时间
133383049:编号
UploadFileModel.java:java类名
234:代码行号
INFO:日志级别
entering doFilter():日志内容

前五个字段用Grok中已有的,分别是TIMESTAMP_ISO8601NUMBERJAVAFILENUMBERLOGLEVEL,最后一个采用自定义正则的形式,日志级别的]之后的内容不论是中英文,都作为日志信息处理,使用自定义正则表达式子的规则如下:

(?<field_name>the pattern here)

最后一个字段的内容用info表示,正则如下:

(?<info>([\s\S]*))

上面两条日志对应的完整的正则如下,其中\s*用于剔除空格。

\s*%{TIMESTAMP_ISO8601:time}\s*%{NUMBER:num} \[\s*%{JAVAFILE:class}\s*\:\s*%{NUMBER:lineNumber}\s*\:%{LOGLEVEL:level}\s*\]\s*(?<info>([\s\S]*))

正则解析容易出错,强烈建议使用Grok Debugger调试,姿势如下。

Logstash详解

配置文件类型

log-kafka配置文件

输入源为nginx的日志文件,输出源为kafka

input {
    file {
        path => "/var/logs/nginx/*.log"
        discover_interval => 5
        start_position => "beginning"
    }
}
 
output {
    kafka {
       topic_id => "accesslog"
       codec => plain {
          format => "%{message}"
          charset => "UTF-8"
       }
       bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    }
}

file-kafka配置文件

输入源为txt文件,输出源为kafka

input {
   file {
      codec => plain {
        charset => "GB2312"
      }
      path => "D:/GameLog/BaseDir/*/*.txt"
      discover_interval => 30
      start_position => "beginning"
   }
}
 
output {
   kafka {
       topic_id => "gamelog"
       codec => plain {
          format => "%{message}"
          charset => "GB2312"
       }
       bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
   }
}

log-elasticsearch配置文件

输入源为nginx的日志文件,输出源为elasticsearch

input {
     file {
         type => "flow"
         path => "var/logs/nginx/*.log"
         discover_interval => 5
         start_position => "beginning"
     }
}
 
output {
    if [type] == "flow" {
        elasticsearch {
             index => "flow-%{+YYYY.MM.dd}"
             hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
        }
    }
}

本地日志输出到ES

input {
  file {
    path => "/root/journal.log"
      type => "journal"
      start_position => "beginning"
      sincedb_path => "/dev/null"
    }
}
output{
  elasticsearch {
    hosts => "192.168.4.155:9200"
    user => "elastic"
    # 索引名称
    index => "logstash_logs-%{+YYYY.MM.dd}"
    ecs_compatibility => disabled
  }
  stdout {
    # JSON格式输出
    codec=> json_lines  
  }
}

kafka-elasticsearch配置文件

输入源为kafka的accesslog和gamelog主题,并在中间分别针对accesslog和gamelog进行过滤,输出源为elasticsearch。当input里面有多个kafka输入源时,client_id => "es*"必须添加且需要不同,否则会报错javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0。

input {
  kafka {
     type => "accesslog"
     codec => "plain"
     auto_offset_reset => "earliest"
     client_id => "es1"
     group_id => "es1"
     topics => ["accesslog"]
  }
 
  kafka {
     type => "gamelog"
     codec => "plain"
     auto_offset_reset => "earliest"
     client_id => "es2"
     group_id => "es2"
     topics => ["gamelog"]
     bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
  }
}
 
filter {
  if [type] == "accesslog" {
     json {
    	source => "message"
   		remove_field => ["message"]
    	target => "access"
     }
  }
   
  if [type] == "gamelog" {
     mutate {
         split => { "message" => "    " }
         add_field => {
          "event_type" => "%{message[3]}"
          "current_map" => "%{message[4]}"
          "current_x" => "%{message[5]}"
          "current_y" => "%{message[6]}"
          "user" => "%{message[7]}"
          "item" => "%{message[8]}"
          "item_id" => "%{message[9]}"
          "current_time" => "%{message[12]}"
         }
         remove_field => ["message"]
     }
  }
}
 
output {
  if [type] == "accesslog" {
     elasticsearch {
       index => "accesslog"
       codec => "json"
       hosts => ["hadoop1:9200","hadoop2:9200","hadoop3:9200"]
     }
  }
   
  if [type] == "gamelog" {
     elasticsearch {
      	flush_size => 20000    //表示logstash的包数量达到20000个才批量提交到es.默认是500
        idle_flush_time => 10  //多长时间发送一次数据,flush_size和idle_flush_time以定时定量的方式发送,按照批次发送,可以减少logstash的网络IO请求

        index => "gamelog"
        codec => plain {
             charset => "UTF-16BE"
        }
        hosts => ["hadoop1:9200","hadoop2:9200","hadoop3:9200"]
     }
  }
}

注:UTF-16BE为解决中文乱码,而不是UTF-8

logstash启动

logstash -f /opt/app/logstash-6.2.3/conf/flow-kafka.conf

Logstash详解

logstash遇到的问题

在使用logstash采集日志时,如果我们采用file为input类型,采用不能反复对一份文件进行测试!第一次会成功,之后就会失败!

Grok插件

Logstash的Grok 可以使蹩脚的、无结构,杂乱无章的的日志内容结构化

需要注意的地方
grok 模式是正则表达式,因此这个插件的性能受到正则表达式引擎影响,效率并不高。如果通过给定的匹配格式匹配不上,那么Kibana查询的时候会自动打上tag 为 grok failed 的标签。

为什么通过正则表达式效率不高?
个人见解: 如果你的每一条日志都是无结构的,那么Grok 需要对每一条日志去正则匹配,这样子就相当于你用java或者php对每一个日志进行分析然后过滤出自己想要的字段一样,同时也和Linux上的Grep或者Awk 去过滤文本内容是一样,都会消耗内存和CPU资源。

然而假如你的日志是有结构化的比如JSON格式的日志,那么高级语言像Python等直接导入json的库就可以,(Logstash里面直接使用json的编码插件即可)不需要对每一条日志进行过多的分析,那样解析的效率就会很高

语法详解

grok模式的语法如下:

%{SYNTAX:SEMANTIC}

SYNTAX:代表匹配值的类型,例如3.44可以用NUMBER类型所匹配,127.0.0.1可以使用IP类型匹配。

SEMANTIC:代表存储该值的一个变量名称,例如 3.44 可能是一个事件的持续时间,10.0.0.2 可能是请求的client地址。

所以这两个值可以用 %{NUMBER:duration} %{IP:client} 来匹配。

你也可以选择将数据类型转换添加到Grok模式。默认情况下,所有语义都保存为字符串。如果您希望转换语义的数据类型。
例如将字符串更改为整数,则将其后缀为目标数据类型。

例如%{NUMBER:num:int}将num语义从一个字符串转换为一个整数。

匹配的典型格式举例

  • %{NUMBER:duration} — 匹配浮点数
  • %{IP:client} — 匹配IP
  • (?([\S+])),自定义正则
  • (?<class_info>([\S+])), 自定义正则匹配多个字符
  • \s或者\s+,代表多个空格
  • \S+或者\S,代表多个字符
  • 大括号里面:xxx,相当于起别名
  • %{UUID},匹配类似091ece39-5444-44a1-9f1e-019a17286b48
  • %{WORD}, 匹配请求的方式
  • %{GREEDYDATA},匹配所有剩余的数据
  • %{LOGLEVEL:loglevel} ---- 匹配日志级别

自定义类型
(1) 命名捕获

(?<xx>[0-9A-F]{4})   # 这样可以匹配连续 长度为4的数字,并用xx来存储

(?<xx>[0-9A-F]{4})   # 这样可以匹配连续 长度为5的数字

(2) 创建自定义 patterns 文件进行匹配

  • 2.1 创建一个名为patterns其中创建一个文件xx_postfix (名字任意定义),在该文件中,将需要的模式写为
    模式名称,空格,然后是该模式的正则表达式
    例如:

    #contents of ./patterns/xx_postfix:
    xx [0-9A-F]{10,11}
    
    • 2.2 然后使用这个插件中的patterns_dir设置告诉logstash目录是你的自定义模式。
    filter {
    	grok {
        	patterns_dir => ["./patterns"]
    	    match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:xx}: %{GREEDYDATA:syslog_message}" }
    	}
    }
    

Grok过滤器配置选项
break_on_match
● 值类型是布尔值
● 默认是true
● 描述:match可以一次设定多组,预设会依照顺序设定处理,如果日志满足设定条件,则会终止向下处理。但有的时候我们会希望让Logstash跑完所有的设定,这时可以将break_on_match设为false

keep_empty_captures
● 值类型是布尔值
● 默认值是 false
● 描述:如果为true,捕获失败的字段将设置为空值

match
● 值类型是数组
● 默认值是 {}
● 描述:字段⇒值匹配
例如:

filter {
  grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}
//如果你需要针对单个字段匹配多个模式,则该值可以是一组,例如:
filter {
  grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
}

实例2

output plugin docs: https://www.elastic.co/guide/en/logstash/current/output-plugins.html

input plugin dosc: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

input {
	file {
    	path => "/usr/local/nginx/logs/access.log"
	    type => "nginx"
	    start_position => "beginning"
        //每次从头读文件,意味着会重复发送log,慎用
	    sincedb_path => "/dev/null"
    	bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
	    //path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
        path => [‘pathA’,‘pathB’]
        //表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
        discover_interval => 15
        //#排除不想监听的文件
        exclude => [‘fileName1’,‘fileNmae2’]
		//添加自定义的字段
        add_field => {"test"=>"test"}
		//增加标签
        tags => "tag1"
		//#设置新事件的标志
        delimiter => "\n"
        //#设置多长时间扫描目录,发现新文件
        discover_interval => 15
        //#设置多长时间检测文件是否修改
        stat_interval => 1
        //被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
        close_older => 3600
        //在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
        ignore_older => 86400
        //logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
        stat_interval => 1
        //#监听文件的起始位置,默认是end
        start_position => ‘beginning’
        //注意:如果需要每次都从同开始读取文件的话,关设置start_position => beginning是没有用的,可以选择sincedb_path 定义为 /dev/null
    }

	file {
	    path => "/var/log/secure"
    	type => "secure"
	    start_position => "beginning"
	    sincedb_path => "/dev/null"
    }

	beats {
    	//数据类型
    	type => "logs"
        //接受数据端口
    	port => 5044
	    //ssl => true
    	//ssl_certificate => "/etc/logstash/logstash.crt"
	    //ssl_key => "/etc/logstash/logstash.key"
	}
}
// filter 模块主要是数据预处理,提取一些信息,方便 elasticsearch 好归类存储。
filter {
    grok {  
      match => {"message" => "%{EXIM_DATE:timestamp}\|%{LOGLEVEL:log_level}\|%{INT:pid}\|%{GREEDYDATA}"}
// message 字段是 log 的内容,例如 2018-12-11 23:46:47.051|DEBUG|3491|helper.py:85|helper._save_to_cache|shop_session
//在这里我们提取出了 timestamp log_level pid,grok 有内置定义好的 patterns: EXIM_DATE, EXIM_DATE, INT
// GREEDYDATA 贪婪数据,代表任意字符都可以匹配 
    }
// 如果在 filebeat 里面添加了这个字段[fields][function] 的话,那就会执行对应的 match 规则去匹配 path
// source 字段就是 log 的来源路径,例如 /var/log/nginx/feiyang233.club.access.log
// match 后我们就可以得到 path=feiyang233.club.access
    if [fields][function]=="nginx" {
        grok {         
       	    match => {"source" => "/var/log/nginx/%{GREEDYDATA:path}.log%{GREEDYDATA}"}  
            }
        } 
// 例如 ims 日志来源是 /var/log/ims_logic/debug.log
// match 后我们就可以得到 path=ims_logic
    else if [fields][function]=="ims" {
        grok {
        match => {"source" => "/var/log/%{GREEDYDATA:path}/%{GREEDYDATA}"}
            }
        }  

    else {
        grok {
        match => {"source" => "/var/log/app/%{GREEDYDATA:path}/%{GREEDYDATA}"}
            }         
        }
// filebeat 有定义 [fields][function] 时,我们就添加上这个字段,例如 QA
    if [fields][function] {
          mutate {
              add_field => {
                  "function" => "%{[fields][function]}"
                }
            }
        } 
// 因为线上的机器更多,线上的我默认不在 filebeat 添加 function,所以 else 添加上 live  
    else {
          mutate {
              add_field => {
                  "function" => "live"
                }
            }
        }
// 在之前 filter message 时,我们得到了 timestamp,这里修改一下格式,添加上时区。
    date {
      match => ["timestamp" , "yyyy-MM-dd HH:mm:ss Z"]
      target => "@timestamp"
      timezone => "Asia/Singapore"
    }
// 将之前获得的 path 替换其中的 / 替换为 - , 因为 elasticsearch index name 有要求
// 例如 feiyang/test  feiyang_test 
    mutate {
     gsub => ["path","/","-"]
      add_field => {"host_ip" => "%{[fields][host]}"}
      remove_field => ["tags","@version","offset","beat","fields","exim_year","exim_month","exim_day","exim_time","timestamp"]
    }
// remove_field 去掉一些多余的字段
}
// 单节点 output 就在本机,也不需要 SSL, 但 index 的命名规则还是需要非常的注意
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "sg-%{function}-%{path}-%{+xxxx.ww}"
// sg-nginx-feiyang233.club.access-2019.13  ww 代表周数
  }
}
//输出到文件
output{
	file{
		path => "/tmp/%{+YYYY.MM.dd}-%{host}-file.txt"
       }
}
output {
	if [type] == "nginx" {
		elasticsearch {
    		hosts => ["http://192.168.0.9:9200"]
		    index => "nginx-%{+YYYY.MM.dd}"
    	}
    }

    else if [type] == "secure" {
        elasticsearch {
	    	hosts => ["http://192.168.0.9:9200"]
	    	index => "secure-%{+YYYY.MM.dd}"
        }
    }
}
  • sincedb_path => "/dev/null": 该参数用来指定 sincedb 文件名, Logstash 会记录自己读取文件内容的偏移量到一个隐藏文件里,默认情况下,下次启动,他会从这个偏移量继续往后读,避免重复读取数据。

    这个隐藏文件,叫做 $HOME/.sincedb_****,如果设置为 /dev/null这个 Linux 系统上特殊的空洞文件,那么 logstash 每次重启进程的时候,尝试读取 sincedb 内容,都只会读到空白内容,也就会理解成之前没有过运行记录,就会从初始位置开始读取!

参考

https://blog.51cto.com/u_13760226/2433967 ELK--grok正则分析日志与多日志收集

https://blog.csdn.net/napoay/article/details/62885899 ELK日志处理之使用Grok解析日志

ELK 学习笔记之 Logstash之output配置

https://developer.aliyun.com/article/152043 Logstash详解之——input模块

logstash-input-file]插件使用详解

https://blog.csdn.net/knight_zhou/article/details/104954098 Logstash Grok详解

上一篇:Elasticsearch[实战四]


下一篇:Mac es + kibana 环境集群搭建