import re
import requests
import json
from threading import Thread,Lock
from concurrent.futures import ThreadPoolExecutor list1 = []
list2 = []
code_list = [200,301,302,401] # 定义正确的状态码 class MyThread(Thread):
'''
用来获取线程的值
'''
def __init__(self,func,args=()):
super(MyThread, self).__init__()
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result
except Exception:
return None def get_url():
'''
打开存放url的文件,并将结果返回出去
:return:
'''
try:
with open('hosts.txt','r',encoding='utf-8') as f:
data = f.readlines()
return data
except Exception: # 文件不存在则返回False
return False def verdictUrl():
'''
从hosts.txt文件中取出url,然后进行合法性检测
:return:
'''
url_list = []
comment_list = []
get_url_res = get_url()
if get_url_res:
for data in get_url_res:
url = data.split(',')[0]
comment = data.split(',')[-1]
try:
res = re.search(r'http\w{0,1}://(\w+\.){2}\w+.*', url).group()
url_list.append(res)
comment_list.append(comment)
except Exception:
print('url:%s 有误'%url)
return (url_list,comment_list)
else:
print('文件不存在......') def getStatusCode(url,comment):
'''
获取网站的状态码,并将它返回出去
:param url:
:param comment:
:return:
'''
global list1,list2
try:
res = requests.head(url)
if res.status_code in code_list:
lock.acquire() # 开始添加互斥锁
list1.append(res.status_code)
lock.release()
except requests.exceptions.ConnectionError:
status = 0 # 自定义状态码
lock.acquire()
list2.append(status)
lock.release()
else:
status = res.status_code # 将状态码赋值给status
finally:
return {'url':url,'StatusCode':status,'comment':comment} def sendDingDing(bc):
'''
用来接收getStatusCode的返回值以及钉钉发送消息
:param bc:
:return:
'''
ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=e0bef403aded94c230953384353bc411a7fba57389ebd59bc0e63cc602ec175f'
HEADERS = {
"Content-Type": "application/json ;charset=utf-8"
}
bc = bc.result()
url = bc['url']
status = bc['StatusCode']
comment = bc['comment']
string_textMsg = {
'msgtype': 'text',
'text': { # 自行添加需要的内容
'content': 'url地址:%s\n'
'url名称:%s\n'
'状态码:%s\n'% (url, comment,status)
}
}
string_textMsg = json.dumps(string_textMsg) # 序列化到内存中
res = requests.post(ding_url, data=string_textMsg, headers=HEADERS)
if __name__ == '__main__':
lock = Lock() # 创建锁对象 pool = ThreadPoolExecutor(4) # 线程池
url,comment = verdictUrl()
res = zip(url,comment)
li = []
for i in res:
for j in range(4): # 开启多线程
t = MyThread(getStatusCode,args = (i[0],i[1]))
li.append(t)
t.start()
for t in li:
t.join() if len(list1)>3 or len(list2)>3: # 如果xxxxx,则交给sendDingDing处理
pool.submit(getStatusCode,i[0],i[1]).add_done_callback(sendDingDing)
需要注意的是:
1.需要在当前目录下创建hosts.txt文件,文件内容格式为:
https://www.baidu.com,百度首页
https://www.trc.com,泰然城首页
https://www.jd.com,京东商城
2.ding_url换成自己的钉钉机器人webhook链接,也可以换成微信报警