文章目录
使用阿里云的SLB与日志服务的Python sdk实现自动封禁恶意访问用户IP
-
场景说明:
- 使用阿里云的SLB作为所有服务的网关流量入口,且该SLB 开启访问控制黑名单功能与日志收集到阿里云日志服务的功能。
-
解决问题
- 出现很大一批不太正常访问用户IP,在日志服务中心看到基本都是同一个IP访问,状态码基本都是4xx,因此需要开发一个可以在自动封禁IP与指定时间外自动解封的功能。
-
python脚本功能开发
-
该脚本实现了统计5分钟,某个用户IP 出现4xx状态次数 大于1000,则封禁该IP 2小时,2个小时后会自动解封。若是某个IP 需要特殊处理不需要被封禁可以把该IP 添加到whileIPList 列表即可。
-
安装所用到的阿里云的依赖包
pip install -U aliyun-log-python-sdk pip install aliyun-python-sdk-slb pip install aliyunsdkcore
-
- 如何使用:结合定时任务每5分钟运行一次。
注意:该脚本会因为你的日志字段格式调整你的查询sql语句,脚本会需要部分细节的调整。
该详细脚本内容如下,仅供参考:
from aliyun.log import GetProjectLogsRequest,LogClient
import datetime
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest
import json
endpoint = 'https://cn-hangzhou.log.aliyuncs.com' # 选择与上面步骤创建Project所属区域匹配的Endpoint
access_key_id = '**************' # 使用您的阿里云访问密钥AccessKeyId
access_key = '*****************' # 使用您的阿里云访问密钥AccessKeySecret
project = "*********" #日志中心project ID
AccessControlListID="***********" #slb访问控制列表ID
client = AcsClient(access_key_id, access_key, 'cn-hangzhou')
limit = 1000
whileIPList = ["124.71.133.196","122.9.71.14","116.63.49.58"]
def getBlackIP():
startTime=(datetime.datetime.now() - datetime.timedelta(minutes=5)).strftime("%Y-%m-%d %H:%M:%S")
endTime=(datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
sql = "select remote_addr,count(*) as c from uki-prod-kong where __date__ > '{0}' and __date__ < '{1}' and status > 308 and status < 500 group by remote_addr order by c DESC LIMIT 5".format(startTime,endTime)
print(sql)
client = LogClient(endpoint, access_key_id, access_key)
req = GetProjectLogsRequest(project,sql )
res = client.get_project_logs(req)
dataList = res.body
print(dataList)
blackIPList = []
for item in dataList:
if int(item['c']) > limit and item["remote_addr"] not in whileIPList:
blackIPList.append(item["remote_addr"])
return blackIPList
def AddAccessControlListEntry(BlackIPList):
addEntryList = []
for item in BlackIPList:
addEntryList.append({"entry": "{}/32".format(item),"comment":"{}".format((datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S"))})
request = AddAccessControlListEntryRequest()
request.set_accept_format('json')
request.set_AclId(AccessControlListID)
request.set_AclEntrys(addEntryList)
client.do_action_with_exception(request)
print("已添加黑名单IP:",BlackIPList)
def RemoveAccessControlListEntry(removeEntryList):
delEntryList=[]
for item in removeEntryList:
delEntryList.append({'entry': "{}".format(item)})
#print("删除2小时之前的IP:",delEntryList)
request = RemoveAccessControlListEntryRequest()
request.set_accept_format('json')
request.set_AclId(AccessControlListID)
request.set_AclEntrys(delEntryList)
client.do_action_with_exception(request)
print("已清理过期的黑名单IP:",removeEntryList)
def getRemoveEntryList():
request = DescribeAccessControlListAttributeRequest()
request.set_accept_format('json')
request.set_AclId(AccessControlListID)
response = client.do_action_with_exception(request)
data = response.decode("utf-8")
data = json.loads(data)
delTime = (datetime.datetime.now() - datetime.timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
removeEntryList=[]
try:
for item in data["AclEntrys"]["AclEntry"]:
if item["AclEntryComment"] != "" and item["AclEntryComment"] < delTime:
removeEntryList.append(item["AclEntryIP"])
return removeEntryList
except KeyError:
print("IP黑名单为null")
return []
def main():
blackIPList = getBlackIP()
RemoveEntryList = getRemoveEntryList()
if len(blackIPList) == 0:
print("暂时没有IP需要被添加到黑名单")
else:
AddAccessControlListEntry(blackIPList)
if len(RemoveEntryList) == 0:
print("暂时没有过期的IP 需要被清理")
else:
RemoveAccessControlListEntry(RemoveEntryList)
if __name__ == '__main__':
main()