使用阿里云的SLB与日志服务的Python sdk实现自动封禁恶意访问用户IP

文章目录

使用阿里云的SLB与日志服务的Python sdk实现自动封禁恶意访问用户IP

  • 场景说明
    • 使用阿里云的SLB作为所有服务的网关流量入口,且该SLB 开启访问控制黑名单功能与日志收集到阿里云日志服务的功能。
  • 解决问题
    • 出现很大一批不太正常访问用户IP,在日志服务中心看到基本都是同一个IP访问,状态码基本都是4xx,因此需要开发一个可以在自动封禁IP与指定时间外自动解封的功能。
  • python脚本功能开发
    • 该脚本实现了统计5分钟,某个用户IP 出现4xx状态次数 大于1000,则封禁该IP 2小时,2个小时后会自动解封。若是某个IP 需要特殊处理不需要被封禁可以把该IP 添加到whileIPList 列表即可。

    • 安装所用到的阿里云的依赖包

    pip  install -U aliyun-log-python-sdk
    pip  install  aliyun-python-sdk-slb
    pip  install aliyunsdkcore
    
  • 如何使用:结合定时任务每5分钟运行一次。
    注意:该脚本会因为你的日志字段格式调整你的查询sql语句,脚本会需要部分细节的调整。
    该详细脚本内容如下,仅供参考:
from aliyun.log import    GetProjectLogsRequest,LogClient
import datetime
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest
import json


endpoint = 'https://cn-hangzhou.log.aliyuncs.com'  # 选择与上面步骤创建Project所属区域匹配的Endpoint
access_key_id = '**************'  # 使用您的阿里云访问密钥AccessKeyId
access_key = '*****************'  # 使用您的阿里云访问密钥AccessKeySecret
project = "*********"  #日志中心project ID
AccessControlListID="***********"  #slb访问控制列表ID
client = AcsClient(access_key_id, access_key, 'cn-hangzhou')

limit = 1000
whileIPList = ["124.71.133.196","122.9.71.14","116.63.49.58"]

def getBlackIP():
    startTime=(datetime.datetime.now() - datetime.timedelta(minutes=5)).strftime("%Y-%m-%d %H:%M:%S")
    endTime=(datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
    sql = "select  remote_addr,count(*) as c from uki-prod-kong where __date__ > '{0}' and __date__ < '{1}' and status > 308 and status < 500  group by remote_addr order by  c DESC LIMIT 5".format(startTime,endTime)
    print(sql)
    client = LogClient(endpoint, access_key_id, access_key)
    req = GetProjectLogsRequest(project,sql )
    res = client.get_project_logs(req)
    dataList =  res.body
    print(dataList)
    blackIPList = []
    for item in dataList:
       if  int(item['c']) > limit and item["remote_addr"] not in  whileIPList:
           blackIPList.append(item["remote_addr"])
    return blackIPList

def AddAccessControlListEntry(BlackIPList):
    addEntryList = []
    for item  in BlackIPList:
        addEntryList.append({"entry": "{}/32".format(item),"comment":"{}".format((datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S"))})
    request = AddAccessControlListEntryRequest()
    request.set_accept_format('json')
    request.set_AclId(AccessControlListID)
    request.set_AclEntrys(addEntryList)
    client.do_action_with_exception(request)
    print("已添加黑名单IP:",BlackIPList)

def RemoveAccessControlListEntry(removeEntryList):
    delEntryList=[]
    for item in removeEntryList:
        delEntryList.append({'entry': "{}".format(item)})
    #print("删除2小时之前的IP:",delEntryList)
    request = RemoveAccessControlListEntryRequest()
    request.set_accept_format('json')
    request.set_AclId(AccessControlListID)
    request.set_AclEntrys(delEntryList)
    client.do_action_with_exception(request)
    print("已清理过期的黑名单IP:",removeEntryList)

def  getRemoveEntryList():
    request = DescribeAccessControlListAttributeRequest()
    request.set_accept_format('json')
    request.set_AclId(AccessControlListID)
    response = client.do_action_with_exception(request)
    data = response.decode("utf-8")
    data = json.loads(data)
    delTime = (datetime.datetime.now() - datetime.timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
    removeEntryList=[]
    try:
        for item in data["AclEntrys"]["AclEntry"]:
           if item["AclEntryComment"] != "" and item["AclEntryComment"] < delTime:
               removeEntryList.append(item["AclEntryIP"])
        return  removeEntryList
    except KeyError:
        print("IP黑名单为null")
        return []

def  main():
    blackIPList = getBlackIP()
    RemoveEntryList = getRemoveEntryList()

    if  len(blackIPList) == 0:
        print("暂时没有IP需要被添加到黑名单")
    else:
        AddAccessControlListEntry(blackIPList)
    if len(RemoveEntryList) == 0:
        print("暂时没有过期的IP 需要被清理")
    else:
        RemoveAccessControlListEntry(RemoveEntryList)

if __name__ == '__main__':
    main()

上一篇:SAE 场景下,应用流量的负载均衡及路由策略配置实践


下一篇:Oracle Cloud Applications:NetSuite构建了第一套统一的云应用程序套件