Part 1: Input Plugins (input)
Official documentation for input plugins: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
The following are illustrated with examples:
- Standard input
- Reading from a file
- Reading TCP network data
1 Standard input (stdin)
Type something such as "hello world" into the console (the stdin {} input); press Enter and it is echoed back to the same console, formatted by codec => rubydebug:
input {
  stdin {
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Test:
[root@localhost config]# ls
java_pid12011.hprof jvm.options logstash-sample.conf my-app.conf startup.options
java_pid21521.hprof log4j2.properties logstash.yml pipelines.yml
[root@localhost config]# vim test1.conf
input {
  stdin {
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
"test1.conf" [新] 17L, 167C 已写入
[root@localhost config]# ls
java_pid12011.hprof jvm.options logstash-sample.conf my-app.conf startup.options
java_pid21521.hprof log4j2.properties logstash.yml pipelines.yml test1.conf
[root@localhost config]#
[root@localhost config]# ../bin/logstash -f test1.conf
Using JAVA_HOME defined java: /opt/elasticsearch-7.6.1/jdk
WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2022-01-11T15:49:40,297][INFO ][logstash.runner ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
[2022-01-11T15:49:40,315][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 13.0.2+8 on 13.0.2+8 +indy +jit [linux-x86_64]"}
[2022-01-11T15:49:40,652][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-01-11T15:49:42,009][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-01-11T15:49:42,568][INFO ][org.reflections.Reflections] Reflections took 84 ms to scan 1 urls, producing 119 keys and 417 values
[2022-01-11T15:49:43,491][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/local/logstash/config/test1.conf"], :thread=>"#<Thread:0x772c1831 run>"}
[2022-01-11T15:49:44,165][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.67}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[2022-01-11T15:49:44,268][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2022-01-11T15:49:44,327][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
sss
{
       "message" => "sss",
          "host" => "localhost.localdomain",
    "@timestamp" => 2022-01-11T07:49:47.049Z,
      "@version" => "1"
}
hello world
{
       "message" => "hello world",
          "host" => "localhost.localdomain",
    "@timestamp" => 2022-01-11T07:49:57.473Z,
      "@version" => "1"
}
2 Reading from a file (file)
Logstash uses a Ruby gem called filewatch to watch files for changes, and records how far it has read in each watched log file (the read position, a byte offset) in a checkpoint file called .sincedb:
- The sincedb data file lives under <path.data>/plugins/inputs/file by default, with a filename like .sincedb_123456.
- <path.data> is the Logstash data directory, which defaults to LOGSTASH_HOME/data.
Example:
- path: the location of the log file(s); once this configuration is started, Logstash reads the content of the logs at these paths
- start_position: tells Logstash where in the file to begin reading (the configuration below reads from the beginning)
- The file is read line by line, one event per line
- Logstash keeps watching (tailing) the file for new lines until the process is stopped
input {
  file {
    path => ["/var/*/*"]
    start_position => "beginning"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
By default, Logstash starts reading at the end of the file, i.e. the process picks up new lines as they are appended, much like tail -f.
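The tail-style reading and the sincedb checkpoint described above can be sketched in Python. This is a minimal illustration of the idea, not how the filewatch gem is actually implemented; `tail_new_lines` is a hypothetical helper name:

```python
import os
import tempfile

def tail_new_lines(path, offset):
    """Return (new_lines, new_offset): the complete lines appended after
    `offset`. Persisting new_offset between runs is exactly the role the
    .sincedb checkpoint file plays for the file input."""
    with open(path, "r") as f:
        f.seek(offset)
        return f.read().splitlines(), f.tell()

# Demo: the first pass reads from the beginning (start_position => "beginning");
# the second pass only sees lines appended after the saved offset, tail -f style.
fd, log = tempfile.mkstemp()
os.close(fd)
with open(log, "w") as f:
    f.write("event 1\nevent 2\n")
lines, offset = tail_new_lines(log, 0)
with open(log, "a") as f:
    f.write("event 3\n")
new_lines, offset = tail_new_lines(log, offset)
os.remove(log)
```

If the offset is not persisted, every restart re-reads the whole file; that is the problem sincedb solves.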
3 Reading TCP network data
Reading data over the network was demonstrated earlier, so it is not demonstrated again here; only the configuration is shown.
input {
  tcp {
    port => "1234"
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
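What a client shipping logs to this tcp input does can be sketched in Python. The snippet below stands up a throwaway one-shot server in place of Logstash (the helper name and the sample syslog line are made up for illustration):

```python
import socket
import threading

def receive_one_line(server_sock, out):
    """Stand-in for the tcp input: accept one connection and read one
    newline-terminated event from it."""
    conn, _ = server_sock.accept()
    with conn, conn.makefile("r") as f:
        out.append(f.readline().rstrip("\n"))

# Bind to an ephemeral port (Logstash would be listening on 1234 instead).
srv = socket.socket()
srv.bind(("127.0.0.1", 0))
srv.listen(1)
port = srv.getsockname()[1]

out = []
t = threading.Thread(target=receive_one_line, args=(srv, out))
t.start()

# The client side: one event per line, newline-terminated.
with socket.create_connection(("127.0.0.1", port)) as c:
    c.sendall(b"Jan 11 16:00:00 host1 sshd[1234]: Accepted password for root\n")

t.join()
srv.close()
```

Each newline-terminated line becomes one event's message field, which the %{SYSLOGLINE} grok pattern then parses.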
Part 2: Filter Plugins (filter)
See the official documentation for details: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
1 Grok regular-expression capture
Grok parses arbitrary text with regular expressions, turning unstructured log data into something structured and easy to query. It is currently the best way in Logstash to parse unstructured log data.
When Logstash reads an event, the entire line ends up in a single message field. To know exactly what each part of the line means, and to make it queryable, Grok splits the line into multiple named fields.
The Grok syntax is:
%{SYNTAX:SEMANTIC}
where SYNTAX is the name of the pattern that matches the text and SEMANTIC is the field name the matched value is stored under.
Example:
For example, given the input:
172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
- The pattern %{IP:clientip} yields: clientip: 172.16.213.132
- The pattern %{HTTPDATE:timestamp} yields: timestamp: 07/Feb/2019:16:24:19 +0800
- The pattern %{QS:referrer} yields: referrer: "GET / HTTP/1.1"
The following combined pattern captures all of the content in the input above:
%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
This combined pattern splits the input into five parts, i.e. five fields. Splitting the input into distinct data fields makes later parsing and querying of the log data much easier, which is exactly the point of using grok.
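Under the hood a grok pattern is just a regular expression with named capture groups. A rough Python equivalent of the combined pattern, with simplified hand-written regexes standing in for the real IP/HTTPDATE/QS/NUMBER pattern definitions:

```python
import re

# Simplified stand-ins for grok's IP, HTTPDATE, QS and NUMBER patterns.
LOG_RE = re.compile(
    r'(?P<clientip>\d{1,3}(?:\.\d{1,3}){3}) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'(?P<referrer>"[^"]*") '
    r'(?P<response>\d+) '
    r'(?P<bytes>\d+)'
)

line = '172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039'
fields = LOG_RE.match(line).groupdict()
# fields is now {"clientip": "172.16.213.132", "timestamp": "07/Feb/2019:16:24:19 +0800", ...}
```

Each named group corresponds to one %{SYNTAX:SEMANTIC} capture; grok simply ships a large library of such patterns so you rarely write the regex by hand.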
Complete example:
input {
  stdin {
  }
}
filter {
  grok {
    match => ["message", "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
Input log line:
172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
Run:
[root@localhost config]# ls
java_pid12011.hprof  logstash-sample.conf  startup.options
java_pid21521.hprof  logstash.yml          test1.conf
jvm.options          my-app.conf
log4j2.properties    pipelines.yml
[root@localhost config]# vim test2.conf
input {
  stdin {
  }
}
filter {
  grok {
    match => ["message", "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
"test2.conf" [New] 17L, 237C written
[root@localhost config]# ../bin/logstash -f test2.conf
Using JAVA_HOME defined java: /opt/elasticsearch-7.6.1/jdk
WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2022-01-11T16:31:45,624][INFO ][logstash.runner ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
[2022-01-11T16:31:45,636][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 13.0.2+8 on 13.0.2+8 +indy +jit [linux-x86_64]"}
[2022-01-11T16:31:45,919][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-01-11T16:31:47,168][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-01-11T16:31:47,858][INFO ][org.reflections.Reflections] Reflections took 70 ms to scan 1 urls, producing 119 keys and 417 values
[2022-01-11T16:31:49,066][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/local/logstash/config/test2.conf"], :thread=>"#<Thread:0xfabd406 run>"}
[2022-01-11T16:31:49,824][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.75}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[2022-01-11T16:31:49,910][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2022-01-11T16:31:49,985][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
The stdin plugin is now waiting for input:
172.16.213.132 [07/Feb/2022:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
{
      "response" => "403",
      "clientip" => "172.16.213.132",
    "@timestamp" => 2022-01-11T08:32:38.109Z,
         "bytes" => "5039",
      "referrer" => "\"GET / HTTP/1.1\"",
       "message" => "172.16.213.132 [07/Feb/2022:16:24:19 +0800] \"GET / HTTP/1.1\" 403 5039",
     "timestamp" => "07/Feb/2022:16:24:19 +0800",
          "host" => "localhost.localdomain",
      "@version" => "1"
}
2 Date processing (date)
The date plugin is especially important for sorting events and backfilling old data. It converts a time field in the log record into a LogStash::Timestamp object and stores it in the @timestamp field.
Example (filter section only):
filter {
  grok {
    match => ["message", "%{HTTPDATE:timestamp}"]
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
}
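The dd/MMM/yyyy:HH:mm:ss Z pattern above is Joda-style; its Python strptime equivalent is %d/%b/%Y:%H:%M:%S %z. A quick sketch of what the date filter computes for the sample timestamp:

```python
from datetime import datetime, timezone

raw = "07/Feb/2019:16:24:19 +0800"
ts = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")

# Normalized to UTC: this is the value the date filter would place
# in @timestamp (shown as 2019-02-07T08:24:19.000Z in rubydebug output).
utc = ts.astimezone(timezone.utc)
print(utc.isoformat())  # 2019-02-07T08:24:19+00:00
```

Note how the +0800 offset is folded into the UTC value, which is why the event's @timestamp differs by eight hours from the raw log timestamp.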
3 Data manipulation (mutate)
3.1 Replacing matches with a regular expression (gsub)
gsub replaces values matched by a regular expression within a field; it only works on string fields.
Example:
- Replace every "/" character in the filed_name_1 field with "_":
filter {
  mutate {
    gsub => ["filed_name_1", "/", "_"]
  }
}
3.2 Splitting a string into an array (split)
split splits a field's string value into an array on the given delimiter.
Example:
- Split the filed_name_2 field into an array on "|":
filter {
  mutate {
    split => ["filed_name_2", "|"]
  }
}
3.3 Renaming a field (rename)
rename renames a field.
Example:
- Rename the old_field field to new_field:
filter {
  mutate {
    rename => { "old_field" => "new_field" }
  }
}
3.4 Removing a field (remove_field)
remove_field deletes a field.
Example:
- Delete the timestamp field:
filter {
  mutate {
    remove_field => ["timestamp"]
  }
}
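On a plain dict, the four mutate operations above amount to the following (the event contents are hypothetical, for illustration only):

```python
event = {
    "filed_name_1": "a/b/c",
    "filed_name_2": "x|y|z",
    "old_field": "value",
    "timestamp": "07/Feb/2019:16:24:19 +0800",
}

# gsub: replace every "/" with "_" (only meaningful for string fields)
event["filed_name_1"] = event["filed_name_1"].replace("/", "_")

# split: break the string into an array on "|"
event["filed_name_2"] = event["filed_name_2"].split("|")

# rename: move the value under a new key
event["new_field"] = event.pop("old_field")

# remove_field: drop the field entirely
event.pop("timestamp", None)
```

After these steps the event holds "a_b_c", ["x", "y", "z"], and new_field, with old_field and timestamp gone.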
3.5 GeoIP location lookup (geoip)
The geoip filter looks up the IP address in the given source field and enriches the event with geographic information (country, city, coordinates, and so on):
filter {
  geoip {
    source => "ip_field"
  }
}
3.6 Comprehensive example
Configuration file contents:
- convert => [ "response", "float" ]: converts response to a float (the NUMBER pattern captures it as a string)
- rename => { "response" => "response_new" }: renames response to response_new
input {
  stdin {
  }
}
filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
    remove_field => [ "message" ]
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
  mutate {
    convert => [ "response", "float" ]
    rename => { "response" => "response_new" }
    gsub => ["referrer", "\"", ""]
    split => ["clientip", "."]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
- Test log line
172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
- Run
[root@localhost config]# ls
java_pid12011.hprof  log4j2.properties     my-app.conf      test1.conf
java_pid21521.hprof  logstash-sample.conf  pipelines.yml    test2.conf
jvm.options          logstash.yml          startup.options
[root@localhost config]#
[root@localhost config]# vim test3.conf
input {
  stdin {
  }
}
filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
    remove_field => [ "message" ]
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
  mutate {
    convert => [ "response", "float" ]
    rename => { "response" => "response_new" }
    gsub => ["referrer", "\"", ""]
    split => ["clientip", "."]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
"test3.conf" [New] 23L, 556C written
[root@localhost config]# ../bin/logstash -f test3.conf
Using JAVA_HOME defined java: /opt/elasticsearch-7.6.1/jdk
WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2022-01-11T17:57:16,417][INFO ][logstash.runner ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
[2022-01-11T17:57:16,430][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 13.0.2+8 on 13.0.2+8 +indy +jit [linux-x86_64]"}
[2022-01-11T17:57:16,693][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-01-11T17:57:17,938][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-01-11T17:57:18,676][INFO ][org.reflections.Reflections] Reflections took 86 ms to scan 1 urls, producing 119 keys and 417 values
[2022-01-11T17:57:19,987][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/local/logstash/config/test3.conf"], :thread=>"#<Thread:0x73426cc8 run>"}
[2022-01-11T17:57:20,809][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.81}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[2022-01-11T17:57:20,882][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2022-01-11T17:57:20,933][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
{
        "referrer" => "GET / HTTP/1.1",
    "response_new" => "403",
           "bytes" => "5039",
       "timestamp" => "07/Feb/2019:16:24:19 +0800",
        "clientip" => [
        [0] "172",
        [1] "16",
        [2] "213",
        [3] "132"
    ],
      "@timestamp" => 2019-02-07T08:24:19.000Z,
            "host" => "localhost.localdomain",
        "@version" => "1"
}
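The whole filter chain can be simulated on a plain dict to see why the run above prints what it does. One subtlety: mutate applies its operations in a fixed documented order (rename runs before convert, among others), which would explain why response_new is still the string "403" in the output even though the config asks for a float conversion. The sketch below mirrors that order; the regexes are simplified stand-ins for the grok patterns:

```python
import re
from datetime import datetime, timezone

line = '172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039'

# grok (with remove_field => ["message"], so message is not kept)
m = re.match(
    r'(?P<clientip>[\d.]+) \[(?P<timestamp>[^\]]+)\] '
    r'(?P<referrer>"[^"]*") (?P<response>\d+) (?P<bytes>\d+)', line)
event = m.groupdict()

# date: parse the timestamp field into @timestamp (normalized to UTC)
event["@timestamp"] = datetime.strptime(
    event["timestamp"], "%d/%b/%Y:%H:%M:%S %z").astimezone(timezone.utc)

# mutate, in its fixed order:
event["response_new"] = event.pop("response")          # rename
# convert => ["response", "float"] is a no-op here: the field is already gone
event["referrer"] = event["referrer"].replace('"', "") # gsub
event["clientip"] = event["clientip"].split(".")       # split
```

The resulting dict matches the rubydebug output above field for field: a string response_new, a stripped referrer, a clientip array, and a UTC @timestamp.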
Part 3: Output Plugins (output)
Official documentation for output plugins: https://www.elastic.co/guide/en/logstash/current/output-plugins.html
output is the final stage of Logstash. An event can pass through multiple outputs, and once all outputs have processed it, the event is done. Some commonly used outputs:
- file: writes log data to a file on disk.
- elasticsearch: sends log data to Elasticsearch, which stores it efficiently and makes it easy to query.
3.1 Standard output (stdout)
Prints events to the console:
output {
  stdout {
    codec => rubydebug
  }
}
3.2 Saving to a file (file)
Stores the output data in a file:
output {
  file {
    path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
  }
}
3.3 Output to Elasticsearch
Writes the data to ES:
output {
  elasticsearch {
    hosts => ["192.168.1.1:9200","172.16.213.77:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
- hosts: an array of Elasticsearch node addresses (host:port; the default port is 9200). Multiple addresses can be listed.
- index: the name of the Elasticsearch index to write to; variables can be used here. Logstash provides the %{+YYYY.MM.dd} notation: when parsing the index name, anything starting with a + sign is treated as a date format and rendered from the event's timestamp. Splitting indices by day like this makes it easy to delete old data or to search within a given time range. Also note that index names must not contain uppercase letters.
- manage_template: whether Logstash automatically manages the index template; setting it to false disables automatic template management, which is what you should do if you define your own template.
- template_name: the name of the template in Elasticsearch.
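The %{+YYYY.MM.dd} index naming can be sketched in Python: the event's timestamp is formatted into the index name, so each day's events land in their own index (`daily_index` is a hypothetical helper, not a Logstash API):

```python
from datetime import datetime, timezone

def daily_index(ts, prefix="logstash-"):
    """Mimic the %{+YYYY.MM.dd} sprintf-style index name of the
    elasticsearch output: format the event timestamp into a daily index."""
    return prefix + ts.strftime("%Y.%m.%d")

ts = datetime(2022, 1, 11, 8, 30, tzinfo=timezone.utc)
print(daily_index(ts))  # logstash-2022.01.11
```

Because the date comes from each event's own timestamp, deleting a whole day of data is just deleting one index.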
Comprehensive test
Monitor the nginx log: read data from the nginx log file, transform it through the filters, and finally write it into the corresponding index in ES.
input {
  file {
    path => ["D:/ES/logstash-7.3.0/nginx.log"]
    start_position => "beginning"
  }
}
filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
    remove_field => [ "message" ]
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
  mutate {
    rename => { "response" => "response_new" }
    convert => [ "response", "float" ]
    gsub => ["referrer", "\"", ""]
    remove_field => ["timestamp"]
    split => ["clientip", "."]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}