Logstash (3): Using Plugins (input, filter, output)

Part 1: Input Plugins (input)

The official documentation for input plugins: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
The examples below cover:

  1. Standard input
  2. Reading a file
  3. Reading TCP network data

1 Standard Input (stdin)

With stdin{} as the input, type something such as helloworld in the console; on pressing Enter it is printed back to the same console, formatted by codec => rubydebug.

input{
    stdin{
       
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

Test:

[root@localhost config]# ls
java_pid12011.hprof  jvm.options        logstash-sample.conf  my-app.conf    startup.options
java_pid21521.hprof  log4j2.properties  logstash.yml          pipelines.yml
[root@localhost config]# vim test1.conf
input{
    stdin{
       
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}
"test1.conf" [New] 17L, 167C written
[root@localhost config]# ls
java_pid12011.hprof  jvm.options        logstash-sample.conf  my-app.conf    startup.options
java_pid21521.hprof  log4j2.properties  logstash.yml          pipelines.yml  test1.conf
[root@localhost config]# 

[root@localhost config]# ../bin/logstash -f test1.conf 
Using JAVA_HOME defined java: /opt/elasticsearch-7.6.1/jdk
WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2022-01-11T15:49:40,297][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
[2022-01-11T15:49:40,315][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 13.0.2+8 on 13.0.2+8 +indy +jit [linux-x86_64]"}
[2022-01-11T15:49:40,652][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-01-11T15:49:42,009][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-01-11T15:49:42,568][INFO ][org.reflections.Reflections] Reflections took 84 ms to scan 1 urls, producing 119 keys and 417 values 
[2022-01-11T15:49:43,491][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/local/logstash/config/test1.conf"], :thread=>"#<Thread:0x772c1831 run>"}
[2022-01-11T15:49:44,165][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.67}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[2022-01-11T15:49:44,268][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2022-01-11T15:49:44,327][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
sss
{
       "message" => "sss",
          "host" => "localhost.localdomain",
    "@timestamp" => 2022-01-11T07:49:47.049Z,
      "@version" => "1"
}
hello world
{
       "message" => "hello world",
          "host" => "localhost.localdomain",
    "@timestamp" => 2022-01-11T07:49:57.473Z,
      "@version" => "1"
}

2 Reading a File (file)

Logstash uses a Ruby gem called filewatch to watch for file changes, and records how far it has read each watched log file in a database file named .sincedb.

  1. The sincedb data file lives by default under <path.data>/plugins/inputs/file, with a name like .sincedb_123456.
  2. <path.data> is Logstash's data directory, which defaults to LOGSTASH_HOME/data.

An example:

  1. path: the location of the log file(s); once this config is started, Logstash reads the log content from that location.
  2. start_position: tells Logstash where in the file to begin reading (the config below reads from the beginning).
  3. The file is read line by line, one event per line.
  4. Logstash keeps watching the file for new lines until it is shut down.
input {
    file {
        path => ["/var/*/*"]
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

By default, Logstash starts reading from the end of the file, i.e. the Logstash process picks up new lines much like tail -f does.
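The behavior described above can be sketched in a few lines of Python (a simplified illustration only; the real file input also handles sincedb bookkeeping, globs, and file rotation):

```python
import time

def read_lines(path, start_position="end", follow=True, poll=0.5):
    """Rough sketch of the file input: read a log file line by line,
    either from the beginning or, like `tail -f`, only new data."""
    with open(path, "r") as f:
        if start_position != "beginning":
            f.seek(0, 2)  # 2 = os.SEEK_END: skip existing content
        while True:
            line = f.readline()
            if line:
                yield line.rstrip("\n")   # one event per line
            elif follow:
                time.sleep(poll)          # wait for the file to grow
            else:
                return
```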

3 Reading TCP Network Data

Reading data over the network was demonstrated earlier, so only the configuration is shown here.

input {
  tcp {
    port => "1234"
  }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
}

output {
    stdout{
        codec=>rubydebug
    }
}
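Any TCP client can feed this input; for instance, the sketch below sends one syslog-style line from Python (host and port are assumptions matching the config above):

```python
import socket

def send_log_line(line, host="localhost", port=1234):
    """Send one newline-terminated line to the tcp input above."""
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall((line + "\n").encode("utf-8"))

# A syslog-style line that the %{SYSLOGLINE} grok pattern can parse:
sample = "Feb  7 16:24:19 myhost sshd[1234]: Accepted password for root"
```

From a shell, `echo "<line>" | nc localhost 1234` does the same.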

Part 2: Filter Plugins (filter)

See the official documentation for details: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

1 Grok Regular Expression Capture

Grok parses arbitrary text with regular expressions, turning unstructured log data into a structured, easily queryable form. It is currently the best way to parse unstructured log data in Logstash.

When Logstash reads a line, the entire line lands in a single message field. To know exactly what each part of the line is, and to make querying convenient, Grok can split that one line into multiple named fields.

The Grok syntax rule is:

%{SYNTAX:SEMANTIC}

where SYNTAX is the name of the pattern to match and SEMANTIC is the field name to store the match under.

Examples:

  1. Suppose the input is:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    
    • The %{IP:clientip} pattern yields: clientip: 172.16.213.132
    • The %{HTTPDATE:timestamp} pattern yields: timestamp: 07/Feb/2019:16:24:19 +0800
    • The %{QS:referrer} pattern yields: referrer: "GET / HTTP/1.1"
  2. Below is a combined match pattern that captures all of the input above:

    %{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
    

    Through this combined pattern, the input is split into five parts, i.e. five fields. Splitting the input into distinct data fields makes later parsing and querying of the log data much easier, which is exactly the point of using grok.

  3. Full example

    input{
        stdin{}
    }
    filter{
        grok{
            match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
        }
    }
    output{
        stdout{
            codec => "rubydebug"
        }
    }
    

    Input log line:

    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    
  4. Execution

    [root@localhost config]# ls
    java_pid12011.hprof  logstash-sample.conf  startup.options
    java_pid21521.hprof  logstash.yml          test1.conf
    jvm.options          my-app.conf
    log4j2.properties    pipelines.yml
    [root@localhost config]# vim test2.conf
    input {
        stdin {
        }
    }
    
    filter{
        grok{
            match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
        }
    }
    
    
    output {
        stdout {
            codec=>rubydebug
        }
    }
    "test2.conf" [New] 17L, 237C written
    [root@localhost config]# ../bin/logstash -f test2.conf 
    Using JAVA_HOME defined java: /opt/elasticsearch-7.6.1/jdk
    WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
    DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
    OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
    Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
    [2022-01-11T16:31:45,624][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
    [2022-01-11T16:31:45,636][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 13.0.2+8 on 13.0.2+8 +indy +jit [linux-x86_64]"}
    [2022-01-11T16:31:45,919][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2022-01-11T16:31:47,168][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
    [2022-01-11T16:31:47,858][INFO ][org.reflections.Reflections] Reflections took 70 ms to scan 1 urls, producing 119 keys and 417 values 
    [2022-01-11T16:31:49,066][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/local/logstash/config/test2.conf"], :thread=>"#<Thread:0xfabd406 run>"}
    [2022-01-11T16:31:49,824][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.75}
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
    WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    [2022-01-11T16:31:49,910][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
    [2022-01-11T16:31:49,985][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    The stdin plugin is now waiting for input:
    172.16.213.132 [07/Feb/2022:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    {
          "response" => "403",
          "clientip" => "172.16.213.132",
        "@timestamp" => 2022-01-11T08:32:38.109Z,
             "bytes" => "5039",
          "referrer" => "\"GET / HTTP/1.1\"",
           "message" => "172.16.213.132 [07/Feb/2022:16:24:19 +0800] \"GET / HTTP/1.1\" 403 5039",
         "timestamp" => "07/Feb/2022:16:24:19 +0800",
              "host" => "localhost.localdomain",
          "@version" => "1"
    }
    
    
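Under the hood a grok pattern is just a library of named regular expressions; the combined pattern above corresponds roughly to the following Python sketch (the sub-patterns here are simplified stand-ins, not the real grok definitions):

```python
import re

# Simplified equivalents of %{IP}, %{HTTPDATE}, %{QS}, %{NUMBER}:
LOG_RE = re.compile(
    r'(?P<clientip>\d{1,3}(?:\.\d{1,3}){3}) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'(?P<referrer>"[^"]*") '
    r'(?P<response>\d+) '
    r'(?P<bytes>\d+)'
)

line = '172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039'
fields = LOG_RE.match(line).groupdict()
```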

2 Date Processing (date)

The date plugin is especially important for sorting events and backfilling old data. It converts the time field of a log record into a LogStash::Timestamp object and stores it in the @timestamp field.
Example (only the filter part is shown):

filter {
    grok {
        match => ["message", "%{HTTPDATE:timestamp}"]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}
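The match pattern uses Java/Joda-style date syntax. The same conversion can be sketched in Python, where dd/MMM/yyyy:HH:mm:ss Z maps to %d/%b/%Y:%H:%M:%S %z:

```python
from datetime import datetime, timezone

# Parse the HTTPDATE value and normalize it to UTC, which is how
# the date filter fills @timestamp:
ts = datetime.strptime("07/Feb/2019:16:24:19 +0800", "%d/%b/%Y:%H:%M:%S %z")
utc = ts.astimezone(timezone.utc)
```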

3 Modifying Data (mutate)

3.1 Replacing Matched Values with a Regular Expression (gsub)

gsub replaces the values matched by a regular expression in a field. It only works on string fields.

Example:

  1. This example replaces every "/" character in the filed_name_1 field with "_":
    filter {
        mutate {
            gsub => ["filed_name_1", "/" , "_"]
        }
    }
    

3.2 Splitting a String into an Array on a Delimiter (split)

split splits the string in a field into an array on the specified delimiter.
Example:

  1. Split the filed_name_2 field into an array on "|":
    filter {
        mutate {
            split => ["filed_name_2", "|"]
        }
    }
    

3.3 Renaming a Field (rename)

rename renames a field.

Example:

  1. Rename the field old_field to new_field:
    filter {
        mutate {
            rename => { "old_field" => "new_field" }
        }
    }
    

3.4 Deleting a Field (remove_field)

remove_field deletes a field.

Example:

  1. Delete the timestamp field:
    filter {
        mutate {
            remove_field  =>  ["timestamp"]
        }
    }
    
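Treating an event as a dictionary, the four mutate operations above behave roughly as in this Python sketch (field names and values are taken from the examples; this is an illustration, not Logstash internals):

```python
import re

event = {
    "filed_name_1": "a/b/c",
    "filed_name_2": "x|y|z",
    "old_field": "value",
    "timestamp": "07/Feb/2019:16:24:19 +0800",
}

# gsub: regex-replace inside a string field
event["filed_name_1"] = re.sub(r"/", "_", event["filed_name_1"])
# split: turn a string field into an array on a delimiter
event["filed_name_2"] = event["filed_name_2"].split("|")
# rename: move the value to a new key
event["new_field"] = event.pop("old_field")
# remove_field: drop the field entirely
event.pop("timestamp", None)
```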

3.5 GeoIP Address Lookup

filter {
    geoip {
        source => "ip_field"
    }
}

3.6 Comprehensive Example

Configuration file contents:

  1. convert => [ "response", "float" ]: converts response to float (a NUMBER capture produces a string). Note, however, that within a single mutate block the operations run in a fixed internal order in which rename happens before convert, so by the time convert runs the response field has already been renamed away; this is why response_new still appears as a string in the output below.
  2. rename => { "response" => "response_new" }: renames response to response_new
  3. input {
        stdin {}
    }
    filter {
        grok {
            match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
            remove_field => [ "message" ]
       }
    date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    mutate {
              convert => [ "response","float" ]
               rename => { "response" => "response_new" }   
               gsub => ["referrer","\"",""]          
               split => ["clientip", "."]
            }
    }
    output {
        stdout {
            codec => "rubydebug"
        }
     }
    
  4. Test log line
    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    
  5. Run
    [root@localhost config]# ls
    java_pid12011.hprof  log4j2.properties     my-app.conf      test1.conf
    java_pid21521.hprof  logstash-sample.conf  pipelines.yml    test2.conf
    jvm.options          logstash.yml          startup.options
    [root@localhost config]# 
    [root@localhost config]# vim test3.conf
    input {
        stdin {}
    }
    filter {
        grok {
            match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
            remove_field => [ "message" ]
       }
    date {
            match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
        }
    mutate {
              convert => [ "response","float" ]
               rename => { "response" => "response_new" }
               gsub => ["referrer","\"",""]
               split => ["clientip", "."]
            }
    }
    output {
        stdout {
            codec => "rubydebug"
        }
    }
    "test3.conf" [New] 23L, 556C written
    [root@localhost config]# ../bin/logstash -f test3.conf 
    Using JAVA_HOME defined java: /opt/elasticsearch-7.6.1/jdk
    WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
    DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
    OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
    Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
    [2022-01-11T17:57:16,417][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
    [2022-01-11T17:57:16,430][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 13.0.2+8 on 13.0.2+8 +indy +jit [linux-x86_64]"}
    [2022-01-11T17:57:16,693][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2022-01-11T17:57:17,938][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
    [2022-01-11T17:57:18,676][INFO ][org.reflections.Reflections] Reflections took 86 ms to scan 1 urls, producing 119 keys and 417 values 
    [2022-01-11T17:57:19,987][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/local/logstash/config/test3.conf"], :thread=>"#<Thread:0x73426cc8 run>"}
    [2022-01-11T17:57:20,809][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.81}
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
    WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    [2022-01-11T17:57:20,882][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
    The stdin plugin is now waiting for input:
    [2022-01-11T17:57:20,933][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
    {
            "referrer" => "GET / HTTP/1.1",
        "response_new" => "403",
               "bytes" => "5039",
           "timestamp" => "07/Feb/2019:16:24:19 +0800",
            "clientip" => [
            [0] "172",
            [1] "16",
            [2] "213",
            [3] "132"
        ],
          "@timestamp" => 2019-02-07T08:24:19.000Z,
                "host" => "localhost.localdomain",
            "@version" => "1"
    }
    
    

Part 3: Output Plugins (output)

The official documentation for output plugins: https://www.elastic.co/guide/en/logstash/current/output-plugins.html
output is the final stage of a Logstash pipeline. An event can pass through multiple outputs, and once all outputs have processed it, the event is complete. Some common outputs:

  • file: writes the log data to a file on disk.
  • elasticsearch: sends the log data to Elasticsearch, which stores it efficiently and makes it easy to query.

3.1 Output to Standard Output (stdout)

Print to the console:

output {
    stdout {
        codec => rubydebug
    }
}

3.2 Saving to a File (file)

Write the output data to a file:

output {
    file {
        path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
    }
}

3.3 Output to Elasticsearch

Send the data to Elasticsearch:

output {
    elasticsearch {
        hosts => ["192.168.1.1:9200","172.16.213.77:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
    }
}
  • hosts: an array of Elasticsearch node addresses and ports (the default port is 9200); multiple addresses can be listed.
  • index: the name of the Elasticsearch index to write to; variables are allowed here. Logstash supports the %{+YYYY.MM.dd} notation: during parsing, a value starting with + is treated as a time format and the rest of the string is parsed accordingly. Splitting indices by day like this makes it easy to delete old data or to search within a specific time range. Also note that index names must not contain uppercase letters.
  • manage_template: whether Logstash automatically manages the index template; setting it to false disables automatic template management. If you define your own template, set this to false.
  • template_name: the name of the template in Elasticsearch.
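The %{+YYYY.MM.dd} expansion is driven by the event's @timestamp (in UTC), so an event stamped 2022-01-11 lands in logstash-2022.01.11. A Python sketch of the naming scheme:

```python
from datetime import datetime, timezone

def index_name(ts):
    """Daily index name in the logstash-%{+YYYY.MM.dd} style."""
    return ts.strftime("logstash-%Y.%m.%d")

ts = datetime(2022, 1, 11, 8, 32, 38, tzinfo=timezone.utc)
```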

Comprehensive Test

Monitor an nginx log file: read data from nginx's log file, transform it with the filters, and finally write it to the corresponding index in Elasticsearch.

input {
    file {
        path => ["D:/ES/logstash-7.3.0/nginx.log"]        
        start_position => "beginning"
    }
}

filter {
    grok {
        match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
        remove_field => [ "message" ]
   }
	date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
	mutate {
           rename => { "response" => "response_new" }
           convert => [ "response","float" ]
           gsub => ["referrer","\"",""]
           remove_field => ["timestamp"]
           split => ["clientip", "."]
        }
}

output {
    stdout {
        codec => "rubydebug"
    }

	elasticsearch {
	    hosts => ["localhost:9200"]
	    index => "logstash-%{+YYYY.MM.dd}"
	}

}