logstash之filter插件

filter区块对事件进行中间处理,常用插件有grok,date,mutate,geoip,ruby,kv等

更多插件知识请查看官网文档


grok插件:

通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。

示例:

以下为nginx访问日志示例

116.236.167.58 - - [29/Mar/2016:17:30:32 +0800] "POST /get-screen-data/ HTTP/1.1" 200 19939 "http://noc.cheyaoshicorp.com/1/" "Mozilla/5.0 (Android 4.3; Tablet; rv:43.0) Gecko/43.0 Firefox/43.0"

将该日志使用grok解析,转换成结构化类型

filter {

    grok {

        match => {

            "message" => "%{IPORHOST:clientip} - %{NOTSPACE:remote_user} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{NOTSPACE:request} %{URIPROTO:proto}/%{NUMBER:httpversion}\" %{NUMBER:status} (?:%{NUMBER:size}|-) %{QS:referrer} %{QS:agent}"

        }

    }

}

原始数据经过grok解析后会将原始日志分割成多个字段,每个字段都有一个字段名表明该字段的意思,如clientip,remote_user,request等(这里的字段名根据nginx的logformat来定义)

至于IPORHOST,NOTSPACE等这些是logstash官方预定义好的一些正则表达式,具体表达式可查看grok patterns,logstash支持的正则表达式查看Oniguruma网站的介绍

结果如下:

logstash之filter插件



如果想自定义正则表达式,有两种方式:

1.直接在grok里面使用自定义表达式

语法:(?<filed_name>the pattern here)

比如上例中的request字段用这种方式写为:(?<request>\S+)

也就是说%{NOTSPACE:request}等于(?<request>\S+)


2.自定义表达式文件

首先创建一个patterns目录,然后在目录中创建文件保存你的正则表达式(文件名自定义),然后在grok中指定patterns_dir

filter {

    grok {

        match => {

            patterns_dir => ["/path/to/your/own/patterns"]

            match => { ...... }

        }

    }

}

文件中自定义的正则表达式语法如下:

USERNAME [a-zA-Z0-9._-]+

USER %{USERNAME}

第一行,用普通的正则表达式来定义一个 grok 表达式;第二行,通过打印赋值格式,用前面定义好的 grok 表达式来定义另一个 grok 表达式。

比如在/opt/logstash中创建patterns目录:mkdir /opt/logstash/patterns,然后创建自定义表达式:

vim /opt/logstash/patterns/custom

ID [0-9A-F]{10,11}

USER_ID %{ID}

使用完整语法为:%{PATTERN_NAME:capture_name:data_type}

例子中默认所有字段都保存为字符串,上面的例子把status以整型方式转换

修改为:%{NUMBER:status:int}

结果如下:

logstash之filter插件


可以看到status的结果200为整型了

注:data_type目前只支持两种类型int和float

有一个在线的grok debugger调试网站,强烈建议使用该工具来调试你的grok表达式

logstash之filter插件



如果日志有多种可能的格式,单一的正则匹配比较困难,logstash 的语法提供给我们一个有趣的解决方式。

官方文档中match参数接收的数据类型是一个hash值,所以其实现在传递一个数组给match参数也完全没问题,所以,这里其实可以传递多个正则来匹配同一个字段:

match => [

    "message", "(?<request_time>\d+(?:\.\d+)?)",

    "message", "%{SYSLOGBASE} %{DATA:message}",

    "message", "(?m)%{WORD}"

]

logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止。


同output一样,当Logstash 在有多个 conf 文件的情况下,会多次对grok进行匹配,没有对日志进行筛选判断的各插件配置都会全部执行一次。所以如果有多个conf文件,在filter段对type或者其他条件进行判断,确保匹配唯一,不然会在输出中包含_grokparsefailure, _dateparsefailure等tags。


date插件:

从日志中解析时间,然后使用该时间作为logstash的事件时间戳。

date插件是对于排序事件和回填旧数据尤其重要。如果你没有正确的的事件日期,搜索可能会出现混乱的排序。

如果事件中没有设定timestamp,logstash会选择第一次看到事件(在输入时)的时间戳。例如,用文件输入,时间戳会设置为每个事件读出的时间。

比如,日志中有”2016-05-31 15:24:10“的时间戳

使用日志格式"yyyy-MM-dd HH:mm:ss"去匹配这个时间戳。


date插件支持5中时间格式:

1.ISO8601

类似 "2011-04-19T03:44:01.103Z" 这样的格式。具体Z后面可以有 "08:00"也可以没有,".103"这个也可以没有。常用场景里来说,Nginx 的log_format配置里就可以使用$time_iso8601变量来记录请求时间成这种格式。

2.UNIX

UNIX 时间戳格式,记录的是从 1970 年起始至今的总秒数。Squid 的默认日志格式中就使用了这种格式。

3.UNIX_MS

这个时间戳则是从 1970 年起始至今的总毫秒数。JavaScript 里经常使用这个时间格式。

4.TAI64N

TAI64N 格式比较少见,是这个样子的:@4000000052f88ea32489532c。常见应用中, qmail 会用这个格式。

5.Joda-Time库

Logstash 内部使用了 Java 的 Joda 时间库来作时间处理。所以我们可以使用 Joda 库所支持的时间格式来作具体定义。

Joda 时间格式定义见下表:






Elasticsearch 内部,对时间类型字段,是统一采用 UTC 时间,存成 long 长整形数据的!对日志统一采用 UTC 时间存储,是国际安全/运维界的一个通识——欧美公司的服务器普遍广泛分布在多个时区里——不像中国,地域横跨五个时区却只用北京时间。所以在logstash导入日志的时候可以看到@timestamp晚了8小时,其实是正常的。当在kibana上读取数据时,会读取浏览器的当前时区,然后在页面上转换时间内容的显示。


mutate插件:

提供了丰富的基础类型数据处理能力,包括类型转换,字符串处理和字段处理等。

1.类型转换

mutate可以设置的转换类型包括:"integer","float" 和 "string"。

示例如下:

filter {

    mutate {

        covert => ["filed_name", "integer"]

    }

}

如果有多个字段需要转换可使用如下方法:

covert => ["filed_name_1", "integer", "filed_name_2", "float", "filed_name_3", "string"]


mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成[1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的 filters/ruby 插件完成。

2.字符串处理

gsub通过正则表达式替换匹配的值,只对字符串有效,每个替换包括3个元素的数组,分别对应字段名,正则匹配表达式,替换的值。

示例:

filter {

    mutate {

        gsub => [

            #将filed_name_1字段中所有"/"转换为"_"

            "filed_name_1", "/" , "_",

            # 将filed_name_2字段中所有"\","?","#","-"转换为"."

            "filed_name_2", "[\\?#-]", "."

        ]

    }

}


split通过特定的分隔符分割字符串为数组

示例:

filter {

    mutate {

        split => {"filed_name_1", "|"}

    }

}


join通过特定分隔符将数组元素拼接起来

示例:

filter {

    mutate {

        join => {"filed_name_1", "|"}

    }

}


merge合并两个数组或者哈希字段

`array` + `string` will work

`string` + `string` will result in an 2 entry array in `filed_name_1`

`array` and `hash` will not work

示例:  

filter {

    mutate {

        merge => {"filed_name_1" => "filed_name_2"}

    } 

}


strip去除字段中开头结尾的空格

示例:

filter {

    mutate {

        strip => ["filed_name_1", "filed_name_2"]

    }

}


3.字段处理

rename重命名某个字段

filter {

    mutate {

        rename => {"old_field" => "new_field"}

    }

}


update更新某个字段的内容。如果字段不存在,不会新建。

filter {

    mutate {

        update => {"field_name" => "new message"}

    }

}


replace作用和 update 类似,但是当字段不存在的时候,它会起到add_field参数一样的效果,自动添加新的字段。


geoip插件:

GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

示例:

filter {

    geoip {

        source => "ip_field"

    }

}

GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的。下例为全部可选内容:

filter {

    geoip {

        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]

    }

}

logstash会通过 latitude 和 longitude 额外生成geoip.location,用于地图定位

geoip库内只存有公共网络上的IP信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。


kv插件:

该插件用于自动解析类似于foo=bar类型的数据

比如日志中包含ip=1.1.1.1 error=REFUSED,自动解析后转换为:

ip: 1.1.1.1

error: REFUSED


一个完整的示例:

input { stdin { type => "kv" } }

filter {

    if [type] == "kv" {

        kv {

            source => "message"

            prefix => "ex_"

            field_split => "&? "

            allow_duplicate_values => false

            default_keys => {

                "from" => "A"

                "to" => "B"

            }

            trim => "<>\[\],"

            trimkey => "<>\[\],"

            value_split => "=:"

        }

    }

}

output { stdout { codec => rubydebug } }

配置解释:

source为数据源,需要解析的数据,可以是字段等

prefix给所有解析出来的字段加上一个前缀

field_split解析出键值对的分隔符

allow_duplicate_values布尔类型,是否删除重复的键值对。默认值true,不删除重复键值

default_keys增加默认的键值对到事件中,以防源数据解析后不存在这些键值

trim去除解析后value里面包含的小括号或者中括号等符号

trimkey去除解析后key里面包含的小括号或者中括号等符号

value_split设置键值识别关系的分隔符,默认为=


logstash启动后,输入<a:[,9]&b>=,34,?b:34]&[c]=<123>,结果如下:

logstash之filter插件


结合例子中的配置来解释:

配置中通过'&','?'和' '作为字段分隔符,通过'='和':'作为键值分隔符,数据解析后应该为:

<a: [,9]

b>=,34,

b:34]

[c]=<123>

然后去除key和value中的'[',']','<','>'和','得到:

"a" => "9"

"b" => "34"

"b" => "34"

"c" => "123"

 然后去除重复的键值对,并且添加前缀ex_得到:

"ex_a" => "9"

"ex_b" => "34"

"ex_c" => "123"

最后增加默认key得到:

"from“ => "A"

"to" => "B"

"ex_a" => "9"

"ex_b" => "34"

"ex_c" => "123"











本文转自 曾哥最爱 51CTO博客,原文链接:http://blog.51cto.com/zengestudy/1832818,如需转载请自行联系原作者
上一篇:经典算法之计数排序


下一篇:C语言原子接口与实现