1. Configure the yum repository
[root@localhost ~]# cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-1.5]
name=logstash repository for 1.5.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.5/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
EOF
[root@localhost ~]# yum clean all
[root@localhost ~]# yum makecache
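To confirm the new repo is actually enabled before installing, a quick sanity check (the repo id comes from the file created above):
[root@localhost ~]# yum repolist enabled | grep logstash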
2. Install Logstash
[root@localhost ~]# yum install logstash
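Once the install finishes, the installed package version can be verified with rpm:
[root@localhost ~]# rpm -q logstash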
3. Directory layout of the yum installation
[root@logstash]$ whereis logstash
logstash: /etc/logstash /opt/logstash/bin/logstash.bat /opt/logstash/bin/logstash
/opt/logstash/bin/logstash    # the executable
/etc/logstash/conf.d/         # directory for pipeline configuration files
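The RPM also installs an init script, so once a config file is in place under conf.d the service can be managed the usual way (optional; step 6 below refers to starting it with service logstash start):
[root@localhost ~]# service logstash start
[root@localhost ~]# chkconfig logstash on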
4. Java environment configuration
Logstash looks for java in /usr/sbin/ and /usr/bin/ by default, so if the Java environment variables were not set up when Java was installed, it is enough to create a symlink to the java binary in /usr/sbin/ or /usr/bin/.
[root@localhost nginx]# which java
/usr/java/jdk1.8.0_60/bin/java
[root@localhost bin]# ln -s /usr/java/jdk1.8.0_60/bin/java java
Note: every Logstash filter plugin supports four common options: add_tag, remove_tag, add_field and remove_field. They take effect only when the plugin's filter matches successfully.
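As a small illustration of those options, a grok filter can stamp a tag and an extra field onto an event only when its pattern matches (a minimal sketch; the tag and field names here are invented for the example):
filter {
  grok {
    match => { "message" => "%{IPORHOST:clientip}" }
    add_tag => [ "grok_matched" ]               # added only on a successful match
    add_field => { "parsed_by" => "grok" }      # likewise, only when the pattern matches
  }
}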
5. Configuration file (for the configuration syntax and a full explanation of each option, see my post "Logstash configuration syntax")
[root@logstash]$ more /etc/logstash/conf.d/logstash-nginx.conf
input {
  file {
    path => "/data/logs/nginx/*.log"
    start_position => "beginning"
  }
}
filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{IPORHOST:clientip} - %{USER:ident} \[%{HTTPDATE:timestamp}\] %{NUMBER:reqLen} \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{INT:status} %{NUMBER:respLen} %{NUMBER:duration} %{QS:referrer} %{QS:userAgent} %{QS:xforward} %{INT:conn}:%{INT:reqs}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
    grok {
      match => { "message" => "(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+), (?<errinfo>.*)$" }
    }
    mutate {
      rename => [ "host", "fromhost" ]
      gsub => [ "errmsg", "too large body: \d+ bytes", "too large body" ]
    }
    if [errinfo] {
      ruby {
        code => "event.append(Hash[event['errinfo'].split(', ').map{|l| l.split(': ')}])"
      }
    }
    grok {
      match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}?(?: HTTP/%{NUMBER:httpversion})"' }
      patterns_dir => "/etc/logstash/patterns"
      remove_field => [ "message", "errinfo", "request" ]
    }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
  mutate {
    convert => [
      "duration", "float",      # convert these fields from strings to numeric types
      "status", "integer",
      "respLen", "integer"
    ]
  }
  geoip {
    source => "clientip"
    #add_tag => [ "geoip" ]
    database => "/etc/logstash/GeoLiteCity.dat"    # local IP database
  }
  geoip {
    source => "client"
    #add_tag => [ "geoip" ]
    database => "/etc/logstash/GeoLiteCity.dat"    # local IP database
  }
}
output {
  elasticsearch {
    host => ["10.173.17.71","10.172.198.108","10.170.237.100"]    # events are sent round-robin to the hosts in this list, spreading the load across the Elasticsearch cluster
    protocol => "http"
    index => "uustore-nginx-log-%{+yyyy.MM}"    # index name; naming indices by date creates a new index per period, so old data can be removed by time without touching other indices, but note that the index pattern in Kibana must then use the "*" wildcard
  }
  stdout {
    codec => rubydebug
  }
}
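Before starting the pipeline it is worth validating this file; Logstash 1.5 can check a config for syntax errors with the --configtest flag:
[root@localhost ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-nginx.conf --configtest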
6. The sincedb "timestamp" file (records how far each log file has already been read)
[root@logstash]$ locate sincedb
/root/.sincedb_53b7f195d5f913db850de77bc552cec0
To re-extract the logs from the beginning you need to delete the sincedb file (if the service was not started with service logstash start, the file is placed in the /root directory):
[root@localhost ~]# rm -f /var/lib/logstash/.sincedb_e90b8ae60d1c692cb46b94ebbf869e32
Note: depending on how Logstash was installed and how it is started, the .sincedb file may live either under /root/ or under /var/lib/logstash/; the two paths above are simply what my two different startup methods produced.
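Each line in a .sincedb file holds the watched file's inode, major/minor device numbers and the byte offset already processed, so it can be inspected before deciding whether to delete it (using the file located above):
[root@localhost ~]# cat /root/.sincedb_53b7f195d5f913db850de77bc552cec0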
7. Logstash ships with a geoip plugin, and a publicly available IP database can be downloaded for it to query
[root@localhost nginx]# cd /etc/logstash/
[root@localhost nginx]# wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
[root@localhost nginx]# gunzip GeoLiteCity.dat.gz
[root@localhost nginx]# cd conf.d/
[root@localhost nginx]# /opt/logstash/bin/logstash -f logstash-nginx.conf
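To check the downloaded database on its own, a throwaway pipeline defined on the command line also works (a minimal sketch: type a public IP address on stdin and the looked-up location fields should appear in the rubydebug output; "message" is the field the stdin input writes each line to):
[root@localhost nginx]# /opt/logstash/bin/logstash -e 'input { stdin {} } filter { geoip { source => "message" database => "/etc/logstash/GeoLiteCity.dat" } } output { stdout { codec => rubydebug } }'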