Quick notes on some web-scraping examples

1. A first look at scraping

# from urllib.request import urlopen
#
# url = "https://www.baidu.com/"
# resp = urlopen(url)
# with open("mybaidu.html", mode="w") as f:
#     f.write(resp.read().decode("utf-8"))
#
# print("over!")

############################ Minimal GET request #############

import requests
# name = input("请输入要查询的名字:")
# url = f'https://www.sogou.com/web?query={name}'
url = "http://www.xinfadi.com.cn/priceDetail.html"
head = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
    # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
}


resp = requests.get(url, headers=head)
# print(resp)
print(resp.text)
resp.close()

############################ Minimal POST request #############

# import requests
# name = input("Enter a search keyword: ")
# url = 'https://www.sogou.com/web'
# data = {
#     "query": name
# }
#
# # post() sends the form data in the request body instead of the URL
# resp = requests.post(url, data=data)
# print(resp.text)
# resp.close()
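
For a POST whose response really is JSON, here is a self-contained sketch against httpbin.org, a public echo service; the endpoint and the "kw" field are illustration only and are not part of the examples above:

import requests

# httpbin.org/post echoes the request back as JSON, so resp.json() works here
resp = requests.post("https://httpbin.org/post", data={"kw": "python"})
print(resp.json()["form"])  # -> {'kw': 'python'}
resp.close()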

2. Image-scraping example

import requests
# requires: pip install bs4
from bs4 import BeautifulSoup
import time
url = "https://www.umei.cc/bizhitupian/weimeibizhi/"
resp = requests.get(url)
resp.encoding = 'utf-8'
# hand the page source to BeautifulSoup
main_page = BeautifulSoup(resp.text, "html.parser")
alist = main_page.find("div", class_="TypeList").find_all("a")
for a in alist:
    href = a.get('href')  # get() reads an attribute's value
    # fetch the child page's source
    child_page_resp = requests.get(href)
    child_page_resp.encoding = 'utf-8'
    child_page_text = child_page_resp.text
    # locate the image's download URL
    child_page = BeautifulSoup(child_page_text, "html.parser")
    p = child_page.find("p", align="center")
    img = p.find("img")
    src = img.get("src")
    # download the image
    img_resp = requests.get(src)
    # everything after the last / in the URL becomes the file name
    img_name = src.split("/")[-1]
    with open("img/"+img_name, mode="wb") as f:
        f.write(img_resp.content)  # 将图片写入文件
        f.close()
    print("over!", img_name)
    time.sleep(1)  # 休息1秒钟

# Using etree requires: pip install lxml (a small runnable sketch follows below)
# from lxml import etree
# html = etree.HTML(resp.text)
# divs = html.xpath("/html/div/span")

3. Thread-pool example

import requests

from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutor

f = open("data.csv", mode="w", encoding="utf-8")
csvwriter = csv.writer(f)

head = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
}

def download_one_page(url):
    resp = requests.get(url, headers=head)
    html = etree.HTML(resp.text)
    table = html.xpath("/html/body/div[2]/div/div/div/div[4]/div[1]/div/table")[0]
    # trs = table.xpath("./tr")[1:]
    # print(resp.text)
    # print(table)
    trs = table.xpath("./tr[position()>1]")
    # print(len(trs))
    # walk over every tr (one table row each)
    for tr in trs:
        txt = tr.xpath("./td/text()")
        # light clean-up of the cell values
        txt = (item.replace("\\", "").replace("/", "") for item in txt)
        # write the row into the csv file
        csvwriter.writerow(txt)
    print(url, "extracted!")
    resp.close()


if __name__ == '__main__':
    # download_one_page("http://www.xinfadi.com.cn/priceDetail.html")
    # create a thread pool with 50 workers
    with ThreadPoolExecutor(50) as t:
        for i in range(1, 200):
            # hand each page's download task to the pool
            t.submit(download_one_page, f"http://www.xinfadi.com.cn/{i}.html")

4. Scraping a novel


# Get the full chapter list
# https://dushu.baidu.com/api/pc/getCatalog?data={"book_id": "4306063500"}
# Chapter content
# https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4306063500","cid":"4306063500|1569782244","need_bookinfo":1}

import requests
import asyncio
import aiohttp
import aiofiles
import json

"""
Steps:
    1. Synchronous: fetch the full chapter catalog
    2. Asynchronous: download every chapter's content
"""

async def aiodownload(cid, b_id, title):
    data = {
        "book_id": b_id,
        "cid": f"{b_id}|{cid}",
        "need_bookinfo": 1
    }
    data = json.dumps(data)
    url = f"https://dushu.baidu.com/api/pc/getChapterContent?data={data}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # parse the JSON body of the response
            dic = await resp.json()
            # write the chapter text to a file (the novel/ directory must exist)
            async with aiofiles.open(f'novel/{title}', mode="w", encoding="utf-8") as f:
                await f.write(dic['data']['novel']['content'])


async def getCatalog(url):
    resp = requests.get(url)
    dic = resp.json()
    tasks = []
    for item in dic['data']['novel']['items']:
        # pull out each chapter's cid and title
        title = item['title']
        cid = item['cid']
        # queue up the async download
        task = asyncio.create_task(aiodownload(cid, b_id, title))
        tasks.append(task)
    await asyncio.wait(tasks)

if __name__ == '__main__':
    b_id = "4306063500"
    url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"' + b_id + '"}'
    asyncio.run(getCatalog(url))

5. A first look at scraping video

"""
Flow:
    1. Fetch the page source of 54812-1-1.html
    2. Extract the m3u8 URL from that source
    3. Download the m3u8 file
    4. Read the m3u8 file and download each video segment
    5. Merge the segments into one video (a small merge sketch follows the download loop below)
Create the video/ folder first and mark it as excluded in the IDE
"""

import requests
import re

# headers = {
#     "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
# }
# # regex for pulling the m3u8 URL out of the page
# obj = re.compile(r"url: '(?P<url>.*?)',", re.S)
#
# url = "http://91kanju2.com/vod-play/54812-1-1.html"
# resp = requests.get(url, headers=headers)
# # extract the m3u8 URL
# m3u8_url = obj.search(resp.text).group("url")
#
# # print(m3u8_url)
# resp.close()
#
# # download the m3u8 file
# resp2 = requests.get(m3u8_url, headers=headers)
#
# with open("哲仁王后.m3u8", mode="wb") as f:
#     f.write(resp2.content)
#
# resp2.close()
# print("下载完毕")

# Parse the m3u8 file
n = 1
with open("哲仁王后.m3u8", mode="r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()  # strip whitespace / the trailing newline first
        if line.startswith("#"):
            continue
        # download this segment
        resp3 = requests.get(line)
        # use a different name here: f is still the open m3u8 file being iterated
        with open(f"video/{n}.ts", mode="wb") as ts_f:
            ts_f.write(resp3.content)
        resp3.close()
        n += 1
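
Step 5 of the flow (merging) is not implemented above. A minimal sketch, assuming plain binary concatenation of the .ts segments is acceptable (the same assumption the cat / copy commands in section 6 make); it reuses the counter n left over from the loop above, and the output file name is just an example:

with open("哲仁王后_merged.ts", mode="wb") as out_f:
    for i in range(1, n):
        with open(f"video/{i}.ts", mode="rb") as seg_f:
            out_f.write(seg_f.read())  # append each segment's bytes in order
print("merge finished")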

6. Async (coroutine) version of the video-scraping example

"""
Approach:
    1. Fetch the main page's source
    2. From the iframe's page source, get the first-level m3u8 URL
    3. Download the first-level m3u8 file -> then the second-level m3u8 file (which holds the segment paths)
    4. Download the video segments
    5. Download the key and decrypt the segments
    6. Merge all the ts files into one mp4
"""
import requests
# the iframe tag is easiest to find with bs4
from bs4 import BeautifulSoup
# the m3u8 URL sits inside a script block, so pull it out with re
import re
# async downloading
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
import os

def get_iframe_src(url):
    resp = requests.get(url)
    main_page = BeautifulSoup(resp.text, "html.parser")
    # the page has only one iframe tag
    src = main_page.find("iframe").get("src")
    resp.close()
    return src

def download_m3u8_file(url, name):
    resp = requests.get(url)
    with open(name, mode="wb") as f:
        f.write(resp.content)  # binary mode needs the raw bytes, not resp.text
    resp.close()

def get_first_m3u8_url(url):
    resp = requests.get(url)
    obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
    m3u8_url = obj.search(resp.text).group("m3u8_url")
    resp.close()
    return m3u8_url

async def download_ts(url, name, session):
    async with session.get(url) as resp:
        async with aiofiles.open(f'video/{name}', mode="wb") as f:
            await f.write(await resp.content.read())  # write the downloaded bytes to the file
    print(f'{name} downloaded')

async def aio_download(up_url):
    tasks = []
    async with aiohttp.ClientSession() as session:  # create the session once, up front
        async with aiofiles.open("越狱第一季_second.txt", mode="r", encoding='utf-8') as f:
            async for line in f:
                if line.startswith("#"):
                    continue
                line = line.strip()
                # build the full ts URL for this segment
                ts_url = up_url+line
                task = asyncio.create_task(download_ts(ts_url, line, session))  # one task per segment
                tasks.append(task)

            await asyncio.wait(tasks)  # wait for every download to finish

def get_key(url):
    resp = requests.get(url)
    # AES.new() needs the key as bytes, so return the raw body
    return resp.content

async def dec_ts(name, key):
    aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)  # CBC needs a 16-byte IV
    async with aiofiles.open(f'video/{name}', mode="rb") as f1,\
        aiofiles.open(f'video/temp_{name}', mode="wb") as f2:
        bs = await f1.read()  # read the encrypted segment
        await f2.write(aes.decrypt(bs))  # write the decrypted bytes to a temp file
    print(f"{name} decrypted")

async def aio_dec(key):
    # decrypt every segment concurrently
    tasks = []
    async with aiofiles.open("越狱第一季_second.txt", mode="r", encoding='utf-8') as f:
        async for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            # create one async task per segment
            task = asyncio.create_task(dec_ts(line, key))
            tasks.append(task)
        await asyncio.wait(tasks)  # wait for all decryption tasks to finish

def merge_ts():
    # mac/linux: cat 1.ts 2.ts 3.ts > xxx.mp4
    # windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
    lst = []
    with open("越狱第一季_second.txt", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            lst.append(f"video/temp_{line}")
    s = " ".join(lst)  # "video/temp_1.ts video/temp_2.ts ..."
    os.system(f"cat {s} > movie.mp4")  # concatenate the decrypted segments in order
    print("done!")


def main(url):
    # fetch the main page's source and find the iframe's src
    iframe_src = get_iframe_src(url)
    # get the first-level m3u8 URL
    first_m3u8_url = get_first_m3u8_url(iframe_src)
    # take the iframe's domain
    iframe_domain = iframe_src.split("/share")[0]
    # join the two into the full URL
    first_m3u8_url = iframe_domain+first_m3u8_url
    # download the first-level m3u8 file
    download_m3u8_file(first_m3u8_url, "越狱第一季.txt")
    # download the second-level m3u8 file
    with open("越狱第一季.txt", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            else:
                line = line.strip()  # drop whitespace / the newline
                # build the second-level m3u8 URL
                second_m3u8_url = first_m3u8_url.split("index.m3u8")[0]+line
                download_m3u8_file(second_m3u8_url, "越狱第一季_second.txt")

    # download the video segments
    second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "")
    # async coroutines
    # asyncio.run(aio_download(second_m3u8_url_up))  # can be commented out once it has run
    # fetch the key
    key_url = second_m3u8_url_up+"key.key"
    key = get_key(key_url)
    # decrypt
    asyncio.run(aio_dec(key))
    # merge everything into one MP4 file
    merge_ts()

if __name__ == '__main__':
    url = "http://91kanju2.com/vod-play/541-2-1.html"
    main(url)