faust从kafka消费nginx日志流分析告警

faust从kafka消费nginx日志流分析告警

nginx节点日志通过syslog发送至syslog server,syslog server进行格式处理后作为生产者,把日志流send至kafka 对应的topic上。

基于faust框架编写数据流消费程序,从kafka指定的topic上消费数据流,通过stream.filter+lambda表达式,指定错误界别的数据流进行分析,使用域名和ip为key进行计数,当错误超过阈值时发送告警通知相关人员。

import faust
import redis
from feishuRobot import feishuRobot
from datetime import timedelta
from log import logger
from config.config import ReadConfig

try:
    obj = ReadConfig()
    conf = obj.get_config()
    logger.info("load config file successful")
except Exception as r:
    logger.error('Fail to load config file: {reason}', reason=r)

try:
    pool = redis.ConnectionPool(host=conf['redis']['address'], port=conf['redis']['port'], decode_responses=True, password=conf['redis']['password'] )
    obj_r = redis.Redis(connection_pool=pool)

except Exception as r:
    logger.error("Fail to connection redis poll: {reason}", reason=r)

app = faust.App(
    'error_log_alarm',
    store='rocksdb://',
    broker= conf['kafka']['access_broker'],
    stream_wait_empty=False, 
    broker_max_poll_records=conf['kafka']['max_poll'], 
    topic_partitions=1,
    #vaule_type=json,
    #value_serializer='raw',
)

class Transfer(faust.Record):
    from_host_ip: str
    level: str
    #message: str
    reason: str
    logtime: str


#def master_processor(key, events):
    #timestamp = key[1][0]
    #for event in events:


record_error = app.Table(
    'record_error',
    default=int,
    #on_window_close=master_processor,
#).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1)).relative_to_stream()
).tumbling(conf['faust']['window_size'], expires=conf['faust']['expires'], key_index=True).relative_to_stream()


error_topic = app.topic('sec-waf-error-log', value_type=Transfer)

@app.agent(error_topic)
async def greet(stream):
    
    '''
    #async for value in stream.filter(lambda x: x.status == '200' ).group_by(Transfer.from_host_ip):
    async for value in stream:
        master_to_total[value.from_host_ip] += 1
    '''
    #async for value in stream.group_by(Transfer.from_host_ip):
    #upstream timed out (110: Connection timed out) while reading response header from upstream
    #httpApi_action(): httpApi_action[push_count_dict] error: failed to commit the pipelined (push_count_dict) requests: timeout, context: ngx.timer
    #upstream prematurely closed connection while reading response header from upstream
    # connect() failed (111: Connection refused) while connecting to upstream
    #access forbidden by rule,
    #recv() failed (104: Connection reset by peer) while proxying upgraded connection

    try:
        feishu = feishuRobot()
    except Exception as r:
        logger.error("Fail to init feishuRobot object: {reason}", reason=r)

    async for value in stream.filter(lambda x: x.level == "error"):
        #print("attack: ", value)
        record_error['{value.from_host_ip}'] += 1
        

        #域名_ip统计计数
        v = record_error['{value.from_host_ip}']
  
        if v.now() >= 10:
            msg = ""
            err_key = ""

            if "recv() failed (110: Connection timed out) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_timed_out" 
                msg += "级别: 中\r\n" 

            elif "recv() failed (104: Connection reset by peer) while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_Connection_reset_peer" 
                msg += "级别: 中\r\n" 

            elif "upstream prematurely closed connection while reading response header from upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_prematurely_closed_connection"
                msg += "级别: 中\r\n" 

            elif "access forbidden by rule" in value.reason:
                err_key = "error_" + value.from_host_ip + "_access_forbidden_rule"
                msg += "级别: 低\r\n" 

            elif "connect() failed (111: Connection refused) while connecting to upstream" in value.reason:
                errr_key = "error_" + value.from_host_ip + "_Connection_refused"
                msg += "级别: 中\r\n" 

            elif "client intended to send too large body" in value.reason:
                err_key = "error_" + value.from_host_ip + "_send_too_large_body"
                msg += "级别: 低\r\n" 

            elif "failed to commit the pipelined (push_count_dict)" in value.reason:
                err_key = "error_" + value.from_host_ip + "_commit_the_pipelined"
                msg += "级别: 低\r\n" 

            elif "could not build optimal server_names_hash" in value.reason: #warning
                err_key = "warn_" + value.from_host_ip + "_optimal_server_names_hash"
                msg += "级别: 低\r\n" 

            elif "no live upstreams while connecting to upstream" in value.reason:
                err_key = "error_" + value.from_host_ip + "_no_live_upstreams"
                msg += "级别: 高\r\n" 

            elif "SSL_do_handshake() failed" in value.reason:
                err_key = "error_" + value.from_host_ip + "_SSL_do_handshake"
                msg += "级别: 高\r\n" 


            if obj_r.get(err_key) is not None:
                #如果存在更新计数统计
                obj_r.set(err_key, str(v.now()))
            else:
                obj_r.set(err_key, str(v.now()))
                obj_r.expire(err_key, conf['redis']['record_expire'])

                msg += "waf节点: " + value.from_host_ip + "\r\n"
                msg += "错误信息" + value.reason + "\r\n"
                msg += "时间: " + value.logtime + "\r\n"
                msg += "错误日志频率: 2分钟" + str(v.now()) + "次\r\n"
                feishu.send_card_text(msg)
               
if __name__ == '__main__':
    app.main()

上一篇:[力扣]24. 两两交换链表中的节点


下一篇:虚拟头节点指针法实现链表删除指定值的节点