一、介绍
什么是并发?
并发的本质就是切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制):
1.任务发生阻塞
2.计算任务时间过长,需要让出cpu给高优先级的程序
协程,又称微线程,是一种用户态的轻量级线程。协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置,当程序中存在大量不需要CPU的操作时(IO),适用于协程。
协程本质上就是一个线程,以前线程任务的切换是由操作系统控制的,遇到I/O自动切换,现在我们用协程的目的就是较少操作系统切换的开销(开关线程,创建寄存器、堆栈等,在他们之间进行切换等),在我们自己的程序里面来控制任务的切换
进程有三种状态,而线程是进程的执行最小单位,所以也是线程的三种状态
二、协程切换
1.yield是一种在单线程下可以保存任务运行状态的方法
1. yiled 可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
2. send 可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
通过yield实现任务切换+保存线程:
import time
def func1():
for i in range(11):
print('func1第%s次打印' % i)
time.sleep(1)
def func2():
g = func1()
for k in range(10):
print('func2第%s次打印' % k)
time.sleep(1)
顺序打印func1,func2
func2()
yield切换
import time
def func1():
for i in range(11):
yield
print('func1第%s次打印' % i)
time.sleep(1)
def func2():
g = func1()
next(g)
for k in range(10):
print('func2第%s次打印' % k)
time.sleep(1)
func2()
只打印func2,yield会保存func1的状态,io阻塞
def consumer():
'''任务1:接收数据,处理数据'''
while True:
x=yield # 只是进行切换,并没有节省I/O时间
print('处理了数据:',x)
def producer():
'''任务2:生产数据'''
g=consumer()
next(g) # 找到yield位置
for i in range(3):
g.send(i) # 给yield传值,然后再循环给下一个yield传值,并且多了切换的程序,比直接串行执行还多了一些步骤,导致执行效率反而更低了
print('发送了数据:',i)
start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
producer() #我在当前线程中只执行了这个函数,但是通过这个函数里面的send切换了另外一个任务
stop=time.time()
print(stop-start)
result:
处理了数据: 0
发送了数据: 0
处理了数据: 1
发送了数据: 1
处理了数据: 2
发送了数据: 2
没有I/O,单纯切换任务,会降低程序性能
注: yield并不能检测io,实现自动切换
import time
def func1():
while True:
print('func1')
yield
def func2():
g = func1()
for i in range(1000):
#i + 1
next(g)
time.sleep(3)
print('func2')
start = time.time()
func2()
stop = time.time()
print(stop - start)
因为func2方法time.sleep 阻塞,会切换到func1执行
协程就是告诉Cpython解释器,不是搞了个GIL锁吗,那好,我就自己搞成一个线程让你去执行,省去你切换线程的时间,我自己切换比你切换要快很多,避免了很多的开销。
对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程
以上内容从其他文章粘贴
三、线程、协程对比
1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会*交出cpu执行权限,切换其他线程运行)
2.单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点:
- 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
- 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点:
1.协程属于单线程,无法利用多核优势,可以用多进程+多线程+协程实现
2.协程也是单线程下运行,一旦阻塞,将阻塞整个线程
协程特点:
1.单线程下运行实现并发
2.修改数据不需要加锁(线程需要)
3.用户程序控制上下文切换
4.附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
四、Greenlet
如果我们在单个线程内有多个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现多个任务直接的切换
pip3 install greenlet
from greenlet import greenlet
def eat(name):
print('%s eat 1' % name)
g2.switch('上海')
print('%s eat 2' % name)
g2.switch()
def play(name):
print('%s play 1' % name)
g1.switch()
print('%s play 2' % name)
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch('beijing') # 第一次需要传参,以后都不需要
单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度
#顺序执行
import time
def f1():
res=1
for i in range(100000000):
res+=i
def f2():
res=1
for i in range(100000000):
res*=i
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) # 8.795756101608276
#切换
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=i
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=i
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 45.937793016433716
greenlet只是提供了一种比generator(yield)更加便捷的切换方式,当切到一个任务执行时如果遇到IO,那就原地阻塞(不能识别io),仍然是没有解决遇到IO自动切换来提升效率的问题
单线程里的多个任务的代码通常既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。如此,才能提高效率,这就用到了Gevent模块
五、Gevent
Gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但他们被协作式地调度
安装:
pip3 install gevent
用法:
g1=gevent.spawn(func,1,2,3,x=4,y=5)
#创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束 有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,但是你会发现,如果g2里面的任务执行的时间长,但是不写join的话,就不会执行完等到g2剩下的任务了
#或者上述两步合作一步:
gevent.joinall([g1,g2])
g1.value #拿到func1的返回值
import gevent
def eat(name):
print('%s eat 1' % name)
gevent.sleep(2)
print('%s eat 2' % name)
def play(name):
print('%s play 1' % name)
gevent.sleep(1)
print('%s play 2' % name)
g1 = gevent.spawn(eat, 'xxx')
g2 = gevent.spawn(play, name='xxx')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('over')
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞;
而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
from gevent import monkey;
monkey.patch_all() #必须放到被打补丁者的前面,如time,socket模块之前
from gevent import monkey
monkey.patch_all() # 必须写在最上面,否则可能识别不了io
import gevent
import time
def eat():
# print()
print('eat food 1')
time.sleep(2) # 加上monkey就能够识别到time模块的sleep了
print('eat food 2')
def play():
print('play 1')
time.sleep(1) # 来回切换,直到一个I/O的时间结束,这里都是我们个gevent做得,不再是控制不了的操作系统了。
print('play 2')
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
print('over')
六、同步、异步
from gevent import spawn, joinall,monkey
monkey.patch_all()
import time
def task(pid):
time.sleep(0.5)
print('Task %s done' % pid)
def sync():
for i in range(10):
task(i)
def asyncous():
g_list = [spawn(task,i) for i in range(10)]
joinall(g_list)
if __name__ == '__main__':
print('sync')
sync()
# 对比发现执行速度
print('async')
asyncous()