下载安装
地址:https://www.elastic.co/cn/downloads/logstash
实例分析
下面是一条tomcat日志:
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
从filebeat中输出到logstash,配置如下:
input {
beats {
port => "5043"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
output {
stdout { codec => rubydebug }
}
使配置文件生效:
bin/logstash -f logstash-simple.conf
检查配置并启动Logstash
bin/logstash -f first-pipeline.conf --config.test_and_exit
--config.test_and_exit
: 解析配置文件并报告任何错误
bin/logstash -f first-pipeline.conf --config.reload.automatic
--config.reload.automatic
: 启用自动配置加载,以至于每次你修改完配置文件以后无需停止然后重启Logstash
fileter中的message
代表一条一条的日志,%{COMBINEDAPACHELOG}
代表解析日志的正则表达式,COMBINEDAPACHELOG的具体内容见: https://github.com/logstash-plugins/logstash-patterns-core。解析后:
{
"request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"offset" => 325,
"auth" => "-",
"ident" => "-",
"input_type" => "log",
"verb" => "GET",
"source" => "/path/to/file/logstash-tutorial.log",
"message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"@timestamp" => 2016-10-11T21:04:36.167Z,
"response" => "200",
"bytes" => "203023",
"clientip" => "83.149.9.216",
"@version" => "1",
"beat" => {
"hostname" => "My-MacBook-Pro.local",
"name" => "My-MacBook-Pro.local"
},
"host" => "My-MacBook-Pro.local",
"httpversion" => "1.1",
"timestamp" => "04/Jan/2015:05:13:42 +0000"
}
再比如,下面这条日志:
55.3.244.1 GET /index.html 15824 0.043
这条日志可切分为5个部分,IP(55.3.244.1)
、方法(GET)
、请求文件路径(/index.html)
、字节数(15824)
、访问时长(0.043)
,对这条日志的解析模式(正则表达式匹配)如下:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
写到filter中:
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
}
}
解析后:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
解析任意格式日志
解析任意格式日志的步骤:
- 先确定日志的切分原则,也就是一条日志切分成几个部分。
- 对每一块进行分析,如果Grok中正则满足需求,直接拿来用。如果Grok中没用现成的,采用自定义模式。
- 调试Grok:https://www.5axxw.com/tools/v2/grok.html。
下面给出例子,来两条日志:
2017-03-07 00:03:44,373 4191949560 [ CASFilter.java:330:DEBUG] entering doFilter()
2017-03-16 00:00:01,641 133383049 [ UploadFileModel.java:234:INFO ] 上报内容准备写入文件
切分原则:
2017-03-16 00:00:01,641:时间
133383049:编号
UploadFileModel.java:java类名
234:代码行号
INFO:日志级别
entering doFilter():日志内容
前五个字段用Grok中已有的,分别是TIMESTAMP_ISO8601
、NUMBER
、JAVAFILE
、NUMBER
、LOGLEVEL
,最后一个采用自定义正则的形式,日志级别的]之后的内容不论是中英文,都作为日志信息处理,使用自定义正则表达式子的规则如下:
(?<field_name>the pattern here)
最后一个字段的内容用info表示,正则如下:
(?<info>([\s\S]*))
上面两条日志对应的完整的正则如下,其中\s*
用于剔除空格。
\s*%{TIMESTAMP_ISO8601:time}\s*%{NUMBER:num} \[\s*%{JAVAFILE:class}\s*\:\s*%{NUMBER:lineNumber}\s*\:%{LOGLEVEL:level}\s*\]\s*(?<info>([\s\S]*))
正则解析容易出错,强烈建议使用Grok Debugger调试,姿势如下。
配置文件类型
log-kafka配置文件
输入源为nginx的日志文件,输出源为kafka
input {
file {
path => "/var/logs/nginx/*.log"
discover_interval => 5
start_position => "beginning"
}
}
output {
kafka {
topic_id => "accesslog"
codec => plain {
format => "%{message}"
charset => "UTF-8"
}
bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
}
}
file-kafka配置文件
输入源为txt文件,输出源为kafka
input {
file {
codec => plain {
charset => "GB2312"
}
path => "D:/GameLog/BaseDir/*/*.txt"
discover_interval => 30
start_position => "beginning"
}
}
output {
kafka {
topic_id => "gamelog"
codec => plain {
format => "%{message}"
charset => "GB2312"
}
bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
}
}
log-elasticsearch配置文件
输入源为nginx的日志文件,输出源为elasticsearch
input {
file {
type => "flow"
path => "var/logs/nginx/*.log"
discover_interval => 5
start_position => "beginning"
}
}
output {
if [type] == "flow" {
elasticsearch {
index => "flow-%{+YYYY.MM.dd}"
hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
}
}
}
本地日志输出到ES
input {
file {
path => "/root/journal.log"
type => "journal"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
output{
elasticsearch {
hosts => "192.168.4.155:9200"
user => "elastic"
# 索引名称
index => "logstash_logs-%{+YYYY.MM.dd}"
ecs_compatibility => disabled
}
stdout {
# JSON格式输出
codec=> json_lines
}
}
kafka-elasticsearch配置文件
输入源为kafka的accesslog和gamelog主题,并在中间分别针对accesslog和gamelog进行过滤,输出源为elasticsearch。当input里面有多个kafka输入源时,client_id => "es*"必须添加且需要不同,否则会报错javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0。
input {
kafka {
type => "accesslog"
codec => "plain"
auto_offset_reset => "earliest"
client_id => "es1"
group_id => "es1"
topics => ["accesslog"]
}
kafka {
type => "gamelog"
codec => "plain"
auto_offset_reset => "earliest"
client_id => "es2"
group_id => "es2"
topics => ["gamelog"]
bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
}
}
filter {
if [type] == "accesslog" {
json {
source => "message"
remove_field => ["message"]
target => "access"
}
}
if [type] == "gamelog" {
mutate {
split => { "message" => " " }
add_field => {
"event_type" => "%{message[3]}"
"current_map" => "%{message[4]}"
"current_x" => "%{message[5]}"
"current_y" => "%{message[6]}"
"user" => "%{message[7]}"
"item" => "%{message[8]}"
"item_id" => "%{message[9]}"
"current_time" => "%{message[12]}"
}
remove_field => ["message"]
}
}
}
output {
if [type] == "accesslog" {
elasticsearch {
index => "accesslog"
codec => "json"
hosts => ["hadoop1:9200","hadoop2:9200","hadoop3:9200"]
}
}
if [type] == "gamelog" {
elasticsearch {
flush_size => 20000 //表示logstash的包数量达到20000个才批量提交到es.默认是500
idle_flush_time => 10 //多长时间发送一次数据,flush_size和idle_flush_time以定时定量的方式发送,按照批次发送,可以减少logstash的网络IO请求
index => "gamelog"
codec => plain {
charset => "UTF-16BE"
}
hosts => ["hadoop1:9200","hadoop2:9200","hadoop3:9200"]
}
}
}
注:UTF-16BE为解决中文乱码,而不是UTF-8
logstash启动
logstash -f /opt/app/logstash-6.2.3/conf/flow-kafka.conf
logstash遇到的问题
在使用logstash采集日志时,如果我们采用file为input类型,采用不能反复对一份文件进行测试!第一次会成功,之后就会失败!
Grok插件
Logstash的Grok 可以使蹩脚的、无结构,杂乱无章的的日志内容结构化
需要注意的地方
grok 模式是正则表达式,因此这个插件的性能受到正则表达式引擎影响,效率并不高。如果通过给定的匹配格式匹配不上,那么Kibana查询的时候会自动打上tag 为 grok failed 的标签。
为什么通过正则表达式效率不高?
个人见解: 如果你的每一条日志都是无结构的,那么Grok 需要对每一条日志去正则匹配,这样子就相当于你用java或者php对每一个日志进行分析然后过滤出自己想要的字段一样,同时也和Linux上的Grep或者Awk 去过滤文本内容是一样,都会消耗内存和CPU资源。
然而假如你的日志是有结构化的比如JSON格式的日志,那么高级语言像Python等直接导入json的库就可以,(Logstash里面直接使用json的编码插件即可)不需要对每一条日志进行过多的分析,那样解析的效率就会很高
语法详解
grok模式的语法如下:
%{SYNTAX:SEMANTIC}
SYNTAX:代表匹配值的类型,例如3.44可以用NUMBER类型所匹配,127.0.0.1可以使用IP类型匹配。
SEMANTIC:代表存储该值的一个变量名称,例如 3.44 可能是一个事件的持续时间,10.0.0.2 可能是请求的client地址。
所以这两个值可以用 %{NUMBER:duration} %{IP:client} 来匹配。
你也可以选择将数据类型转换添加到Grok模式。默认情况下,所有语义都保存为字符串。如果您希望转换语义的数据类型。
例如将字符串更改为整数,则将其后缀为目标数据类型。
例如%{NUMBER:num:int}将num语义从一个字符串转换为一个整数。
匹配的典型格式举例
- %{NUMBER:duration} — 匹配浮点数
- %{IP:client} — 匹配IP
- (?([\S+])),自定义正则
- (?<class_info>([\S+])), 自定义正则匹配多个字符
- \s或者\s+,代表多个空格
- \S+或者\S,代表多个字符
- 大括号里面:xxx,相当于起别名
- %{UUID},匹配类似091ece39-5444-44a1-9f1e-019a17286b48
- %{WORD}, 匹配请求的方式
- %{GREEDYDATA},匹配所有剩余的数据
- %{LOGLEVEL:loglevel} ---- 匹配日志级别
自定义类型
(1) 命名捕获
(?<xx>[0-9A-F]{4}) # 这样可以匹配连续 长度为4的数字,并用xx来存储
(?<xx>[0-9A-F]{4}) # 这样可以匹配连续 长度为5的数字
(2) 创建自定义 patterns 文件进行匹配
-
2.1 创建一个名为patterns其中创建一个文件xx_postfix (名字任意定义),在该文件中,将需要的模式写为
模式名称,空格,然后是该模式的正则表达式
例如:#contents of ./patterns/xx_postfix: xx [0-9A-F]{10,11}
- 2.2 然后使用这个插件中的patterns_dir设置告诉logstash目录是你的自定义模式。
filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:xx}: %{GREEDYDATA:syslog_message}" } } }
Grok过滤器配置选项
break_on_match
● 值类型是布尔值
● 默认是true
● 描述:match可以一次设定多组,预设会依照顺序设定处理,如果日志满足设定条件,则会终止向下处理。但有的时候我们会希望让Logstash跑完所有的设定,这时可以将break_on_match设为false
keep_empty_captures
● 值类型是布尔值
● 默认值是 false
● 描述:如果为true,捕获失败的字段将设置为空值
match
● 值类型是数组
● 默认值是 {}
● 描述:字段⇒值匹配
例如:
filter {
grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}
//如果你需要针对单个字段匹配多个模式,则该值可以是一组,例如:
filter {
grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
}
实例2
output plugin docs: https://www.elastic.co/guide/en/logstash/current/output-plugins.html
input plugin dosc: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
input {
file {
path => "/usr/local/nginx/logs/access.log"
type => "nginx"
start_position => "beginning"
//每次从头读文件,意味着会重复发送log,慎用
sincedb_path => "/dev/null"
bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
//path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
path => [‘pathA’,‘pathB’]
//表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
discover_interval => 15
//#排除不想监听的文件
exclude => [‘fileName1’,‘fileNmae2’]
//添加自定义的字段
add_field => {"test"=>"test"}
//增加标签
tags => "tag1"
//#设置新事件的标志
delimiter => "\n"
//#设置多长时间扫描目录,发现新文件
discover_interval => 15
//#设置多长时间检测文件是否修改
stat_interval => 1
//被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
close_older => 3600
//在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
ignore_older => 86400
//logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
stat_interval => 1
//#监听文件的起始位置,默认是end
start_position => ‘beginning’
//注意:如果需要每次都从同开始读取文件的话,关设置start_position => beginning是没有用的,可以选择sincedb_path 定义为 /dev/null
}
file {
path => "/var/log/secure"
type => "secure"
start_position => "beginning"
sincedb_path => "/dev/null"
}
beats {
//数据类型
type => "logs"
//接受数据端口
port => 5044
//ssl => true
//ssl_certificate => "/etc/logstash/logstash.crt"
//ssl_key => "/etc/logstash/logstash.key"
}
}
// filter 模块主要是数据预处理,提取一些信息,方便 elasticsearch 好归类存储。
filter {
grok {
match => {"message" => "%{EXIM_DATE:timestamp}\|%{LOGLEVEL:log_level}\|%{INT:pid}\|%{GREEDYDATA}"}
// message 字段是 log 的内容,例如 2018-12-11 23:46:47.051|DEBUG|3491|helper.py:85|helper._save_to_cache|shop_session
//在这里我们提取出了 timestamp log_level pid,grok 有内置定义好的 patterns: EXIM_DATE, EXIM_DATE, INT
// GREEDYDATA 贪婪数据,代表任意字符都可以匹配
}
// 如果在 filebeat 里面添加了这个字段[fields][function] 的话,那就会执行对应的 match 规则去匹配 path
// source 字段就是 log 的来源路径,例如 /var/log/nginx/feiyang233.club.access.log
// match 后我们就可以得到 path=feiyang233.club.access
if [fields][function]=="nginx" {
grok {
match => {"source" => "/var/log/nginx/%{GREEDYDATA:path}.log%{GREEDYDATA}"}
}
}
// 例如 ims 日志来源是 /var/log/ims_logic/debug.log
// match 后我们就可以得到 path=ims_logic
else if [fields][function]=="ims" {
grok {
match => {"source" => "/var/log/%{GREEDYDATA:path}/%{GREEDYDATA}"}
}
}
else {
grok {
match => {"source" => "/var/log/app/%{GREEDYDATA:path}/%{GREEDYDATA}"}
}
}
// filebeat 有定义 [fields][function] 时,我们就添加上这个字段,例如 QA
if [fields][function] {
mutate {
add_field => {
"function" => "%{[fields][function]}"
}
}
}
// 因为线上的机器更多,线上的我默认不在 filebeat 添加 function,所以 else 添加上 live
else {
mutate {
add_field => {
"function" => "live"
}
}
}
// 在之前 filter message 时,我们得到了 timestamp,这里修改一下格式,添加上时区。
date {
match => ["timestamp" , "yyyy-MM-dd HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Singapore"
}
// 将之前获得的 path 替换其中的 / 替换为 - , 因为 elasticsearch index name 有要求
// 例如 feiyang/test feiyang_test
mutate {
gsub => ["path","/","-"]
add_field => {"host_ip" => "%{[fields][host]}"}
remove_field => ["tags","@version","offset","beat","fields","exim_year","exim_month","exim_day","exim_time","timestamp"]
}
// remove_field 去掉一些多余的字段
}
// 单节点 output 就在本机,也不需要 SSL, 但 index 的命名规则还是需要非常的注意
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "sg-%{function}-%{path}-%{+xxxx.ww}"
// sg-nginx-feiyang233.club.access-2019.13 ww 代表周数
}
}
//输出到文件
output{
file{
path => "/tmp/%{+YYYY.MM.dd}-%{host}-file.txt"
}
}
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["http://192.168.0.9:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
else if [type] == "secure" {
elasticsearch {
hosts => ["http://192.168.0.9:9200"]
index => "secure-%{+YYYY.MM.dd}"
}
}
}
-
sincedb_path => "/dev/null"
: 该参数用来指定 sincedb 文件名, Logstash 会记录自己读取文件内容的偏移量到一个隐藏文件里,默认情况下,下次启动,他会从这个偏移量继续往后读,避免重复读取数据。这个隐藏文件,叫做
$HOME/.sincedb_****
,如果设置为/dev/null
这个 Linux 系统上特殊的空洞文件,那么 logstash 每次重启进程的时候,尝试读取 sincedb 内容,都只会读到空白内容,也就会理解成之前没有过运行记录,就会从初始位置开始读取!
参考
https://blog.51cto.com/u_13760226/2433967 ELK--grok正则分析日志与多日志收集
https://blog.csdn.net/napoay/article/details/62885899 ELK日志处理之使用Grok解析日志
https://developer.aliyun.com/article/152043 Logstash详解之——input模块
https://blog.csdn.net/knight_zhou/article/details/104954098 Logstash Grok详解