指定端口监控流量的输入及输入
1. 安装iftop
yum -y install iftop
2. 编写脚本
#!/bin/bash
while true
do
iftop -nNP -t -s 1 |egrep '[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:9092|[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:[0-9]{1,5}' >>/root/test_iftop_log/test-iftop.log
# 配置文件
Iftop_file=/root/test_iftop_log/test-iftop.log
Change_file=/root/test_iftop_log/change_iftop.log
INPUT_File=/root/test_iftop_log/input_iftop.log
OUTPUT_File=/root/test_iftop_log/output_iftop.log
Iftop_Log=/root/test_iftop_log/iftop.log
number_file=/root/test_iftop_log/line_number
Change_numeber_file=/root/test_iftop_log/change_line_number
IFTOP_OUT=/root/test_iftop_log/iftop.out
TIME_LOG=/root/test_iftop_log/iftop_TIME
TIME_DATA=`date +%F-%H-%M`
BAK_Iftop_Log=/root/test_iftop_log/backup/${TIME_DATA}_iftop.log
# Md5sum文件
Md5Sum_Path=/root/test_iftop_log/md5_file/iftop.md5sum
Md5Sum_Change=/root/test_iftop_log/md5_file/change.md5sum
LS=`ls -lh /root/test_iftop_log/iftop.log |awk '{print $5}'`
#
line_number=`cat ${number_file}`
Change_line_numeber=`cat ${Change_numeber_file}`
#Process=`$(ps -ef) |grep -v grep|grep iftop|wc -l`
Md5Sum_judge=`md5sum -c ${Md5Sum_Path} |awk '{print $2}'`
Md5Sum_judge_Change=`md5sum -c ${Md5Sum_Change} |awk '{print $2}'`
if [ ! -f $Md5Sum_Path ];then
md5sum $Iftop_file > $Md5Sum_Path
elif [ ! -f $Change_file ];then
cat ${Iftop_file}|awk 'NF==7{$1=a;print;next}1' $Iftop_file |column -t >${Change_file}
elif [ ! -f $Md5Sum_Change ];then
md5sum $Change_file > $Md5Sum_Change
fi
if [[ ${Md5Sum_judge} = "FAILED" ]];then
rm -rf $Md5Sum_Path
md5sum $Iftop_file > $Md5Sum_Path
awk "NR>$line_number" $Iftop_file|awk 'NF==7{$1=a;print;next}1' |column -t >>${Change_file}
sed -e "s/:/ /g" ${Change_file} -i
# sed -i 's/[<=>]//g' ${Change_file}
cat ${Iftop_file}|wc -l >${number_file}
fi
sed -i "s/B//g" ${Change_file}
sed -i "s/b//g" ${Change_file}
sed -i 's/KB//g' ${Change_file}
sed -i 's/Kb//g' ${Change_file}
sed -i "s/K//g" ${Change_file}
for i in `egrep -o '[0-9]{1,4}\.[0-9]{1,4}MB|[0-9]{1,4}MB' ${Change_file}`
do
num=`awk "BEGIN{print ${i%MB}*1024}"`
sed -i "s/$i/$num/g" ${Change_file}
done
for i in `egrep -o '[0-9]{1,4}\.[0-9]{1,4}MB|[0-9]{1,4}GB' ${Change_file}`
do
num=`awk "BEGIN{print ${i%MB}*1024*1024}"`
sed -i "s/$i/$num/g" ${Change_file}
done
if [[ ${Md5Sum_judge_Change} = "FAILED" ]];then
rm -rf ${Md5Sum_Change}
md5sum ${Change_file} > ${Md5Sum_Change}
sed -n '1~2p' ${Change_file} >${INPUT_File}
sed -n '2~2p' ${Change_file} >${OUTPUT_File}
paste -d '\t' ${INPUT_File} ${OUTPUT_File} > ${IFTOP_OUT}
cat ${IFTOP_OUT}| awk '{print $0" " strftime("%Y-%m-%d-%H:%M:%S",systime())}' >${TIME_LOG}
grep '9092' ${TIME_LOG} >> ${Iftop_Log}
cat ${Change_file}|wc -l >${Change_numeber_file}
fi
sleep 1
done
3. 配置logstash
input {
file {
path => "/root/test_iftop_log/iftop.log"
#start_position => beginning
}
}
filter {
mutate {
split => {"message" => " "}
add_field => { "input_IP" => "%{[message][0]}" }
add_field => { "input_port" => "%{[message][1]}" }
add_field => { "input_symbol" => "%{[message][2]}" }
add_field => { "input_last2s" => "%{[message][3]}" }
add_field => { "input_last10s" => "%{[message][4]}" }
add_field => { "input_last40s" => "%{[message][5]}" }
add_field => { "input_cumulative" => "%{[message][6]}" }
add_field => { "output_IP" => "%{[message][7]}" }
add_field => { "output_port" => "%{[message][8]}" }
add_field => { "output_symbol" => "%{[message][9]}" }
add_field => { "output_last2s" => "%{[message][10]}" }
add_field => { "output_last10s" => "%{[message][11]}" }
add_field => { "output_last40s" => "%{[message][12]}" }
add_field => { "output_cumulative" => "%{[message][13]}" }
add_field => { "logdate" => "%{[message][14]}" }
}
#date {
# match => [ "logdate", "yyyy/MM/dd HH:mm:ss"]
#}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "iotplatform-iftop-out-%{+yyyy-MM-dd}"
}
}
3. 执行脚本启动logstash收集数据
# 后台执行脚本
nohup sh iftop.sh &
# 启动logstash
/opt/logstash/bin/logstash -f /opt/logstash/yum/iftop.conf
4. 效果图展示