filter区块对事件进行中间处理,常用插件有grok,date,mutate,geoip,ruby,kv等
更多插件知识请查看官网文档
grok插件:
通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。
示例:
以下为nginx访问日志示例
116.236.167.58 - - [29/Mar/2016:17:30:32 +0800] "POST /get-screen-data/ HTTP/1.1" 200 19939 "http://noc.cheyaoshicorp.com/1/" "Mozilla/5.0 (Android 4.3; Tablet; rv:43.0) Gecko/43.0 Firefox/43.0"
将该日志使用grok解析,转换成结构化类型
filter {
grok {
match => {
"message" => "%{IPORHOST:clientip} - %{NOTSPACE:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{NOTSPACE:request} %{URIPROTO:proto}/%{NUMBER:httpversion}\" %{NUMBER:status} (?:%{NUMBER:size}|-) %{QS:referrer} %{QS:agent}"
}
}
}
原始数据经过grok解析后会将原始日志分割成多个字段,每个字段都有一个字段名表明该字段的意思,如clientip,remote_user,request等(这里的字段名根据nginx的logformat来定义)
至于IPORHOST,NOTSPACE等这些是logstash官方预定义好的一些正则表达式,具体表达式可查看grok patterns,logstash支持的正则表达式查看Oniguruma网站的介绍
结果如下:
如果想自定义正则表达式,有两种方式:
1.直接在grok里面使用自定义表达式
语法:(?<filed_name>the pattern here)
比如上例中的request字段用这种方式写为:(?<request>\S+)
也就是说%{NOTSPACE:request}等于(?<request>\S+)
2.自定义表达式文件
首先创建一个patterns目录,然后在目录中创建文件保存你的正则表达式(文件名自定义),然后在grok中指定patterns_dir
filter {
grok {
match => {
patterns_dir => ["/path/to/your/own/patterns"]
match => { ...... }
}
}
}
文件中自定义的正则表达式语法如下:
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
第一行,用普通的正则表达式来定义一个 grok 表达式;第二行,通过打印赋值格式,用前面定义好的 grok 表达式来定义另一个 grok 表达式。
比如在/opt/logstash中创建patterns目录:mkdir /opt/logstash/patterns,然后创建自定义表达式:
vim /opt/logstash/patterns/custom
ID [0-9A-F]{10,11}
USER_ID %{ID}
使用完整语法为:%{PATTERN_NAME:capture_name:data_type}
例子中默认所有字段都保存为字符串,上面的例子把status以整型方式转换
修改为:%{NUMBER:status:int}
结果如下:
可以看到status的结果200为整型了
注:data_type目前只支持两种类型int和float
有一个在线的grok debugger调试网站,强烈建议使用该工具来调试你的grok表达式
如果日志有多种可能的格式,单一的正则匹配比较困难,logstash 的语法提供给我们一个有趣的解决方式。
官方文档中match参数接收的数据类型是一个hash值,所以其实现在传递一个数组给match
参数也完全没问题,所以,这里其实可以传递多个正则来匹配同一个字段:
match => [
"message", "(?<request_time>\d+(?:\.\d+)?)",
"message", "%{SYSLOGBASE} %{DATA:message}",
"message", "(?m)%{WORD}"
]
logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止。
同output一样,当Logstash 在有多个 conf 文件的情况下,会多次对grok进行匹配,没有对日志进行筛选判断的各插件配置都会全部执行一次。所以如果有多个conf文件,在filter段对type或者其他条件进行判断,确保匹配唯一,不然会在输出中包含_grokparsefailure, _dateparsefailure等tags。
date插件:
从日志中解析时间,然后使用该时间作为logstash的事件时间戳。
date插件是对于排序事件和回填旧数据尤其重要。如果你没有正确的的事件日期,搜索可能会出现混乱的排序。
如果事件中没有设定timestamp,logstash会选择第一次看到事件(在输入时)的时间戳。例如,用文件输入,时间戳会设置为每个事件读出的时间。
比如,日志中有”2016-05-31 15:24:10“的时间戳
使用日志格式"yyyy-MM-dd HH:mm:ss"去匹配这个时间戳。
date插件支持5中时间格式:
1.ISO8601
类似 "2011-04-19T03:44:01.103Z" 这样的格式。具体Z后面可以有 "08:00"也可以没有,".103"这个也可以没有。常用场景里来说,Nginx 的log_format配置里就可以使用$time_iso8601
变量来记录请求时间成这种格式。
2.UNIX
UNIX 时间戳格式,记录的是从 1970 年起始至今的总秒数。Squid 的默认日志格式中就使用了这种格式。
3.UNIX_MS
这个时间戳则是从 1970 年起始至今的总毫秒数。JavaScript 里经常使用这个时间格式。
4.TAI64N
TAI64N 格式比较少见,是这个样子的:@4000000052f88ea32489532c
。常见应用中, qmail 会用这个格式。
5.Joda-Time库
Logstash 内部使用了 Java 的 Joda 时间库来作时间处理。所以我们可以使用 Joda 库所支持的时间格式来作具体定义。
Joda 时间格式定义见下表:
Elasticsearch 内部,对时间类型字段,是统一采用 UTC 时间,存成 long 长整形数据的!对日志统一采用 UTC 时间存储,是国际安全/运维界的一个通识——欧美公司的服务器普遍广泛分布在多个时区里——不像中国,地域横跨五个时区却只用北京时间。所以在logstash导入日志的时候可以看到@timestamp晚了8小时,其实是正常的。当在kibana上读取数据时,会读取浏览器的当前时区,然后在页面上转换时间内容的显示。
mutate插件:
提供了丰富的基础类型数据处理能力,包括类型转换,字符串处理和字段处理等。
1.类型转换
mutate可以设置的转换类型包括:"integer","float" 和 "string"。
示例如下:
filter {
mutate {
covert => ["filed_name", "integer"]
}
}
如果有多个字段需要转换可使用如下方法:
covert => ["filed_name_1", "integer", "filed_name_2", "float", "filed_name_3", "string"]
mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"]
转换成[1,2]
。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的 filters/ruby 插件完成。
2.字符串处理
gsub通过正则表达式替换匹配的值,只对字符串有效,每个替换包括3个元素的数组,分别对应字段名,正则匹配表达式,替换的值。
示例:
filter {
mutate {
gsub => [
#将filed_name_1字段中所有"/"转换为"_"
"filed_name_1", "/" , "_",
# 将filed_name_2字段中所有"\","?","#","-"转换为"."
"filed_name_2", "[\\?#-]", "."
]
}
}
split通过特定的分隔符分割字符串为数组
示例:
filter {
mutate {
split => {"filed_name_1", "|"}
}
}
join通过特定分隔符将数组元素拼接起来
示例:
filter {
mutate {
join => {"filed_name_1", "|"}
}
}
merge合并两个数组或者哈希字段
`array` + `string` will work
`string` + `string` will result in an 2 entry array in `filed_name_1`
`array` and `hash` will not work
示例:
filter {
mutate {
merge => {"filed_name_1" => "filed_name_2"}
}
}
strip去除字段中开头结尾的空格
示例:
filter {
mutate {
strip => ["filed_name_1", "filed_name_2"]
}
}
3.字段处理
rename重命名某个字段
filter {
mutate {
rename => {"old_field" => "new_field"}
}
}
update更新某个字段的内容。如果字段不存在,不会新建。
filter {
mutate {
update => {"field_name" => "new message"}
}
}
replace作用和 update 类似,但是当字段不存在的时候,它会起到add_field
参数一样的效果,自动添加新的字段。
geoip插件:
GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
示例:
filter {
geoip {
source => "ip_field"
}
}
GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields
选项指定自己所需要的。下例为全部可选内容:
filter {
geoip {
fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
}
}
logstash会通过 latitude
和 longitude
额外生成geoip.location,用于地图定位
geoip库内只存有公共网络上的IP信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。
kv插件:
该插件用于自动解析类似于foo=bar类型的数据
比如日志中包含ip=1.1.1.1 error=REFUSED,自动解析后转换为:
ip: 1.1.1.1
error: REFUSED
一个完整的示例:
input { stdin { type => "kv" } }
filter {
if [type] == "kv" {
kv {
source => "message"
prefix => "ex_"
field_split => "&? "
allow_duplicate_values => false
default_keys => {
"from" => "A"
"to" => "B"
}
trim => "<>\[\],"
trimkey => "<>\[\],"
value_split => "=:"
}
}
}
output { stdout { codec => rubydebug } }
配置解释:
source为数据源,需要解析的数据,可以是字段等
prefix给所有解析出来的字段加上一个前缀
field_split解析出键值对的分隔符
allow_duplicate_values布尔类型,是否删除重复的键值对。默认值true,不删除重复键值
default_keys增加默认的键值对到事件中,以防源数据解析后不存在这些键值
trim去除解析后value里面包含的小括号或者中括号等符号
trimkey去除解析后key里面包含的小括号或者中括号等符号
value_split设置键值识别关系的分隔符,默认为=
logstash启动后,输入<a:[,9]&b>=,34,?b:34]&[c]=<123>,结果如下:
结合例子中的配置来解释:
配置中通过'&','?'和' '作为字段分隔符,通过'='和':'作为键值分隔符,数据解析后应该为:
<a: [,9]
b>=,34,
b:34]
[c]=<123>
然后去除key和value中的'[',']','<','>'和','得到:
"a" => "9"
"b" => "34"
"b" => "34"
"c" => "123"
然后去除重复的键值对,并且添加前缀ex_得到:
"ex_a" => "9"
"ex_b" => "34"
"ex_c" => "123"
最后增加默认key得到:
"from“ => "A"
"to" => "B"
"ex_a" => "9"
"ex_b" => "34"
"ex_c" => "123"