Learning the Elastic Stack: Getting Started with Logstash

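    Logstash is a data collection engine built around real-time pipelines. Like a pipe for data, it gathers data from scattered sources, aggregates and processes it, and passes the result downstream for analysis and visualization.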

    Logstash can collect data together with Beats or other third-party components. After processing, the data is stored in Elasticsearch for search and analysis.

    Besides Beats, Logstash ships with a rich set of plugins that connect to many kinds of data sources, including JDBC, Kafka, and HTTP.

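    Logstash processes data through pipelines; a pipeline is an abstraction of a data processing flow. Within a pipeline, data from upstream sources is collected into a queue, transformed by multiple worker threads, and finally sent to downstream components. A single Logstash instance can run multiple pipelines.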
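
To make this flow concrete, here is a minimal Python sketch of the same idea: an input stage puts events on a queue, several worker threads apply filters, and an output stage collects the results. It is only an illustration under simplifying assumptions and not how Logstash is implemented; run_pipeline, add_length, and SENTINEL are hypothetical names invented for this example.

```python
import queue
import threading

SENTINEL = object()  # hypothetical marker for the end of the input stream


def run_pipeline(lines, filters, workers=2):
    """Push lines through input -> queue -> filter workers -> output."""
    events = queue.Queue()
    output = []
    lock = threading.Lock()

    def worker():
        while True:
            event = events.get()
            if event is SENTINEL:
                break
            for apply_filter in filters:  # filters run in the given order
                event = apply_filter(event)
            with lock:
                output.append(event)      # "output" stage

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for line in lines:                    # "input" stage
        events.put({"message": line})
    for _ in threads:                     # one stop marker per worker
        events.put(SENTINEL)
    for t in threads:
        t.join()
    return output


def add_length(event):
    """Toy filter: record the length of the message."""
    return {**event, "length": len(event["message"])}


results = run_pipeline(["a", "bb", "ccc"], [add_length])
print(sorted(e["length"] for e in results))
```

Because several workers drain the queue concurrently, the order of events in the output list is not guaranteed, which is why the example sorts before printing.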

Basic concepts

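  • pipeline: the logical abstraction of one data processing flow. Like a pipe, data flows in at one end, is processed, and flows out at the other. A pipeline has three parts: input, filter, and output. Input and output are required; filter is optional.
  • instance: a running Logstash instance, which can contain multiple data processing flows, that is, multiple pipelines.
  • inputs: input plugins that connect to data sources and ingest data. They support codecs, which encode or decode data as it enters the pipeline. Required.
  • filters: filter plugins that transform incoming data. Optional.
  • outputs: output plugins that connect to downstream components and send the processed data. They also support codecs for encoding or decoding data. Required.
  • event: data in a pipeline is event-based; an event can be seen as one record or one message in the data stream.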

Installing Logstash

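1) Requires Java 8; Java 9 is not supported: check the Java version and set the JAVA_HOME environment variable. Logstash is built on JRuby, and Logstash 6.x must run on Java 8; Java 9 is not supported at the time of writing.
2) Download and extract: download Logstash and extract the archive. Note that the path where Logstash is installed must not contain a colon (:).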

tar -zxvf logstash-6.2.2.tar.gz
cd logstash-6.2.2

3) Start Logstash and send the first event: the -e option specifies a pipeline definition on the command line. Here it reads events from stdin and writes them to stdout.

bin/logstash -e 'input { stdin { } } output { stdout {} }'

Logs similar to the following appear:

Sending Logstash's logs to /home/work/zion_package/elastic/logstash/logstash-6.2.2/logs which is now configured via log4j2.properties
[2018-03-14T14:48:00,053][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/modules/fb_apache/configuration"}
[2018-03-14T14:48:00,074][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/modules/netflow/configuration"}
[2018-03-14T14:48:00,172][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/data/queue"}
[2018-03-14T14:48:00,178][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/data/dead_letter_queue"}
[2018-03-14T14:48:00,631][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-03-14T14:48:00,672][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"5901ab9f-fdc9-43dc-a88d-c5c636cf8224", :path=>"/home/work/zion_package/elastic/logstash/logstash-6.2.2/data/uuid"}
[2018-03-14T14:48:01,337][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.2"}
[2018-03-14T14:48:01,733][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-14T14:48:03,264][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-14T14:48:03,453][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x128244e5 run>"}
The stdin plugin is now waiting for input:
[2018-03-14T14:48:03,546][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

Type any string in the console and press Enter; the message is immediately echoed to the console. Here the pipeline receives our input from stdin and writes it to stdout:

Hello, this is my first event to logstash !
2018-03-14T06:50:26.261Z yf-beidou-dmp00.yf01.baidu.com Hello, this is my first event to logstash !

Logstash adds a timestamp and the host name to the message. To exit Logstash, press Ctrl+D.
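
The line printed on stdout can be read as a timestamp, the host name, and the original message joined by spaces. The Python sketch below mimics that shape for illustration only; make_event and format_line are hypothetical helpers, and the exact format Logstash prints depends on the codec in use.

```python
from datetime import datetime, timezone


def make_event(message, host, now=None):
    """Wrap a raw line in an event-like dict (hypothetical helper)."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # ISO 8601 timestamp in UTC with millisecond precision
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
    return {"@timestamp": stamp, "@version": "1", "host": host, "message": message}


def format_line(event):
    """Render the event as 'timestamp host message'."""
    return f'{event["@timestamp"]} {event["host"]} {event["message"]}'


fixed = datetime(2018, 3, 14, 6, 50, 26, 261000, tzinfo=timezone.utc)
event = make_event("Hello, this is my first event to logstash !", "example-host", fixed)
print(format_line(event))
```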

Sending logs to Logstash with Filebeat

Configuring Filebeat

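    Filebeat tails log files on a server. It is designed for reliability and a low resource footprint, so when it is deployed alongside a business system it avoids consuming resources the system needs to run normally. A default Logstash installation includes the Beats input plugin, which receives events sent by the various Beats. Filebeat can also send events directly to Elasticsearch without going through Logstash.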

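1) Download the sample file logstash-tutorial.log.gz, upload it to the server where Filebeat will be deployed, and extract it. The sample is the Apache web log provided in the official documentation.
2) Download Filebeat, upload it to the server, and extract it. The LINUX 64-BIT package is used here.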

tar -zxvf filebeat-6.2.2-linux-x86_64.tar.gz
cd filebeat-6.2.2-linux-x86_64

3) Configure Filebeat: edit filebeat.yml to set the path of the log file to monitor and the Logstash address to send events to.

filebeat.prospectors:
- type: log
  # Path of the log file to monitor
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  # Logstash address to send events to
  hosts: ["localhost:5044"]

Note: the monitored log file path must be an absolute path; wildcards are supported.

4) Start Filebeat. The -e option sends log output to stderr instead of log files, -c specifies the configuration file path, and -d sets the debug selectors.

./filebeat -e -c filebeat.yml -d "publish"

Configuring Logstash

1) Create a pipeline configuration file. A pipeline configuration has the following skeleton:

# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}

Create a file named first-pipeline.conf with the following configuration:

input {
    beats {
        # Port on which to listen for events from Beats
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    # Write to stdout, using the rubydebug codec to format events
    stdout { codec => rubydebug }
}

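2) Check that the configuration is valid with the following command. The --config.test_and_exit option validates the configuration file and reports any errors.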

bin/logstash -f first-pipeline.conf --config.test_and_exit

Output similar to the following means the configuration file passed validation:

Configuration OK
[2018-03-14T16:23:08,919][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

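3) Start Logstash with the following command. The --config.reload.automatic option reloads the configuration file automatically after it changes, so Logstash does not need to be restarted.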

bin/logstash -f first-pipeline.conf --config.reload.automatic

If the configuration is correct, the Logstash console shows output similar to the following:

{
          "host" => "yf-beidou-dmp00.yf01.baidu.com",
        "offset" => 22310,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "message" => "218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] \"GET /blog/geekery/xvfb-firefox.html HTTP/1.1\" 200 10975 \"-\" \"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
    "prospector" => {
        "type" => "log"
    },
          "beat" => {
            "name" => "yf-beidou-dmp00.yf01.baidu.com",
         "version" => "6.2.2",
        "hostname" => "yf-beidou-dmp00.yf01.baidu.com"
    },
    "@timestamp" => 2018-03-14T08:43:52.564Z,
        "source" => "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
      "@version" => "1"
}

Parsing logs with the Grok filter plugin

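The example above passes logs reported by Filebeat through Logstash to the output unchanged. Next we add a filter plugin to process the logs. Grok is one of Logstash's filter plugins; it turns unstructured data into structured data according to predefined rules, which makes the data easier to search. The rules must be defined in advance to match the log format, so you need to know the format of the logs being collected. Because the sample is an Apache web log, we can use the %{COMBINEDAPACHELOG} pattern that grok provides, which splits each line into fields such as the client IP, request, response code, and user agent.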
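
As a rough illustration of what a grok pattern does, the Python sketch below uses a hand-written regular expression to split a combined-format Apache log line into named fields. The regex is a simplified approximation written for this example and not the actual %{COMBINEDAPACHELOG} definition; parse_line is a hypothetical helper. The field names follow those produced in this tutorial's output, and the _grokparsefailure tag mirrors what grok adds when a line does not match.

```python
import re

# Simplified stand-in for the combined Apache log format (an approximation).
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)


def parse_line(message):
    """Return an event dict: the original message plus any parsed fields."""
    event = {"message": message}
    match = COMBINED.match(message)
    if match:
        event.update(match.groupdict())
    else:
        # No match: keep the raw message and tag the event instead
        event["tags"] = ["_grokparsefailure"]
    return event


sample = (
    '121.107.188.202 - - [04/Jan/2015:05:27:57 +0000] '
    '"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" '
    '200 171717 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36"'
)
parsed = parse_line(sample)
print(parsed["clientip"], parsed["verb"], parsed["response"], parsed["bytes"])
```

Note that every extracted value is a string, including response and bytes, which matches the quoted values in the rubydebug output of this tutorial.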

Modify first-pipeline.conf to add the filter, as follows:

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
    stdout { codec => rubydebug }
}

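Because Logstash was started with the --config.reload.automatic option, it loads the modified configuration file automatically and does not need to be restarted. After saving the change, the log shows the configuration being reloaded and the pipeline restarting: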

[2018-03-14T17:04:57,325][INFO ][logstash.pipelineaction.reload] Reloading pipeline {"pipeline.id"=>:main}
[2018-03-14T17:05:01,758][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x58f295b9 run>"}
[2018-03-14T17:05:02,048][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-14T17:05:02,362][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2018-03-14T17:05:02,422][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x5506295b sleep>"}
[2018-03-14T17:05:02,429][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-03-14T17:05:02,453][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

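To make Filebeat read the file again from the beginning, stop Filebeat, delete the registry file that records its read position, and restart Filebeat:

cd filebeat-6.2.2-linux-x86_64
rm data/registry
./filebeat -e -c filebeat.yml -d "publish"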
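
Filebeat remembers how far it has read each file, which is why the registry has to be removed before the sample file is sent again. The Python sketch below illustrates the idea with a tiny offset store; the JSON layout and the read_new_lines helper are assumptions made for this example and do not reflect Filebeat's real registry format.

```python
import json
import os
import tempfile


def read_new_lines(log_path, registry_path):
    """Return lines added since the last call, tracking the offset on disk."""
    offset = 0
    if os.path.exists(registry_path):
        with open(registry_path) as f:
            offset = json.load(f).get(log_path, 0)
    with open(log_path, "rb") as f:
        f.seek(offset)            # resume where the previous read stopped
        data = f.read()
        new_offset = f.tell()
    with open(registry_path, "w") as f:
        json.dump({log_path: new_offset}, f)
    return data.decode("utf-8").splitlines()


with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "app.log")
    reg = os.path.join(tmp, "registry.json")
    with open(log, "w") as f:
        f.write("line 1\nline 2\n")
    first = read_new_lines(log, reg)    # reads both lines
    second = read_new_lines(log, reg)   # nothing new since the last read
    os.remove(reg)                      # same effect as deleting the registry
    third = read_new_lines(log, reg)    # starts again from offset 0

print(first, second, third)
```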

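With the filter in place, the output looks like the following. Besides the original log line, the parsed pieces of the line are stored in their own fields.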

{
           "host" => "yf-beidou-dmp00.yf01.baidu.com",
       "clientip" => "121.107.188.202",
           "verb" => "GET",
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "message" => "121.107.188.202 - - [04/Jan/2015:05:27:57 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1\" 200 171717 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36\"",
           "beat" => {
            "name" => "yf-beidou-dmp00.yf01.baidu.com",
         "version" => "6.2.2",
        "hostname" => "yf-beidou-dmp00.yf01.baidu.com"
    },
    "httpversion" => "1.1",
           "auth" => "-",
       "response" => "200",
          "bytes" => "171717",
          "ident" => "-",
       "@version" => "1",
        "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png",
         "offset" => 21927,
     "prospector" => {
        "type" => "log"
    },
     "@timestamp" => 2018-03-14T09:06:47.927Z,
         "source" => "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
      "timestamp" => "04/Jan/2015:05:27:57 +0000",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36\"",
       "referrer" => "\"-\""
}

Enriching data with the Geoip filter plugin

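Geoip is another filter plugin. It looks up location information from an IP address and adds it to the output event. Geoip must be told which field holds the IP address; in this example we use the clientip field produced by grok. Because filters run in order, the geoip filter must come after the grok filter configured earlier. The configuration file is as follows: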

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        # Field that holds the IP address to look up
        source => "clientip"
    }
}
output {
    stdout { codec => rubydebug }
}
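
Why the order matters can be shown with a small Python sketch: a stand-in for grok extracts clientip, and a stand-in for geoip can only enrich the event if that field already exists. GEO_TABLE, grok_like, geoip_like, and apply_filters are hypothetical names, and the lookup table is a toy replacement for a real GeoIP database.

```python
# Toy lookup table standing in for a GeoIP database (hypothetical data source).
GEO_TABLE = {"218.30.103.62": {"city_name": "Beijing", "country_code2": "CN"}}


def grok_like(event):
    """Stand-in for grok: take the first token of the line as clientip."""
    out = dict(event)
    out["clientip"] = event["message"].split(" ", 1)[0]
    return out


def geoip_like(event, source="clientip"):
    """Stand-in for geoip: enrich only if the source field is present and known."""
    out = dict(event)
    ip = event.get(source)
    if ip in GEO_TABLE:
        out["geoip"] = {"ip": ip, **GEO_TABLE[ip]}
    return out


def apply_filters(event, filters):
    """Run filters one after another, in the order they are listed."""
    for apply_filter in filters:
        event = apply_filter(event)
    return event


line = ('218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] '
        '"GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975')
right = apply_filters({"message": line}, [grok_like, geoip_like])
wrong = apply_filters({"message": line}, [geoip_like, grok_like])
print("geoip" in right, "geoip" in wrong)
```

With the filters reversed, clientip does not exist yet when the lookup runs, so the event passes through without location data.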

Stop Filebeat, delete the data/registry file, and restart Filebeat. The log output now includes geographic location information:

{
           "host" => "yf-beidou-dmp00.yf01.baidu.com",
       "clientip" => "218.30.103.62",
           "verb" => "GET",
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "message" => "218.30.103.62 - - [04/Jan/2015:05:28:43 +0000] \"GET /blog/geekery/xvfb-firefox.html HTTP/1.1\" 200 10975 \"-\" \"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
           "beat" => {
            "name" => "yf-beidou-dmp00.yf01.baidu.com",
         "version" => "6.2.2",
        "hostname" => "yf-beidou-dmp00.yf01.baidu.com"
    },
    "httpversion" => "1.1",
           "auth" => "-",
       "response" => "200",
          "bytes" => "10975",
          "ident" => "-",
       "@version" => "1",
        "request" => "/blog/geekery/xvfb-firefox.html",
          "geoip" => {
              "location" => {
            "lat" => 39.9289,
            "lon" => 116.3883
        },
              "latitude" => 39.9289,
        "continent_code" => "AS",
           "region_code" => "11",
         "country_code3" => "CN",
         "country_code2" => "CN",
             "longitude" => 116.3883,
             "city_name" => "Beijing",
          "country_name" => "China",
                    "ip" => "218.30.103.62",
           "region_name" => "Beijing",
              "timezone" => "Asia/Shanghai"
    },
         "offset" => 22310,
     "prospector" => {
        "type" => "log"
    },
     "@timestamp" => 2018-03-14T09:22:31.299Z,
         "source" => "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
      "timestamp" => "04/Jan/2015:05:28:43 +0000",
          "agent" => "\"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
       "referrer" => "\"-\""
}

Sending Logstash output to Elasticsearch

1) Modify the output to write data to Elasticsearch: edit first-pipeline.conf and configure the Elasticsearch address.

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
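        # Elasticsearch addresses; when several are listed, Logstash load-balances requests across them
        hosts => [ "ip1:port1", "ip2:port2" ]
        # If a username and password are configured, set the following two fields.
        # The user must have CRUD permissions on the index.
        user => "username"
        password => "password"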
    }
}

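2) Resend the messages: stop Filebeat, delete the data/registry file, and restart Filebeat.
3) Search the logs: run the following command to check whether logs were written to Elasticsearch; ip and port are those of the Elasticsearch instance. Because first-pipeline.conf does not specify an index name pattern, the default logstash-yyyy.MM.dd is used (replace the date part with the actual date). If no username/password is configured, omit the -u option.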

curl -XGET 'ip:port/logstash-2018.03.14/_search?pretty&q=response=200' -u username:password
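
The date in the index name has to match the day the events were indexed. As a hedged sketch, the Python helper below builds the daily index name from a timestamp, assuming the default logstash-yyyy.MM.dd naming described above and a date taken in UTC; daily_index and search_url are hypothetical helpers, and localhost:9200 is a placeholder address.

```python
from datetime import datetime, timedelta, timezone


def daily_index(ts, prefix="logstash"):
    """Build the daily index name for a timezone-aware timestamp."""
    # Convert to UTC first so the date does not depend on the local zone
    return f"{prefix}-{ts.astimezone(timezone.utc):%Y.%m.%d}"


def search_url(base, ts, query):
    """Compose a search URL shaped like the curl command in this tutorial."""
    return f"{base}/{daily_index(ts)}/_search?pretty&q={query}"


utc_ts = datetime(2018, 3, 14, 12, 56, 19, tzinfo=timezone.utc)
# 02:00 on March 15 in UTC+8 is still March 14 in UTC
local_ts = datetime(2018, 3, 15, 2, 0, tzinfo=timezone(timedelta(hours=8)))

print(daily_index(utc_ts))
print(daily_index(local_ts))
print(search_url("localhost:9200", utc_ts, "response=200"))
```

If a search returns no hits shortly after midnight local time, checking the index name against the UTC date is a reasonable first step.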

The search result is a JSON document similar to the following:

{
  "took" : 20,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 98,
    "max_score" : 2.3988724,
    "hits" : [
      {
        "_index" : "logstash-2018.03.14",
        "_type" : "doc",
        "_id" : "t76VJGIBe9U4s2F_OL_W",
        "_score" : 2.3988724,
        "_source" : {
          "verb" : "GET",
          "@timestamp" : "2018-03-14T12:56:19.779Z",
          "response" : "200",
          "agent" : "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
          "source" : "/home/work/zion_package/elastic/filebeat/logstash-tutorial.log",
          "host" : "yf-beidou-dmp00.yf01.baidu.com",
          "auth" : "-",
          "clientip" : "83.149.9.216",
          "geoip" : {
            "country_code3" : "RU",
            "continent_code" : "EU",
            "ip" : "83.149.9.216",
           ... ...

4) Configure an index pattern in Kibana and search the logs through Discover:
Open the menu: Management -> Index Pattern;

Click Create Index Pattern to create the index pattern;

Open the Discover menu and search the logs.

References

Official video: Getting Started with Logstash
Official documentation: Getting Started with Logstash
