Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
logstash功能很强大。从 logstash 1.5.0 版本开始,logstash 将所有的插件都独立拆分成 gem 包。这样,每个插件都可以独立更新,不用等待 logstash 自身做整体更新的时候才能使用了。为了达到这个目标,logstash 配置了专门的 plugins 管理命令
logstash插件安装(本地安装)
logstash处理事件有三个阶段:input ---> filter ---> output。input产生事件,filter 对事件进行修改,output输出到其它地方。
filter是logstash管道中间处理的设备。可以结合条件语句对符合标准的事件进行处理。这里只介绍filter的插件:
你可以通过 bin/plugin list 查看本机现在有多少插件可用。(其实就在 vendor/bundle/jruby/1.9/gems/ 目录下)
插件本地安装的方法
1
2
3
4
5
|
bin /logstash-plugin install logstash-filter-kv
Ignoring ffi-1.9.13 because its extensions are not built. Try: gem pristine ffi --version 1.9.13 Validating logstash-filter-kv Installing logstash-filter-kv Installation successful |
更新插件:bin/logstash-plugin update logstash-filter-kv
插件介绍
这里只介绍几种常用的
grok: 解析和结构化任何文本(前面单独介绍过就不重复了)
http://irow10.blog.51cto.com/2425361/1828077
geoip: 添加有关IP地址地理位置信息。
geoip这个插件非常重要,而且很常用。他能分析访问的ip分析出访问者的地址信息。举例如下:
1
2
3
4
5
|
filter { geoip {
source => "message"
}
} |
message你输入个IP地址,结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
{ "message" => "183.60.92.253",
"@version" => "1",
"@timestamp" => "2016-07-07T10:32:55.610Z",
"host" => "raochenlindeMacBook-Air.local",
"geoip" => {
"ip" => "183.60.92.253",
"country_code2" => "CN",
"country_code3" => "CHN",
"country_name" => "China",
"continent_code" => "AS",
"region_name" => "30",
"city_name" => "Guangzhou",
"latitude" => 23.11670000000001,
"longitude" => 113.25,
"timezone" => "Asia/Chongqing",
"real_region_name" => "Guangdong",
"location" => [
[0] 113.25,
[1] 23.11670000000001
]
}
} |
实际应用中我们可以把grok获取的request_ip传给geoip处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
filter { if [type] == "apache" {
grok {
patterns_dir => "/usr/local/logstash-2.3.4/ownpatterns/patterns"
match => {
"message" => "%{APACHE_LOG}"
}
remove_field => [ "message" ]
}
geoip {
source => "request_ip"
}
}
|
在logstash分析完数据到output阶段输出到其它地方时,数据中就有访问者的地理信息。
date:用来转换你的日志记录中的时间字符串
date 插件是日期插件,这个插件,常用而重要。
该插件必须是用 date 包裹,如下所示:
date {
}
可用的配置选项如下表所示:
add_field
add_tag
locale
match 匹配日志格式
periodic_flush 按时间间隔调用
remove_field
remove_tag
tag_on_failure 如果标签匹配失败,则默认为_grokparsefailure
target 把 match 的时间字段保存到指定字段。若为指定,默认更新到 @timestamp。
timezone
备注:
add_field、remove_field、add_tag、remove_tag 是所有 Logstash 插件都有。
tag 作用是,当你对字段处理期间,还期望进行后续处理,就先作个标记。Logstash 有个内置 tags 数组,包含了期间产生的 tag,无论是 Logstash 自己产生的,还是你添加的,比如,你用 grok 解析日志,但是错了,那么 Logstash 自己就会自己添加一个 _grokparsefailure 的 tag。这样,你在 output 时,可以对解析失败的日志不做任何处理;
field 作用是,对字段的操作。
举例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
filter { if [type] == "apache" {
grok {
patterns_dir => "/usr/local/logstash-2.3.4/ownpatterns/patterns"
match => {
"message" => "%{APACHE_LOG}"
}
remove_field => [ "message" ]
}
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
}
}
} |
apache日志中的时间戳是:[19/Jul/2016:16:28:52 +0800] 。match的时间格式要和日志中的匹配对应。如果你的时间字段可能有多个格式,则可指定多个可能的日期格式:
match => [ "timestamp", "MMM dd YYY HH:mm:ss", "MMM d YYY HH:mm:ss", "ISO8601" ]
apache日志的grok表达式:
1
|
APACHE_LOG %{ IPORHOST :addre } %{ USER :ident } %{ USER :auth } \[%{ HTTPDATE :timestamp }\] \"%{ WORD :http_method } %{ NOTSPACE :request } HTTP /%{ NUMBER :httpversion }\" %{ NUMBER :status } (?:%{ NUMBER :bytes }|-) \"(?:%{ URI :http_referer }|-)\" \"%{ GREEDYDATA :User_Agent}\"
|
在apache日志中已经有[%{HTTPDATE:timestamp}\],为什么还要经过date插件再处理下呢?
我们将访问时间作为logstash的时间戳,有了这个,我们就可以以时间为区分,查看分析某段时间的请求是怎样的,如果没有匹配到这个时间的话,logstash将以当前时间作为该条记录的时间戳。所以需要再filter里面定义时间戳的格式。如果不用 date 插件,那么 Logstash 将处理时间作为时间戳。时间戳字段是 Logstash 自己添加的内置字段 @timestamp,在ES中关于时间的相关查询,必须使用该字段,你当然也可以修改该字段的值。
备注:@timestamp 比我们晚了 8 个小时,在kibana显示也是如此。如何能让日志收集时间能和@timestamp一致呢?在date插件中添加如下字段:timezone =>"Asia/Chongqing"
useragent:用来处理分析访问者使用的浏览器及操作系统
在apache日志中会发现有这么一段日志:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36
我们在用grok分割数据时也是使用%{GREEDYDATA:User_Agent}。
备注:GREEDYDATA这个grok表达式是匹配任何类型的数据。 GREEDYDATA .*
通过这条数据,即使我们显示出来意义也不大,但我们可以通过useragent挖掘它的信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
filter { if [type] == "apache" {
grok {
patterns_dir => "/usr/local/logstash-2.3.4/ownpatterns/patterns"
match => {
"message" => "%{APACHE_LOG}"
}
remove_field => [ "message" ]
}
useragent {
source => "User_Agent"
target => "ua"
}
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
}
}
} |
显示结果:
从上图我们可以看到访问者浏览器及操作系统的信息。比那一大串信息更加有意义。
mutate:它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
可以设置的转换类型包括:"integer","float" 和 "string"。
可用的配置选项如下表所示:
add_field 添加新的字段
add_tag 添加新的标签
convert 数据类型转换
gsub 字符串替换。用正则表达式和字符串都行
join 用分隔符连接数组. 如果字段不是数组,那什么都不做
lowercase 把字符串转换成小写
merge 合并两个数组或散列字段
periodic_flush 按时间间隔调用
remove_field 移除字段
remove_tag 移除标识
rename 重命名一个或多个字段
replace 用一个新的值替换掉指定字段的值
split 用分隔符或字符分割一个字符串。只能应用在字符串上
strip 去掉字段首尾的空格
update 更新字段的值。如果该字段不存在,则什么都不做
uppercase 把字符串转换成大写
简单优化数据
logstash采集数据加上date,geoip,useragent等插件会使我们获取的信息更加详细,但是也更加臃肿。所有我们要踢掉一些没有意义的数据,简化传输给elasticsearch的数据。
remove_field能很好的完成这个任务。上面也有用到。
remove_field => ["message"]
在grok中我们已经发message分成了很多段小数据,如果在把message传输给elasticsearch就重复了。
当然在传输的小数据中也有很多我们用不到或者毫无意义。我们就可以使用remove_field来清除。
1
2
3
|
mutate{
remove_field => [ "Syslog_Timestamp" ]
remove_field => [ "message" ]
|
参考:https://zengjice.gitbooks.io/logstash-best-practice-cn/content/filter/mutate.html
drop: 完全丢弃事件,如debug事件
filter { if [loglevel] == "debug" { drop { } } }
参考:https://www.elastic.co/guide/en/logstash/current/plugins-filters-drop.html
本文转自 irow10 51CTO博客,原文链接:http://blog.51cto.com/irow10/1828521,如需转载请自行联系原作者