协程

day31 协程

一、死锁与递归锁

​ 所谓死锁:是指两个或者两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,也就是死锁

1.1死锁

from threading import Thread,Lock
import time
lock1 = Lock()
lock2 = lock1

def eat1(name):
    lock1.acquire()
    print('%s抢到了面条'% name)
    time.sleep(1)
    lock2.acquire()
    print('%s抢到了筷子'% name)
    time.sleep(1)
    lock2.release()
    print('%s放下了筷子'% name)
    time.sleep(1)
    print('%s放下了面条'% name)
    lock1.release()

def eat2(name):
    lock2.acquire()
    print('%s抢到了筷子'% name)
    time.sleep(1)
    lock1.acquire()
    print('%s抢到了面条'% name)
    time.sleep(1)
    lock1.release()
    print('%s放下了面条'% name)
    time.sleep(1)
    print('%s放下了筷子'% name)
    lock2.release()

if __name__ == '__main__':
    for name in ['zhang','meng','song']:
        t = Thread(target=eat1,args=(name, ))
        t.start()

    for name in ['bai','liu','hu']:
        t1 = Thread(target=eat2,args=(name, ))
        t1.start()
输出结果:eat1中zhang抢到了面条,但是eat2中bai抢到了筷子,谁都等待对方先给,这就造成了死锁
# 解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁:RLock。
	这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

1.2、递归锁(重入锁)

from threading import Thread,Lock,RLock
import time
# lock1 = Lock()
# lock2 = lock1
lock1 = RLock()  # 递归锁
lock2 = lock1

def eat1(name):
    lock1.acquire()
    print('%s抢到了面条'% name)
    time.sleep(1)
    lock2.acquire()
    print('%s抢到了筷子'% name)
    time.sleep(1)
    lock2.release()
    print('%s放下了筷子'% name)
    time.sleep(1)
    print('%s放下了面条'% name)
    lock1.release()

def eat2(name):
    lock2.acquire()
    print('%s抢到了筷子'% name)
    time.sleep(1)
    lock1.acquire()
    print('%s抢到了面条'% name)
    time.sleep(1)
    lock1.release()
    print('%s放下了面条'% name)
    time.sleep(1)
    print('%s放下了筷子'% name)
    lock2.release()

if __name__ == '__main__':
    for name in ['zhang','meng','song']:
        t = Thread(target=eat1,args=(name, ))
        t.start()

    for name in ['bai','liu','hu']:
        t1 = Thread(target=eat2,args=(name, ))
        t1.start()
结果:顺利的执行代码

二、线程队列

queue队列:使用import queue(小写q),用法与进程Queue一样
'''
    同一个进程下多个线程数据是共享的
    为什么先同一个进程下还会去使用队列呢
    因为队列是:管道+锁
    所以用队列还是为了保证数据的安全
'''

2.1、先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
    first
    second
    third
'''

2.2、后进先出

import queue

q=queue.LifoQueue() # LifoQueue函数是为后进先出
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

2.3、优先级队列

import queue

q=queue.PriorityQueue() # 作用时候为优先级
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

三、进程池与线程池

3.1、基本用法

一、介绍:
    1.concurrent.futures模块提供了高度封装的异步调用接口
    2.ThreadPoolExecutor:线程池,提供异步调用
    3.ProcessPoolExecutor:进程池,提供异步调用
二、基本方法:
    1.submit(fn,*args,**kwargs):异步提交任务
    2.map():在submit中和for循环一样的作用
    3.shutdown():相当于p.join
    4.result(timeout=None):取得结果
    5.add_done_callback(fu):回调函数
    6.done():判断一个线程是否完成
    7.cancle():取消某个任务
代码:
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

    def task(n,m):
        print(n)
        print(m)
        
    if __name__ == '__main__':
        p_fool = ProcessPoolExecutor(3) # 同时只有3个进程执行,把进程池换成线程池,操作一样
        p_fool.submit(task,n=1,m=2) # 提交函数
        p_fool.shutdown() # 相当于join
        print('=====>')
        输出结果:1,2,===>

3.2、回调函数:add_done_callback()

 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

    def task(n,m):
        return n + m

    # 回调函数
    def foo(res):
        print(res.result())

    if __name__ == '__main__':
        p_fool = ProcessPoolExecutor(3)
        p_fool.submit(task,n=1,m=2).add_done_callback(foo) # 回调函数
        返回结果:3

3.3、map的用法

# map的用法
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time,random
    def task(n):
        print('my is %s'% os.getpid())
        time.sleep(random.randint(1,3))
        return n ** 2

    if __name__ == '__main__':
        t_fool = ThreadPoolExecutor(3)
        t_fool.map(task,range(1,12)) # 和下方for循环一样的功能
        # for i in range(1, 12):
        #     t_fool.submit(task, i)

四、协程

1、协程介绍:
    协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程,既协程是由用户程序自己控制调度的。
需要强调的是:
    1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会*交出cpu执行权限,切换其他线程执行)
    2.单线程内开启协程,一旦遇到io,就会从高应用程序级别(而非操作系统)控制切换,从此来提升效率。
对比操作系统控制线程的切换,用户在单线程内控制协程的切换。
# 优点:
    1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    2.单线程内就可以实现并发的效果,更大限度地利用cpu

# 缺点:
    1.协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
    2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程	
# 协程特点:
    1.必须在只有一个单线程里实现并发
    2.修改共享数据不需加锁
    3.用户程序里自己保存多个控制流的上下文栈
    4.一个协程遇到io操作自动切换到其他协程

五、协程之greenlet模块

greenlet作用:当切到一个任务时如果遇到io,拿就原地堵塞,仍然是没有解决遇到io自动切换来提升效率问题。# 我们完全可以遇到阻塞,就利用阻塞的时间去执行任务。
from greenlet import greenlet

def eat(name):
    print('%s吃一口' % name)
    g2.switch(name)
    print('%s再吃一口'% name)
    g2.switch()

def play(name):
    print('%s玩下手机'%name)
    g1.switch()
    print('%s再玩下手机'% name)

g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch('meng') # 可以在第一次switch时传入参数,以后都不需要

输出结果:
'''
    meng吃一口
    meng玩下手机
    meng再吃一口
    meng再玩下手机
'''

六、协程之Gevent模块

    Gevent介绍:Gevent是一个第三方库,可以轻松通过gevent实现并发同步或者异步编程,在gevent中用到的主要模式是Greenlet,它是以c扩展模块形式接入python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

代码:
    import gevent	
    import time

    from gevent import monkey  # 加入猴子补丁之后也可以用time.sleep()来切换
    monkey.patch_all()

    def eat(name):
        print('my is eat1 %s'% name)
        gevent.sleep(2) # 可以切换打印吹来
        # time.sleep(2)  # 不能切换打印出来
        print('my is eat2 %s'% name)

    def play(name):
        print('my is play1 or %s'% name)
        gevent.sleep(2)
        # time.sleep(2)
        print('my is play2 or %s'% name)

    g1 = gevent.spawn(eat,'lqz')
    g2 = gevent.spawn(play,'lqz')

    g1.join()
    g2.join()
    gevent.sleep()输出结果:
    '''
        my is eat1 lqz
        my is play1 or lqz
        my is eat2 lqz
        my is play2 or lqz
    '''
    time.sleep()输出结果:
        '''
            my is eat1 lqz
            my is play1 or lqz
            my is eat2 lqz
            my is play2 or lqz
        '''
上一篇:Docker安装Skywalking APM分布式追踪系统


下一篇:.NetCore从零开始使用Skywalking分布式链路追踪系统