Python并发编程06 /阻塞、异步调用/同步调用、异步回调函数、线程queue、事件event、协程
目录
1. 阻塞
进程运行的三个状态:运行,就绪,阻塞
-
阻塞非阻塞是从执行任务的角度来看的:
阻塞:程序运行时,遇到了IO,程序挂起,CPU被切走
非阻塞:程序没有遇到IO,程序遇到IO但是通过某种手段,让CPU尽量的运行我的程序
2. 异步调用、同步调用
1. 概念
-
提交任务的角度:
异步:一次提交多个任务,然后就执行下一行代码
同步:提交一个任务,自任务开始运行直到此任务结束(可能有IO),返回一个返回值之后,再提交下一个任务
2. 异步调用
-
代码示例:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任务结束')
return i if __name__ == '__main__':
# 异步调用
pool = ProcessPoolExecutor()
for i in range(10):
pool.submit(task,i)
pool.shutdown(wait=True)
print('===主') # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后再执行. 有点类似于join.
# shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务.
# 一个任务是通过一个函数实现的,任务完成了它的返回值就是函数的返回值.
3. 同步调用
-
代码示例:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任务结束')
return i
if __name__ == '__main__': # 同步调用
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task,i)
print(f'任务结果:{obj.result()}')
pool.shutdown(wait=True)
print('===主') # obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了.
# obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.
3. 异步调用+回调函数
-
版本一:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests def task(url):
'''模拟的就是爬取多个源代码 一定有IO操作'''
ret = requests.get(url)
if ret.status_code == 200:
return ret.text def parse(content):
'''模拟对数据进行分析 一般没有IO'''
return len(content) if __name__ == '__main__':
# 开启线程池,并发并行的执行
url_list = [
'http://www.baidu.com',
'http://www.taobao.com',
'https://www.sina.com.cn',
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task,url)
obj_list.append(obj)
pool.shutdown(wait=True)
for res in obj_list:
print(parse(res.result()))
print('===主') # 1. 异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值.(效率低,不能实时的获取结果)
# 2. 分析结果流程是串行,影响效率. -
版本二:针对版本一的缺点2,改进,让串行编程并发或者并行.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests def task(url):
'''模拟的就是爬取多个源代码 一定有IO操作'''
ret = requests.get(url)
if ret.status_code == 200:
return parse(ret.text) def parse(content):
'''模拟对数据进行分析 一般没有IO'''
return len(content) if __name__ == '__main__':
# 开启线程池,并发并行的执行
url_list = [
'http://www.baidu.com',
'http://www.taobao.com',
'https://www.sina.com.cn',
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task, url)
obj_list.append(obj)
pool.shutdown(wait=True)
for res in obj_list: # [obj1, obj2,obj3....obj10]
print(res.result()) # 线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码+数据分析, 并发执行,
# 耦合性增强了,并发执行任务,此任务最好是IO阻塞,才能发挥最大的效果,而parse操作一般是没有IO阻塞的 -
版本三:异步调用 + 回调函数
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests def task(url):
'''模拟的就是爬取多个源代码 一定有IO操作'''
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(obj):
'''模拟对数据进行分析 一般没有IO'''
print(len(obj.result())) if __name__ == '__main__':
# 开启线程池,并发并行的执行
url_list = [
'http://www.baidu.com',
'http://www.taobao.com',
'https://www.sina.com.cn',
]
pool = ThreadPoolExecutor(4)
for url in url_list:
obj = pool.submit(task, url)
obj.add_done_callback(parse) # 基于异步调用回收所有任务的结果要做到实时回收结果,并发执行任务每个任务只是处理IO阻塞的,不能增加新的功能.
# 1.线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行
# 2.当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,这个线程继续去处理其他任务.
# 3.如果进程池+回调: 回调函数由主进程去执行.
# 如果线程池+回调: 回调函数由空闲的线程去执行. -
异步回调是一回事儿?
1.异步站在发布任务的角度
2.站在接收结果的角度: 回调函数 按顺序接收每个任务的结果,进行下一步处理 异步 + 回调:异步处理的IO类型;回调处理非IO
4. 线程queue
应用:多线程抢占资源,只能让其串行,选择互斥锁或者队列
-
队列:先进先出
import queue
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3) print(q.get())
print(q.get())
print(q.get()) # 输出结果:
1
2
3 # print(q.get(block=False)) # 只要有阻塞就报错
# print(q.get(timeout=2)) # 阻塞2s 还没有值直接报错 -
堆栈:后进先出
q = queue.LifoQueue(4)
q.put(1)
q.put(2)
q.put(3) print(q.get())
print(q.get())
print(q.get()) # 输出结果:
3
2
1 # 栈的应用场景:drf源码中节流的实现,用到了列表实现一个栈 -
优先级队列
q = queue.PriorityQueue(4)
q.put((5, '王五'))
q.put((-2,'张三'))
q.put((0, '李四'))
print(q.get())
print(q.get())
print(q.get()) # 输出结果:
(-2, '张三')
(0, '李四')
(5, '王五') # 数值越小优先级越高,就越先输出
5. 事件event
定义:开启两个线程,一个线程运行到中间的某个阶段,触发另一个线程执行,两个线程增加了耦合性
-
代码示例:
示例一:使用全局变量,实现事件event的效果
from threading import Thread
from threading import current_thread
import time flag = False
def check():
print(f'{current_thread().name} 监测服务器是否开启...')
time.sleep(3)
global flag
flag = True
print('服务器已经开启...') def connect():
while 1:
print(f'{current_thread().name} 等待连接...')
time.sleep(0.5)
if flag:
print(f'{current_thread().name} 连接成功...')
break t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start() # 如果程序中的其它线程需要通过判断某个线程的状态来确定自己的下一步操作示例二:使用事件event
from threading import Thread
from threading import current_thread
from threading import Event
import time event = Event()
def check():
print(f'{current_thread().name} 监测服务器是否开启...')
time.sleep(3)
print(event.is_set()) # 判断是否执行event.set()方法
event.set() # 设置event事件
print(event.is_set())
print('服务器已经开启...') def connect(): print(f'{current_thread().name} 等待连接...')
# event.wait() # 阻塞 直到 event.set() 方法之后
event.wait(1) # 只阻塞1秒,1秒之后如果还没有进行set 直接进行下一步操作.
print(f'{current_thread().name} 连接成功...') t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start()示例三:一个线程监测服务器是否开始,另个一线程判断如果开始了,则显示连接成功,此线程只尝试连接3次,1s 一次,如果超过3次,还没有连接成功,则显示连接失败.
from threading import Thread
from threading import current_thread
from threading import Event
import time
event = Event()
def check():
print(f'{current_thread().name}检测服务器是否开启')
time.sleep(3)
event.set()
if t2.is_alive():
print('服务器已经开启')
def connect():
count = 3
while count:
count -= 1
print(f'{current_thread().name} 等待连接')
event.wait(1)
if event.is_set():
print(f'{current_thread().name} 连接成功')
break
else:
print(f'{current_thread().name} 连接失败')
6. 协程
-
相关概念:
协程:一个线程并发的处理任务
串行:一个线程执行一个任务,执行完毕之后,执行下一个任务
并行:多个CPU执行多个任务,4个CPU执行4个任务
并发:一个CPU执行多个任务,看起来像是同时执行
并发真正的核心/本质:切换并且保持状态
多线程的并发:3个线程处理10个任务,如果线程1处理的这个任务,遇到阻塞,cpu被操作系统切换到另一个线程, -
应用示例:单个CPU:10个任务,让你给我并发的执行这10个任务
方式一:开启多进程并发执行,操作系统+保持状态
方式二:开启多线程并发执行,操作系统+保持状态
方式三:开启协程并发的执行,自己的程序把控着CPU在多个任务之间来回的切换+保持状态
方式三的详细解释:协程切换的速度非常快,蒙蔽操作系统的眼睛,让操作系统认为CPU一直在运行这个线程(协程) -
使用协程的原因:
1. 开销小
2. 运行速度快
3. 协程会长期霸占CPU只执行我程序里边的所有任务 -
协程的特点:
1. 必须在只有一个单线程里实现并发
2. 修改共享数据不需加锁
3. 用户程序里自己保存多个控制流的上下文栈(保持状态)
4. 附加:一个协程遇到IO操作自动切换到其它协程 -
工作中应用协程:
一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个
-
非协程示例代码:
示例一:只有切换+不能保持状态,遇到IO不会主动切换
def func1():
print('in func1') def func2():
print('in func2')
func1()
print('end') func2()示例二:yield 切换+保持状态,遇到IO不会主动切换
def gen():
while 1:
yield 1 def func():
obj = gen()
for i in range(10):
next(obj)
func()示例三:greenlet 切换 +保持状态,遇到IO不会主动切换
from greenlet import greenlet
import time
def eat(name):
print('%s eat 1' %name) # 第2步
g2.switch('李四') # 第3步
time.sleep(3)
print('%s eat 2' %name) # 第6步
g2.switch() # 第7步 def play(name):
print('%s play 1' %name) # 第4步
g1.switch() # 第5步
print('%s play 2' %name) # 第8步 g1=greenlet(eat)
g2=greenlet(play) g1.switch('张三') # 第1步 切换到eat任务 # 输出结果:
张三 eat 1
李四 play 1
张三 eat 2
李四 play 2 -
协程示例代码:
示例一:模拟的阻塞,不是真正的阻塞(遇到time.sleep()还是会阻塞)
import gevent
from threading import current_thread
def eat(name):
print('%s eat 1' %name)
print(current_thread().name)
gevent.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
print(current_thread().name)
gevent.sleep(1)
print('%s play 2' %name) g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,name='egon')
print(f'主{current_thread().name}')
g1.join()
g2.join()示例二:真正的协程 / 最终版
import gevent
from gevent import monkey
monkey.patch_all() # 打补丁: 将下面的所有的任务的阻塞都打上标记
def eat(name):
print('%s eat 1' %name)
time.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
time.sleep(1)
print('%s play 2' %name)
g1 = gevent.spawn(eat,'张三')
g2 = gevent.spawn(play,name='李四')
# g1.join()
# g2.join()
gevent.joinall([g1,g2])