1、 简单了解爬虫
# from urllib.request import urlopen
#
# url = "https://www.baidu.com/"
# resp = urlopen(url)
# with open("mybaidu.html", mode="w") as f:
# f.write(resp.read().decode("utf-8"))
#
# print("over!")
############################精简的get请求#############
import requests
# name = input("请输入要查询的名字:")
# url = f'https://www.sogou.com/web?query={name}'
url = "http://www.xinfadi.com.cn/priceDetail.html"
head = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
# "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
}
resp = requests.get(url, headers=head)
# print(resp)
print(resp.text)
resp.close()
############################精简的post请求#############
# import requests
# name = input("请输入要查询的名字:")
# url = 'https://www.sogou.com/web'
# head = {
# "kw": name
# }
#
# resp = requests.get(url, data=head)
# print(resp.json())
# resp.close()
2、 爬图片例子
import requests
# 需要下载 pip install bs4
from bs4 import BeautifulSoup
import time
url = "https://www.umei.cc/bizhitupian/weimeibizhi/"
resp = requests.get(url)
resp.encoding = 'utf-8'
# 把源码交给bs
main_page = BeautifulSoup(resp.text, "html.parser")
alist = main_page.find("div", class_="TypeList").find_all("a")
for a in alist:
href = a.get('href') # 通过get拿去属性值
# 拿到子页面代码
child_page_resp = requests.get(href)
child_page_resp.encoding = 'utf-8'
child_page_text = child_page_resp.text
# 拿到下载路径
child_page = BeautifulSoup(child_page_text, "html.parser")
p = child_page.find("p", align="center")
img = p.find("img")
src = img.get("src")
# 下载图片
img_resp = requests.get(src)
# 拿到url中最后一个/后的内容
img_name = src.split("/")[-1]
with open("img/"+img_name, mode="wb") as f:
f.write(img_resp.content) # 将图片写入文件
f.close()
print("over!", img_name)
time.sleep(1) # 休息1秒钟
# 使用etree需要下载 pip install lxml
# from lxml import etree
# html = etree.HTML(resp.text)
# divs = html.xpath("html/div/span")
3、线程池例子
import requests
from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutor
f = open("data.csv", mode="w", encoding="utf-8")
csvwriter = csv.writer(f)
head = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
}
def download_one_page(url):
resp = requests.get(url, headers=head)
html = etree.HTML(resp.text)
table = html.xpath("/html/body/div[2]/div/div/div/div[4]/div[1]/div/table")[0]
# trs = table.xpath("./tr")[1:]
# print(resp.text)
# print(table)
trs = table.xpath("./tr[position()>1]")
# print(len(trs))
# 拿到每个tr
for tr in trs:
txt = tr.xpath("./td/text()")
# 对数据做简单的处理:
txt = (item.replace("\\", "").replace("/", "") for item in txt)
# 把数据存放在文件中
csvwriter.writerow(txt)
print(url, "提取完毕!")
resp.close()
if __name__ == '__main__':
# download_one_page("http://www.xinfadi.com.cn/priceDetail.html")
# 创建线程池
with ThreadPoolExecutor(50) as t:
for i in range(1, 200):
# 把下载任务交给线程池
t.submit(download_one_page, f"http://www.xinfadi.com.cn/{i}.html")
4、爬一部小说
# 得到所有章节
# https://dushu.baidu.com/api/pc/getCatalog?data={"book_id": "4306063500"}
# 章节内容
# https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4306063500","cid":"4306063500|1569782244","need_bookinfo":1}
import requests
import asyncio
import aiohttp
import aiofiles
import json
"""
步骤:
1. 同步操作:拿到所有章节
2. 异步操作: 下载所有文件内容
"""
async def aiodownload(cid, b_id, title):
data = {
"book_id": b_id,
"cid": f"{b_id}|{cid}",
"need_bookinfo": 1
}
data = json.dumps(data)
url = f"https://dushu.baidu.com/api/pc/getChapterContent?data={data}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
# 请求后得到内容
dic = await resp.json()
# 写入小说内容
async with aiofiles.open(f'novel/{title}', mode="w", encoding="utf-8") as f:
await f.write(dic['data']['novel']['content'])
async def getCatalog(url):
resp = requests.get(url)
dic = resp.json()
tasks = []
for item in dic['data']['novel']['items']:
# 找出每一个章节的cid
title = item['title']
cid = item['cid']
# 准备异步操作
task = asyncio.create_task(aiodownload(cid, b_id, title))
tasks.append(task)
await asyncio.wait(tasks)
if __name__ == '__main__':
b_id = "4306063500"
url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"' + b_id + '"}'
asyncio.run(getCatalog(url))
5、简单了解爬视频
"""
流程:
1. 拿到54812-1-1.html的页面源代码
2. 从源代码中提取m3u8的url
3. 下载m3u8
4. 读取m3u8文件,下载视频
5. 合并视频
创建文件夹 标记为 excluded
"""
import requests
import re
# headers = {
# "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"
# }
# # 用来提取地址
# obj = re.compile(r"url: '(?P<url>.*?)',", re.S)
#
# url = "http://91kanju2.com/vod-play/54812-1-1.html"
# resp = requests.get(url, headers=headers)
# # 拿到地址
# m3u8_url = obj.search(resp.text).group("url")
#
# # print(m3u8_url)
# resp.close()
#
# # 下载m3u8文件
# resp2 = requests.get(m3u8_url, headers=headers)
#
# with open("哲仁王后.m3u8", mode="wb") as f:
# f.write(resp2.content)
#
# resp2.close()
# print("下载完毕")
# 解析m3u8文件
n = 1
with open("哲仁王后.m3u8", mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip() # 先去掉空格
if line.startswith("#"):
continue
# 下载片段
resp3 = requests.get(line)
f = open(f"video/{n}.ts", mode="wb")
f.write(resp3.content)
f.close()
resp3.close()
n += 1
6、多线程版爬视频例子
"""
思路:
1. 拿到页面的源代码
2. 从iframe的页面源代码中拿到m3u8文件
3. 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频的存放路径)
4. 下载视频
5. 下载密钥,进行解密
6. 合并所有ts文件作为mp4文件
"""
import requests
# 直接找到iframe模块,用bs4
from bs4 import BeautifulSoup
# 从script中那代码,用re
import re
# 进行异步操作
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
import os
def get_iframe_src(url):
resp = requests.get(url)
main_page = BeautifulSoup(resp.text, "html.parser")
# 只有一个iframe标签
src = main_page.find("iframe").get("src")
resp.close()
return src
def download_m3u8_file(url, name):
resp = requests.get(url)
with open(name, mode="wb") as f:
f.write(resp.text)
def get_first_m3u8_url(url):
resp = requests.get(url)
obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
m3u8_url = obj.search(resp.text).group("m3u8_url")
resp.close()
return m3u8_url
async def download_ts(url, name, session):
async with session.get(url) as resp:
async with aiofiles.open(f'video/{name}', mode="wb") as f:
await f.write(await resp.content.read()) # 把下载的内容写入到文件中
print(f'{name}下载完毕')
async def aio_download(up_url):
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备session
async with aiofiles.open("越狱第一季_second.txt", mode="r", encoding='utf-8') as f:
async for line in f:
if line.startswith("#"):
continue
line = line.strip()
# 拼接ts地址
ts_url = up_url+line
task = asyncio.create_task(download_ts(ts_url, line, session)) # 创建任务
tasks.append(task)
await asyncio.wait(tasks) # 等待任务下载
def get_key(url):
resp = requests.get(url)
# print(resp.text)
return resp.text
async def dec_ts(name, key):
aes = AES.new(key=key, IV=b"00000000000", mode=AES.MODE_CBC)
async with aiofiles.open(f'video/{name}', mode="rb") as f1,\
aiofiles.open(f'video/temp_{name}', mode="wb") as f2:
bs = await f1.read() # 从源文件中读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的文件内容写入文件
print(f"{name}处理完毕")
async def aio_dec(key):
# 解密
tasks = []
async with aiofiles.open("越狱第一季_second.txt", mode="r", encoding='utf-8') as f:
async for line in f:
if line.startswith("#"):
continue
line = line.strip()
# 开始创建异步任务
task = asyncio.create_task(dec_ts(line, key)) # 创建任务
tasks.append(task)
await asyncio.wait(tasks) # 等待任务下载
def merge_ts():
# mac: cat 1.ts 2.ts 3.ts > xxx.mp4
# windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open("越狱第一季_second.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
line = line.strip()
lst.append(f"video/temp_{line}")
s = " ".join(lst) # 放入1.ts 2.ts 3.ts
os.system(f"cat {s} > movie.mp4")
print("完毕!")
def main(url):
# 拿到主页面的源代码,找到iframe中的地址
iframe_src = get_iframe_src(url)
# 拿到第一层m3u8文件
first_m3u8_url = get_first_m3u8_url(iframe_src)
# 拿到iframe的域名
iframe_domain = iframe_src.split("/share")[0]
# 拼接正确路径
first_m3u8_url = iframe_domain+first_m3u8_url
# 下载第一层m3u8文件
download_m3u8_file(first_m3u8_url, "越狱第一季.txt")
# 下载第二层m3u8文件
with open("越狱第一季.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
else:
line = line.strip() # 去掉空白或换行符
# 拼接第二层地址
second_m3u8_url = first_m3u8_url.split("index.m3u8")[0]+line
download_m3u8_file(second_m3u8_url, "越狱第一季_second.txt")
# 下载视频
second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "")
# 异步协程
# asyncio.run(aio_download(second_m3u8_url_up)) # 测试完后就可以注释了
# 拿到密钥
key_url = second_m3u8_url_up+"key.key"
key = get_key(key_url)
# 解密
asyncio.run(aio_dec(key))
# 合并文件为MP4文件
merge_ts()
if __name__ == '__main__':
url = "http://91kanju2.com/vod-play/541-2-1.html"
main(url)