中间加了一层转换：日志由 Filebeat 采集后进入 Logstash，再把日志内容中的时间字段覆盖到 ES 的 @timestamp 字段，Kibana 会将这个字段当做是日志的上传时间。
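# Pipeline: Filebeat (beats input) -> Logstash -> Elasticsearch.
# Each branch parses the timestamp out of the log line with grok and writes it
# to @timestamp, so the time shown in Kibana is the log line's own time rather
# than the moment it was received.
#
# Note: the "#index => ..." comment inside the elasticsearch output must stay
# on its own line; when the whole config is collapsed onto one line that "#"
# comments out everything that follows it, including the closing braces.

input {
  beats {
    port => 10515
  }
}

filter {
  # The branches are selected by the tag Filebeat attaches to each source;
  # they are kept separate because each source uses its own timestamp layout.
  if "xxx" in [tags] {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:log.date}"]
    }
    date {
      # TIMESTAMP_ISO8601 also accepts a "T" separator, a "." fraction and a
      # timezone suffix. Lines in one of those shapes matched grok but failed
      # the single fixed pattern, were tagged _dateparsefailure and kept the
      # ingest time as @timestamp; "ISO8601" is the fallback for them.
      match => ["log.date", "yyyy-MM-dd HH:mm:ss,SSS", "ISO8601"]
      target => "@timestamp"
    }
  }

  if "xxx" in [tags] {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:timestamp8601}"]
    }
    date {
      # Same fallback as above; this source uses "." as the fraction separator.
      match => ["timestamp8601", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
      target => "@timestamp"
    }
  }

  if "xxx" in [tags] {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:timestamp8601}"]
    }
    date {
      # Same fallback as above; this source uses "." as the fraction separator.
      match => ["timestamp8601", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
      target => "@timestamp"
    }
  }
}

output {
  # Each source is routed to its own monthly index.
  # Plain "http://" sends the credentials below and the log content in clear
  # on the wire; an "https://" endpoint with certificate verification left
  # enabled is the safer choice.
  # Literal user/password values in the pipeline file end up in version
  # control and backups; keeping them in the Logstash keystore and referencing
  # them as "${ES_USER}" / "${ES_PASSWORD}" keeps the file free of secrets.
  if "xxx" in [tags] {
    elasticsearch {
      hosts => ["http://xxx:9200"]
      #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      index => "xxx-%{+YYYY.MM}"
      user => "xxx"
      password => "xxx"
    }
  }

  if "xxx" in [tags] {
    elasticsearch {
      hosts => ["http://xxx:9200"]
      #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      index => "xxx-%{+YYYY.MM}"
      user => "xxx"
      password => "xxx"
    }
  }

  if "xxx" in [tags] {
    elasticsearch {
      hosts => ["http://xxx:9200"]
      #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      index => "xxx-%{+YYYY.MM}"
      user => "xxx"
      password => "xxx"
    }
  }
}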