批量弱口令升级版本

多线程版本

import requests
from concurrent.futures import ThreadPoolExecutor
import queue
import threading


def save_file(msg):
    """Append ``msg`` followed by a newline to ``res.txt`` (UTF-8).

    The file is opened in append mode, so it is created on first use and
    earlier results are preserved.
    """
    with open('res.txt', mode='a', encoding="utf-8") as result_file:
        result_file.write("{}\n".format(msg))


def check_pass(lock, q, timeout=10):
    """Try the default ``admin`` / ``123456`` login against one target.

    One base URL is taken from ``q`` and the credentials are POSTed to
    ``<url>/login``. A target is reported as vulnerable when the JSON reply
    has ``code == 200`` and no ``msg`` value; hits are printed and appended
    to the result file via ``save_file``.

    Args:
        lock: Lock held while writing the result file so that lines from
            different worker threads do not interleave.
        q: Queue from which exactly one target base URL is taken.
        timeout: Seconds to wait for the HTTP request before treating the
            target as unreachable. Without a timeout a single silent host
            would block its worker thread forever.
    """
    url = q.get()
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        res = requests.post(url=full_url, headers=headers, data=data, timeout=timeout)
        data_dict = res.json()
    except Exception:
        # Deliberately broad: any network, URL or JSON-decoding problem just
        # means this target could not be checked; the scan must go on.
        print("<===请求失败===>")
        return

    # A JSON array/string/number is not the login API's reply format; calling
    # .get() on it would raise inside the worker and be silently lost.
    if not isinstance(data_dict, dict):
        print("<===请求失败===>")
        return

    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        with lock:
            save_file(msg)
    else:
        print("<===密码错误===>")


def main():
    """Read targets from ``ips.txt`` and check each one in a thread pool.

    Every non-empty line is treated as one target. Lines that do not start
    with ``http://`` or ``https://`` get ``http://`` prepended. Each target
    is put on a shared queue and one ``check_pass`` job is submitted for it;
    leaving the ``with`` block waits for all jobs to finish.
    """
    lock = threading.RLock()
    q = queue.Queue()
    with ThreadPoolExecutor() as pool:
        with open('ips.txt', mode='r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    # Blank lines would otherwise become the bogus target "http://".
                    continue
                # Check the actual scheme prefix: a substring test would leave
                # hosts such as "httpbin.org" without a scheme.
                if not line.lower().startswith(("http://", "https://")):
                    line = f"http://{line}"
                q.put(line)
                pool.submit(check_pass, lock, q)


# Run the threaded scan only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()

协程版本

import asyncio
import aiofiles
import aiohttp


async def save_file(msg):
    """Asynchronously append ``msg`` plus a newline to ``res1.txt`` (UTF-8)."""
    async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as result_file:
        await result_file.write("{}\n".format(msg))


async def check_pass(url, timeout=10):
    """Try the default ``admin`` / ``123456`` login against ``url``.

    The credentials are POSTed to ``<url>/login`` with TLS certificate
    verification disabled. A target is reported as vulnerable when the JSON
    reply has ``code == 200`` and no ``msg`` value; hits are printed and
    appended to the result file via ``save_file``.

    Args:
        url: Base URL of the target, including the scheme.
        timeout: Total seconds allowed for the request before the target is
            treated as unreachable.
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        client_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            # ``ssl=False`` is the supported replacement for the deprecated
            # ``verify_ssl=False`` keyword.
            async with session.post(full_url, headers=headers, data=data, ssl=False) as response:
                # content_type=None: parse the body as JSON even when the
                # server labels it e.g. text/html, matching what
                # requests' ``Response.json()`` does in the threaded variant.
                data_dict = await response.json(content_type=None)
    except Exception:
        # Deliberately broad: any network, timeout or decoding problem just
        # means this target could not be checked.
        print("<===请求失败===>")
        return

    # An empty body (None) or a JSON array/string is not the login API's
    # reply format; calling .get() on it would raise.
    if not isinstance(data_dict, dict):
        print("<===请求失败===>")
        return

    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        await save_file(msg)
    else:
        print("<===密码错误===>")


async def main(concurrency=50):
    """Read targets from ``ips.txt`` and check them concurrently.

    Every non-empty line is treated as one target. Lines that do not start
    with ``http://`` or ``https://`` get ``http://`` prepended. All targets
    are then checked with ``check_pass`` at the same time, with at most
    ``concurrency`` checks in flight.

    Args:
        concurrency: Upper bound on simultaneously running checks, so a large
            target list does not open an unbounded number of connections.
    """
    targets = []
    async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
        async for raw_line in f:
            line = raw_line.strip()
            if not line:
                # Blank lines would otherwise become the bogus target "http://".
                continue
            # Check the actual scheme prefix: a substring test would leave
            # hosts such as "httpbin.org" without a scheme.
            if not line.lower().startswith(("http://", "https://")):
                line = f"http://{line}"
            targets.append(line)

    limiter = asyncio.Semaphore(concurrency)

    async def _check_bounded(target):
        # Hold a semaphore slot for the duration of one check.
        async with limiter:
            await check_pass(target)

    # Awaiting each check inside the read loop would run them one after
    # another; gather lets the network waits overlap.
    await asyncio.gather(*(_check_bounded(target) for target in targets))


if __name__ == '__main__':
    # Create the top-level coroutine and run it to completion on a new event loop.
    asyncio.run(main())

上一篇:Dubbo常用功能01--version版本


下一篇:Spring @Async异步方法中的线程隔离