方法一: #coding=utf-8
import urllib2
import threading
from time import ctime,sleep
print "Start-Time : %s" %ctime()
f = open("ip.txt","r")
lines = f.readlines()
for line in lines:
url = 'http://'+line.strip('\n')+':10000'
try:
r = urllib2.urlopen(url,timeout=1)
if r.getcode() == 200:
print url
infile = open("infiles.txt","a")
infile.write(url+'\n')
infile.close()
except Exception as e :
print e
f.close()
print "Stop--Time : %s" %ctime() 方法二:
#coding=utf-8
#port scan
import os
import subprocess
ip = open('ip.txt')
data = ip.readlines()
for line in data:
line = line.strip()
cmd = '/root/Desktop/tools/masscan/bin/masscan -p10000 %s' %line
for i in os.popen(cmd):
f = open("abc.txt","a")
f.write(i)
f.close()
ip.close() 批量探测域名列表中指定端口的开放情况(域名列表,只包含域名,例如www.baidu.com 而非http://www.baidu.com/pic)扫描完毕后在当前目录生产指定端口的文本文件。#coding=utf8
import os
import sys
import Queue
import socket
import getopt
import logging
import requests
import threading
logging.basicConfig(
level=logging.WARNING,
format="[%(asctime)s] %(message)s"
)
def test_port(host, port):
logging.warning("trying %s" % host)
try:
ip = socket.gethostbyname(host)
except:
pass
else:
if ip:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
indicator = sock.connect_ex((ip, port))
except:
if sock:
sock.close()
else:
if indicator == 0:
sock.close()
with open(("%d.txt" % port), "a") as f:
f.write("%s:%d (%s) open!!\n" % (ip, port, host))
class BatchThreads(threading.Thread):
def __init__(self, queue, _port):
super(BatchThreads, self).__init__()
self.queue = queue
self.port = _port
def run(self):
while True:
if self.queue.empty():
break
else:
try:
url = self.queue.get()
#struts2_all(url)
test_port(url, self.port)
except:
break
def batch_queue(_port, _file, _queue, _thread_number):
with open(_file) as f:
urls = [line.strip() for line in f.readlines()]
urls = set(filter(lambda url: url and not url.startswith("#"), urls))
if urls:
# with open("urls.txt", "w") as ft:
# ft.write("\n".join(urls))
for url in urls:
queue.put(url)
logging.warning("total %d" % queue.qsize())
if _thread_number > (queue.qsize() / 2):
_thread_number = queue.qsize() / 2
for _ in xrange(_thread_number):
threads.append(BatchThreads(_queue, _port))
for t in threads:
t.start()
for t in threads:
t.join()
def usage():
print '''Usage: python %s [option]
All Struts2 Vulnerable Test
-h scan a single host
-f scan from a file
-p port
''' % os.path.basename(sys.argv[0])
if __name__ == '__main__':
global threads
threads = []
queue = Queue.Queue()
thread_number = 100
port = 8080
filename = ''
target = ''
if not len(sys.argv[1:]):
exit(usage())
try:
opts, args = getopt.getopt(sys.argv[1:], 'u:f:p:')
except getopt.GetoptError as err:
exit(usage())
else:
for name, value in opts:
if name == '-u':
target = value
#struts2_all(value)
if name == '-p':
port = int(value)
if name == '-f':
filename = value
if target:
test_port(target,port)
if filename : batch_queue(port,filename, queue, thread_number)