<p>将F5的waf日志导入到elasticsearch以及mongodb中,方便后期查看以及制作监控报表。
logstash上的配置
input {
syslog {
port => 515
}
}
filter {
grok {
match => {
message => ".*dvchost=(?<dvchost>[^\s]+)\sdvc=(?<dvc>[^\s]+)\scs1=(?<policy_name>[^\s]+)\scs1Label=policy_name\scs2=(?<http_class_name>[^\s]+)\scs2Label=http_class_name\sdeviceCustomDate1=(?<policy_apply_date>\w+\s\d{2}\s\d{4}\s\d{2}:\d{2}:\d{2})\sdeviceCustomDate1Label=policy_apply_date\sexternalId=(?<externalId>\d{1,})\sact=(?<act>\w+)\scn1=(?<response_code>\d{1,})\scn1Label=response_code\***c=(?<clientip>[^s]+)\sspt=(?<clientPort>\d{1,})\sdst=(?<targetip>[^\s]+)\sdpt=(?<targetport>\d{1,})\srequestMethod=(?<requestMethod>\w+)\sapp=(?<app>\w+)\scs5=(?<x_forwarded_for_header_value>([^\s]+|\w+))\scs5Label=x_forwarded_for_header_value\srt=(?<time>\w+\s\d{2}\s\d{4}\s\d{2}:\d{2}:\d{2})\sdeviceExternalId=(?<deviceExternalId>\d{1,})\scs4=(?<attack_type>.*)\scs4Label=attack_type\scs6=(?<geo_location>.*)\scs6Label=geo_location\sc6a1=(?<device_address>.*)\sc6a1Label=device_address\sc6a2=(?<source_address>.*)\sc6a2Label=source_address\sc6a3=(?<destination_address>.*)\sc6a3Label=destination_address\sc6a4=(?<ip_address>.*)\sc6a4Label=ip_address_intelligence\smsg=(?<msg>.*)\ssuid=(?<suid>[^\s]+)\ssuser=(?<suer>.*)\scn2=(?<violation_rating>.*)\scn2Label=violation_rating\scn3=(?<device_id>.*)\scn3Label=device_id\srequest=(?<request>.*)\scs3Label=full_request\scs3=\w+\s(?<full_request>.*)"
}
}
date {
match => ["time","MMM dd yyyy HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
ruby {
code => "event.set(‘timestamp‘,event.get(‘@timestamp‘))" //将@timestamp的值赋值给timestamp字段
}
ruby {
code => "event.set(‘timetemp‘, event.get(‘@timestamp‘).time.localtime + 8*60*60)" //设置timetemp的值为@timstamp的值+8小时
}
ruby {
code => "event.set(‘@timestamp‘,event.get(‘timetemp‘))" //将timetemp的值赋给@timestamp
}
mutate {
remove_field => ["timetemp"]
}
}
output {
stdout { codec => rubydebug }
file {
path => "/opt/logs/waf/wafLog-%{+YYYY.MM.dd}"
codec => line { format => "custom format: %{message}"}
}
mongodb {
isodate => true //默认为false,如果设置为true,@timestamp将作为isodate的时间格式插入到数据库中,否则将会已字符串的形式插入
collection => "waflogs"
database => "logs"
uri => "mongodb://127.0.0.1:27017"
}
elasticsearch {
hosts => ["192.168.1.2:9200","192.168.1.3:9200","192.168.1.4:9200"]
index => "waflog-%{+YYYY.MM.dd}"
}
}
注意事项及说明
- 在F5设备上设置日志通过syslog的方式输出到syslog服务器。在rsyslog服务器上启动一个logstash的进程,使用syslog模块,设置端口为515,这样logstash就可以收到waf上的日志。如果输出到默认的rsyslog服务上,需要设置rsyslog服务器才能看到日志(还没用研究出来如何设置)。logstash上正则表达式的配置则根据日志情况进行配置。
- @timestamp的时间默认是比当前时间早8小时,为了能在mongodb中正确显示时间,所以在filter模块中将@timestamp的时间设置成当前时间。但是在kibana中如果用@timestamp作为时间轴,会自动将@timestamp的时间+8小时,会导致kibana上显示异常。所以用了timestamp时间作为kibana上的显示时间。这样即解决了mongodb插入的时间问题,也解决了kibana上的显示问题。