faust从kafka消费nginx日志流分析告警
nginx节点日志通过syslog发送至syslog server,syslog server进行格式处理后作为生产者,把日志流send至kafka 对应的topic上。
基于faust框架编写数据流消费程序,从kafka指定的topic上消费数据流,通过stream.filter+lambda表达式,指定错误界别的数据流进行分析,使用域名和ip为key进行计数,当错误超过阈值时发送告警通知相关人员。
import faust
import redis
from feishuRobot import feishuRobot
from datetime import timedelta
from log import logger
from config.config import ReadConfig
try:
obj = ReadConfig()
conf = obj.get_config()
logger.info("load config file successful")
except Exception as r:
logger.error('Fail to load config file: {reason}', reason=r)
try:
pool = redis.ConnectionPool(host=conf['redis']['address'], port=conf['redis']['port'], decode_responses=True, password=conf['redis']['password'] )
obj_r = redis.Redis(connection_pool=pool)
except Exception as r:
logger.error("Fail to connection redis poll: {reason}", reason=r)
app = faust.App(
'error_log_alarm',
store='rocksdb://',
broker= conf['kafka']['access_broker'],
stream_wait_empty=False,
broker_max_poll_records=conf['kafka']['max_poll'],
topic_partitions=1,
#vaule_type=json,
#value_serializer='raw',
)
class Transfer(faust.Record):
from_host_ip: str
level: str
#message: str
reason: str
logtime: str
#def master_processor(key, events):
#timestamp = key[1][0]
#for event in events:
record_error = app.Table(
'record_error',
default=int,
#on_window_close=master_processor,
#).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1)).relative_to_stream()
).tumbling(conf['faust']['window_size'], expires=conf['faust']['expires'], key_index=True).relative_to_stream()
error_topic = app.topic('sec-waf-error-log', value_type=Transfer)
@app.agent(error_topic)
async def greet(stream):
'''
#async for value in stream.filter(lambda x: x.status == '200' ).group_by(Transfer.from_host_ip):
async for value in stream:
master_to_total[value.from_host_ip] += 1
'''
#async for value in stream.group_by(Transfer.from_host_ip):
#upstream timed out (110: Connection timed out) while reading response header from upstream
#httpApi_action(): httpApi_action[push_count_dict] error: failed to commit the pipelined (push_count_dict) requests: timeout, context: ngx.timer
#upstream prematurely closed connection while reading response header from upstream
# connect() failed (111: Connection refused) while connecting to upstream
#access forbidden by rule,
#recv() failed (104: Connection reset by peer) while proxying upgraded connection
try:
feishu = feishuRobot()
except Exception as r:
logger.error("Fail to init feishuRobot object: {reason}", reason=r)
async for value in stream.filter(lambda x: x.level == "error"):
#print("attack: ", value)
record_error['{value.from_host_ip}'] += 1
#域名_ip统计计数
v = record_error['{value.from_host_ip}']
if v.now() >= 10:
msg = ""
err_key = ""
if "recv() failed (110: Connection timed out) while reading response header from upstream" in value.reason:
err_key = "error_" + value.from_host_ip + "_Connection_timed_out"
msg += "级别: 中\r\n"
elif "recv() failed (104: Connection reset by peer) while reading response header from upstream" in value.reason:
err_key = "error_" + value.from_host_ip + "_Connection_reset_peer"
msg += "级别: 中\r\n"
elif "upstream prematurely closed connection while reading response header from upstream" in value.reason:
err_key = "error_" + value.from_host_ip + "_prematurely_closed_connection"
msg += "级别: 中\r\n"
elif "access forbidden by rule" in value.reason:
err_key = "error_" + value.from_host_ip + "_access_forbidden_rule"
msg += "级别: 低\r\n"
elif "connect() failed (111: Connection refused) while connecting to upstream" in value.reason:
errr_key = "error_" + value.from_host_ip + "_Connection_refused"
msg += "级别: 中\r\n"
elif "client intended to send too large body" in value.reason:
err_key = "error_" + value.from_host_ip + "_send_too_large_body"
msg += "级别: 低\r\n"
elif "failed to commit the pipelined (push_count_dict)" in value.reason:
err_key = "error_" + value.from_host_ip + "_commit_the_pipelined"
msg += "级别: 低\r\n"
elif "could not build optimal server_names_hash" in value.reason: #warning
err_key = "warn_" + value.from_host_ip + "_optimal_server_names_hash"
msg += "级别: 低\r\n"
elif "no live upstreams while connecting to upstream" in value.reason:
err_key = "error_" + value.from_host_ip + "_no_live_upstreams"
msg += "级别: 高\r\n"
elif "SSL_do_handshake() failed" in value.reason:
err_key = "error_" + value.from_host_ip + "_SSL_do_handshake"
msg += "级别: 高\r\n"
if obj_r.get(err_key) is not None:
#如果存在更新计数统计
obj_r.set(err_key, str(v.now()))
else:
obj_r.set(err_key, str(v.now()))
obj_r.expire(err_key, conf['redis']['record_expire'])
msg += "waf节点: " + value.from_host_ip + "\r\n"
msg += "错误信息" + value.reason + "\r\n"
msg += "时间: " + value.logtime + "\r\n"
msg += "错误日志频率: 2分钟" + str(v.now()) + "次\r\n"
feishu.send_card_text(msg)
if __name__ == '__main__':
app.main()