文章目录
1.异步调用+回调函数
1.2浏览器、爬虫原理
-
浏览器原理
浏览器会将你的请求数据通过网络发送到百度的服务器,服务器接收到请求数据,验证请求数据后,返回给浏览器软件一个HTML网页数据,浏览器接收到这个HTML页面数据通过浏览器内核的渲染机制,渲染成美丽的页面。
-
爬虫原理
-
爬虫模拟一个浏览器向服务器请求数据
-
数据请求成功后,通过数据清洗获取目标的数据
import requests ret = requests.get('http://www.baidu.com') if ret.status_code == 200: print(ret.text)
-
1.2 异步调用如何获取结果
发布三个任务:几种回收方式?
-
所有的任务全部完成后,统一回收、
-
那个任务先完成,回收其任务、
-
方案一
演示一下统一回收所有的结果。
from concurrent.futures import ThreadPoolExecutor import requests def crawling(url: str): ret = requests.get(url) if ret.status_code == 200: return ret.text def parse(data: str): return len(data) if __name__ == '__main__': thread_poor = ThreadPoolExecutor(5) # 方法一 # obj = threadi_poor.submit(crawling, 'http://www.baidu.com') # parse(obj.result) # obj = threadi_poor.submit(crawling, 'http://www.baidu.com') # parse(obj.result) # obj = threadi_poor.submit(crawling, 'http://www.baidu.com') # parse(obj.result) # 方法二 url_list = [ 'http://www.baidu.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.taobao.com', 'https://www.cnblogs.com/jin-xin/articles/7459977.html', 'https://www.luffycity.com/', 'https://www.cnblogs.com/jin-xin/articles/9811379.html', 'https://www.cnblogs.com/jin-xin/articles/11245654.html', 'https://www.sina.com.cn/', ] task_list = [] for url in url_list: obj = thread_poor.submit(crawling,url) task_list.append(obj) thread_poor.shutdown() for obj in task_list: print(parse(obj.result()))
方案一过程描述:
我们开启了5线程的线程池并发的处理了10个IO密集型的任务,但是我们还有10个计算密集型的任务是通过串行处理的,这样不合理,效率低。
-
方案二
from concurrent.futures import ThreadPoolExcutor import requests def crawling(url: str): ret = requests.get(url) if ret.status_code == 200: return parse(ret.text) def parse(data: str): return len(data) if __name__ == '__main__': thread_poor = ThreadPoolExecutor(5) url_list = [ 'http://www.baidu.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.taobao.com', 'https://www.cnblogs.com/jin-xin/articles/7459977.html', 'https://www.luffycity.com/', 'https://www.cnblogs.com/jin-xin/articles/9811379.html', 'https://www.cnblogs.com/jin-xin/articles/11245654.html', 'https://www.sina.com.cn/', ] task_list = [] for url in url_list: obj = thread_poor.submit(crawling,url) task_list.append(obj) thread_poor.shutdown() for obj in task_list: print(obj.result())
方案二过程:
我们开启了5线程的线程池并发处理了10个IO密集型的人任务(每个任务分两个小任务IO+计算),方案二比方案一效率更高一些,但是也有问题:
- 利用多线程(多进程,线程池等)处理的任务最好都是IO密集型,不要掺杂这计算密集型,当我们现在是一个IO+一个计算两种任务耦合到了一起,增加了耦合性。
- 无论方案一还是方案二都没有做到实时获取结果。
1.3 异步调用+回调函数
from concurrent.futures import ThreadPoolExecutor
import requests
def crawling(url: str):
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(obj):
print(len(obj.result()))
if __name__ == '__main__':
thread_poor = ThreadPoolExecutor(5)
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/',
]
for url in url_list:
obj = thread_poor.submit(crawling, url)
obj.add_done_callback(parse)
thread_poor.shutdown() # 等待线程池的所有线程将10个IO任务并发的处理完毕
# print(task_list) # 所有的任务对象都是已完成的状态
异步调用+回调函数 机制:
我们开启了5线程的线程池并发的处理了10个IO密集型的任务,并且让每个任务设置了回调函数,回调函数处理的数据解析的功能,10个IO密集型的任务线程池的线程去执行,10个任务的回调函数交由空闲的线程(或者主线程)去执行。
参数解释:
add_done_callback:回调函数,只要有线程或者进程完成了当前网页爬取的任务,剩下的分析结果的任务交由回调函数去执行,线程或者进程继续进行网页爬取的任务。
**整体的执行流程:**线程池设置4个线程,异步发起10个任务,每个任务是通过网页获取源码,并发执行,当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务。
什么情况下用异步调用+回调函数的机制呢?
你要具备两种类型的任务:第一种IO密集型的任务,第二种计算密集型(耗时很短)的任务,这样IO密集型的任务我们可以利用并发或者并行处理,任何一个处理完毕得到结果之后,直接抛给回调函数,回调函数一般都是主进程或者主线程(空余线程)处理,所以第二种任务一定要耗时短,尽量无IO阻塞。
进程池+回到函数:回调函数有主进程去执行。
线程池+回调函数:回调函数有空闲线程去执行。
2. 线程队列
2.1 先进先出队列
import queue
q = queue.Queue(3)
q.put(111)
q.put(222)
q.put(333)
# q.put(444)
print(q.get())
print(q.get())
print(q.get())
print(q.get())
2.2 后进先出队列
import queue
q = queue.LifoQueue(4) # last in first out
q.put(111)
q.put(222)
q.put(333)
q.put('玮哥')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
2.3 优先级队列
import queue
q = queue.PriorityQueue(5)
# 每次通过元组的形式插入,元组的第一个元素一定是int类型,数字越低,优先级越高。
q.put((0, '新闻'))
q.put((2, '峰哥'))
q.put((-10, '冲哥'))
q.put((5, '日天哥', 'fdsafd'))
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
3. 事件Event
线程与进程都有event存在,先不说什么事件event,我们先聊一聊线程和进程的一个特性:每一个线程(进程)都是独立运行且状态不可预测。如果程序中的其他线程(进程)需要通过判断某个线程的状态来确定自己下一步操作,这时线程(进程)同步问题就会变得非常棘手。其实这是增加了线程(进程)之间的关联性,一个线程(进程)运行到某个节点决定另个一个线程(进程)是否继续运行。听起来很神秘,其实没有时间event照样可以实现。
from threading import Thread, Event
# from multiprocessing import Event
import time
import threading
event = Event()
def connect():
print(f'{threading.currentThread()}检查服务器是否开启...')
time.sleep(3)
print(f'{threading.currentThread()}确定服务器已经正常开启')
event.set()
def ask():
print(f'{threading.currentThread()}尝试连接服务器....')
# event.wait() # 阻塞,直到event对象改变了状态在向下进行
# event.wait(timeout=2) # 可以设置阻塞时长,如果超过设置的世间仍未改变状态,则直接向下运行
# event.wait(timeout=4) #
print(f'{threading.currentThread()}连接成功')
if __name__ == '__main__':
t1 = Thread(target=connect,)
t2 = Thread(target=ask,)
t1.start()
t2.start()