将waf日志导入到elasticsearch及mongodb中

目的

<p>将F5的waf日志导入到elasticsearch以及mongodb中,方便后期查看以及制作监控报表。

logstash上的配置

input {
        syslog {
        port => 515
  }
}
filter {
        grok {
                match => {
                        message => ".*dvchost=(?<dvchost>[^\s]+)\sdvc=(?<dvc>[^\s]+)\scs1=(?<policy_name>[^\s]+)\scs1Label=policy_name\scs2=(?<http_class_name>[^\s]+)\scs2Label=http_class_name\sdeviceCustomDate1=(?<policy_apply_date>\w+\s\d{2}\s\d{4}\s\d{2}:\d{2}:\d{2})\sdeviceCustomDate1Label=policy_apply_date\sexternalId=(?<externalId>\d{1,})\sact=(?<act>\w+)\scn1=(?<response_code>\d{1,})\scn1Label=response_code\***c=(?<clientip>[^s]+)\sspt=(?<clientPort>\d{1,})\sdst=(?<targetip>[^\s]+)\sdpt=(?<targetport>\d{1,})\srequestMethod=(?<requestMethod>\w+)\sapp=(?<app>\w+)\scs5=(?<x_forwarded_for_header_value>([^\s]+|\w+))\scs5Label=x_forwarded_for_header_value\srt=(?<time>\w+\s\d{2}\s\d{4}\s\d{2}:\d{2}:\d{2})\sdeviceExternalId=(?<deviceExternalId>\d{1,})\scs4=(?<attack_type>.*)\scs4Label=attack_type\scs6=(?<geo_location>.*)\scs6Label=geo_location\sc6a1=(?<device_address>.*)\sc6a1Label=device_address\sc6a2=(?<source_address>.*)\sc6a2Label=source_address\sc6a3=(?<destination_address>.*)\sc6a3Label=destination_address\sc6a4=(?<ip_address>.*)\sc6a4Label=ip_address_intelligence\smsg=(?<msg>.*)\ssuid=(?<suid>[^\s]+)\ssuser=(?<suer>.*)\scn2=(?<violation_rating>.*)\scn2Label=violation_rating\scn3=(?<device_id>.*)\scn3Label=device_id\srequest=(?<request>.*)\scs3Label=full_request\scs3=\w+\s(?<full_request>.*)"
                }
        }
        date {
                match => ["time","MMM dd yyyy HH:mm:ss"]
                target => "@timestamp"
                timezone => "Asia/Shanghai"
        }
        ruby {
                code => "event.set(‘timestamp‘,event.get(‘@timestamp‘))"    //将@timestamp的值赋值给timestamp字段
        }
        ruby {
                code => "event.set(‘timetemp‘, event.get(‘@timestamp‘).time.localtime + 8*60*60)" //设置timetemp的值为@timstamp的值+8小时
        }
        ruby {
                code => "event.set(‘@timestamp‘,event.get(‘timetemp‘))" //将timetemp的值赋给@timestamp
        }
        mutate {
                 remove_field => ["timetemp"]
        }

}

output {
        stdout { codec => rubydebug }
        file {
                path => "/opt/logs/waf/wafLog-%{+YYYY.MM.dd}"
                codec => line { format => "custom format: %{message}"}

        }
       mongodb {
               isodate => true   //默认为false,如果设置为true,@timestamp将作为isodate的时间格式插入到数据库中,否则将会已字符串的形式插入
               collection => "waflogs"
               database => "logs"
               uri => "mongodb://127.0.0.1:27017"
       }
        elasticsearch {
                hosts => ["192.168.1.2:9200","192.168.1.3:9200","192.168.1.4:9200"]
                index => "waflog-%{+YYYY.MM.dd}"
        }
}

注意事项及说明

  1. 在F5设备上设置日志通过syslog的方式输出到syslog服务器。在rsyslog服务器上启动一个logstash的进程,使用syslog模块,设置端口为515,这样logstash就可以收到waf上的日志。如果输出到默认的rsyslog服务上,需要设置rsyslog服务器才能看到日志(还没用研究出来如何设置)。logstash上正则表达式的配置则根据日志情况进行配置。
  2. @timestamp的时间默认是比当前时间早8小时,为了能在mongodb中正确显示时间,所以在filter模块中将@timestamp的时间设置成当前时间。但是在kibana中如果用@timestamp作为时间轴,会自动将@timestamp的时间+8小时,会导致kibana上显示异常。所以用了timestamp时间作为kibana上的显示时间。这样即解决了mongodb插入的时间问题,也解决了kibana上的显示问题。

将waf日志导入到elasticsearch及mongodb中

上一篇:一键安装数据库


下一篇:PL/SQL system 作为sysdba登录报错