0. 介绍
Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
http://kibana.logstash.es/content/logstash/
风来了.fox
1.下载和安装
下载地址
https://www.elastic.co/downloads/logstash
目前最新版 2.4.0
这里使用 TAR.GZ 源码安装,即方式一
方式一:TAR
wget https://download.elastic.co/logstash/logstash/logstash-2.4.0.tar.gz
tar -zxvf logstash-2.4.0.tar.gz
方式二:deb
curl -L -O https://download.elastic.co/logstash/logstash/packages/debian/logstash-2.4.0_all.deb
sudo dpkg -i logstash-2.4.0_all.deb
方式三:rpm
curl -L -O https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.0.noarch.rpm
sudo rpm -vi logstash-2.4.0.noarch.rpm
这里是方式一后续操作
2.配置
创建配置目录
先进入 Logstash 根目录
mkdir -p etc
vim etc/www.lanmps.com.conf
etc/test.conf 文件内容
input {
file {
type => "nginx-access"
path => ["/www/wwwLogs/www.lanmps.com/*.log"]
start_position => "beginning"
}
}
filter {
grok {
"message"=>"%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{HOSTNAME:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) (%{QS:referrer}) %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{USERNAME:upstream_response_time}) > (%{USERNAME:response_time})"
#匹配模式 message是每段读进来的日志,IP、HTTPDATE、WORD、NOTSPACE、NUMBER都是patterns/grok-patterns中定义好的正则格式名称,对照上面的日志进行编写,冒号,(?:%{USER:ident}|-)这种形式是条件判断,相当于程序里面的二目运算。如果有双引号""或者[]号,需要在前面加\进行转义。
}
kv {
source => "request"
field_split => "&?"
value_split => "="
}
#再单独将取得的URL、request字段取出来进行key-value值匹配,需要kv插件。提供字段分隔符"&?",值键分隔符"=",则会自动将字段和值采集出来。
urldecode {
all_fields => true
}
#把所有字段进行urldecode(显示中文)
}
output {
elasticsearch {
hosts => ["10.1.5.66:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
}
}
配置说明
http://kibana.logstash.es/content/logstash/plugins/input/file.html
2.2 Nginx 日志格式定义
log_format access '$remote_addr - $remote_user [$time_local] "$request" $http_host $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" $upstream_addr $upstream_status $upstream_cache_status "$upstream_http_content_type" $upstream_response_time > $request_time';
3.启动和停止
3.1测试命令
测试logstash
bin/logstash -e 'input { stdin { } } output { stdout {codec=>rubydebug} }'
然后你会发现终端在等待你的输入。没问题,敲入 Hello World,回车,然后看看会返回什么结果!
3.2 测试配置文件是否 正确
bin/logstash -t -f etc/
3.3 启动
加载 etc文件夹下所有 *.conf 的文本文件,然后在自己内存里拼接成一个完整的大配置文件
bin/logstash -f etc/
后台运行
nohup bin/logstash -f etc/ &
3.4停止
查找进程 ID
ps -ef |grep logstash
KILL 他
kill -9 id
4.高级配置
http://kibana.logstash.es/content/logstash/get_start/full_config.html
http://kibana.logstash.es/content/logstash/plugins/output/elasticsearch.html
插件安装
http://www.jianshu.com/p/4fe495639a9a
5.插件
5.1 grok,useragent,urldecode和kv插件功能
./bin/logstash-plugin install kv
./bin/logstash-plugin install urldecode
./bin/logstash-plugin install grok
./bin/logstash-plugin install useragent
使用方式
input {
file {
path => "/home/vovo/access.log" #指定日志目录或文件,也可以使用通配符*.log输入目录中的log文件。
start_position => "beginning"
}
}
filter {
grok {
match => {
"message"=>"%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{HOSTNAME:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) (%{QS:referrer}) %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{USERNAME:upstream_response_time}) > (%{USERNAME:response_time})"
}
}
kv {
source => "request"
field_split => "&?"
value_split => "="
}
#再单独将取得的URL、request字段取出来进行key-value值匹配,需要kv插件。提供字段分隔符"&?",值键分隔符"=",则会自动将字段和值采集出来。
urldecode {
all_fields => true
}
#把所有字段进行urldecode(显示中文)
}
x.报错
x.1 Pipeline aborted due to error {:exception=>”LogStash::ConfigurationError”,
原因:这类错误都是 参数不存在或参数写的不对造成,
例如 elasticsearch 新版host已改为hosts,port 已删除