1.背景
应用日志输入到kafka中,logstash对接kafka,然后kibana中展示日志。
Kafka和elk的安装在前文中
2.实现
1)创建配置文件
logstash对接kafka就是一个配置文件的事。
进入logstash的config文件夹
创建文件,并编辑
vim kafkasconf
添加以下内容:
input {
kafka{
bootstrap_servers => ["10.XX.XX.XX:9092,10.XX.XX.XX:9092, 10.XX.XX.XX:9092"]
client_id => "logback-kafka"
auto_offset_reset => "latest"
consumer_threads => 5
decorate_events => true
topics => ["app-logs"]
codec => "json"
}
}
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:severity}\s+\[%{DATA:service},%{DATA:trace},%{DATA:span},%{DATA:exportable}\]\s+%{DATA:pid}\s+---\s+\[%{DATA:thread}\]\s+%{DATA:class}\s+:\s+%{GREEDYDATA:rest}" }
}
}
output {
elasticsearch {
hosts => ["10.XX.XX.XX:9200"]
}
}
说明:
bootstrap_servers:kafka的IP和端口,有多个时,中间逗号隔开
topics:应用放入kafka日志时创建的
filter:logstash过滤规则
output:logstash对接的elasticsearch信息
保存修改,退出。
2)重启logstash
查询logstash进程号
ps -ef|grep logstash
杀掉进程
Kill -9 XX
进入logstash的bin目录,启动logstash
nohup ./logstash -f ../config/kafkas.conf &
3)验证
登录kibana
先在management中创建index
然后在discover中查看是否有日志输出
哈哈