log | logstash
1. 安装
1. 安装java
java -version
检查java版本
没有安装则到oracle官网下载java。JDK
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" wget http://download.oracle.com/otn-pub/java/jdk/8u181-b13/96a7b8442fe848ef90c96a2fad6ed6d1/jdk-8u181-linux-x64.rpm
rpm -ivh jdk-8u181-linux-x64.rpm
安装
验证java版本
[root@ ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
设置环境变量vim /etc/profile
JAVA_HOME=/usr/java/jdk1.8.0_181-amd64
CLASSPATH=%JAVA_HOME%/lib:%JAVA_HOME%/jre/lib
PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export PATH CLASSPATH JAVA_HOME
最后加入上述内容
source /etc/profile
重新载入配置文件
失败也可使用
rpm -qa|grep jdk //查看版本
rpm -e jdk版本 //卸载
下载java
2. 安装logstash
安装key文件rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
更改yum源vim /etc/yum.repos.d/logstash.repo
新建repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
然后安装yum install logstash
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {}
安装完毕后测试
{
"host" => "dc",
"@version" => "1",
"message" => "hello world",
"@timestamp" => 2018-07-27T08:21:57.452Z
}
有类似上面结果输出。
也可使用json
模式输出
bin/logstash -e 'input { stdin { } } output { stdout {codec => json} }'
2. 设置logstash
cd /etc/logstash/conf.d
自行添加配置文件
2.1 配置语法
Logstash 用
{}
来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。-
logstash支持数据类型:
- bool
- string
- number
- array
- hash
-
字段引用:
- 字段是
Logstash::Event
对象的属性。 - 想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号
[]
里就行了,这就叫字段引用。 - 对于 嵌套字段(也就是多维哈希表,或者叫哈希的哈希),每层的字段名都写在
[]
里就可以了。比如,你可以从 geoip 里这样获取 longitude 值(是的,这是个笨办法,实际上有单独的字段专门存这个数据的) - logstash 的数组也支持倒序下标,即
[geoip][location][-1]
可以获取数组最后一个元素的值
- 字段是
条件判断
Logstash从 1.3.0 版开始支持条件判断和表达式。
表达式支持下面这些操作符:
equality, etc: ==, !=, <, >, <=, >=
regexp: =~, !~
inclusion: in, not in
boolean: and, or, nand, xor
unary: !()
- 命令行参数
- -e: 快捷运行
- -f:指定配置文件
- -t:测试语法是否正常
- -l:日志输出位置
- -w:工作线程数量
2.2 设置input
- discover_interval: logstash每隔多久检测一次被监听的path下是否有新文件,默认15s
- exclude: 不想被监听的文件列表
- sincedb_path: 定义sincedb配置的路径
- stat_interval, 没隔多久检测一次被监听文件状态(是否有更新),默认1s
- start_position: 从什么位置读取文件数据,默认结束位置,如果需导数据,则设置成beginning,从开始位置读取; 仅在该文件从未被监听过才有效,因为sincedb文件记录了该文件的inode,会从记录过的pos开始读,需要重复读取一个日志,可以删除sincedb文件或定义sincedb_path为/dev/null
- close_older: 一个已经监听中的文件,超过这个值的时间内没更新内容,就关闭监听它的文件句柄,默认3600s
- ingore_older: 每次检测文件列表的时候,如果一个文件的自后修改时间超过该值,就忽略这文件,默认86400s即1天
2.3 filter
2.3.1 grok正则捕获
可以在 grok 里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。
Grok 的语法规则是%{语法 : 语义}
。完整语法结构%{PATTERN_NAME:capture_name:data_type}
data_type 目前只支持两个值:int
和 float
“语法”指的就是匹配的模式,例如使用 NUMBER 模式可以匹配出数字,IP 则会匹配出 127.0.0.1 这样的 IP 地址:
如果你把 "message" 里所有的信息都 grok 到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用 remove_field
参数来删除掉 message 字段,或者用overwrite
参数来重写默认的 message 字段,只保留最重要的部分。
使用 Grok 的 overwrite
参数也可以覆盖日志中的信息
%{IP:clientip}\s+%{IP:clientip1}...,如果SEMANTIC定义的相同名字,结果为数组形式
"clientip" => [
[0] "12.12.12.12",
[1] "32.32.32.32"
]
自定义grok表达式
语法:(?<field_name>the pattern here)
多行匹配可具体写法是在表达式开始位置加 (?m)
标记
如果日志有多重格式,则可以写多个匹配,然后logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止。效果和用|
效果一致。
2.3.2 数据修改mutate
filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
filter 区段之内,是顺序执行的。所以我们最后看到的输出结果是:
类型转换
关键字convert
。可实现浮点、字符串、整型的相互转换。-
字符串处理
- gsub:
- split:分割字符串,结果为数组的形式
- join:仅对数组类型有效
- merge:合并两个数组或hash字段