线程队列 concurrent 协程 greenlet gevent

死锁问题

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

经典问题:哲学家就餐问题

英语:Dining philosophers problem 是在计算机科学中的一个经典问题,用来演示在并发计算中多线程同步(Synchronization)时产生的问题。

import time
from threading import Thread, Lock


def philosopher1(chopsticks, noodles, name):
    chopsticks.acquire()
    print('%s 拿到了筷子' % name)
    time.sleep(1)

    noodles.acquire()
    print('%s 端起了面条' % name)
    time.sleep(1)
    print('%s开始吃面' % name)
    time.sleep(2)
    noodles.release()
    print('%s 放下了面条' % name)
    chopsticks.release()
    print('%s 放下了筷子' % name)


def philosopher2(chopsticks, noodles, name):
    noodles.acquire()
    print('%s 端起了面条' % name)
    time.sleep(1)

    chopsticks.acquire()
    print('%s 拿到了筷子' % name)
    time.sleep(1)

    print('%s开始吃面' % name)
    time.sleep(2)
    chopsticks.release()
    print('%s 放下了筷子' % name)
    noodles.release()
    print('%s 放下了面条' % name)


if __name__ == '__main__':
    chopsticks = Lock()
    noodles = Lock()
    for i1 in ['亚里士多德', '柏拉图', '卡迪尔']:
        t1 = Thread(target=philosopher1, args=(chopsticks, noodles, i1))
        t1.start()
    for i2 in ['尼采', '苏格拉底', '马克思']:
        t2 = Thread(target=philosopher2, args=(chopsticks, noodles, i2))
        t2.start()

递归锁解决死锁问题

解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

import time
from threading import Thread, RLock


def philosopher1(chopsticks, noodles, name):
    chopsticks.acquire()
    print('%s 拿到了筷子' % name)
    time.sleep(1)

    noodles.acquire()
    print('%s 端起了面条' % name)
    time.sleep(1)
    print('%s开始吃面' % name)
    time.sleep(2)
    noodles.release()
    print('%s 放下了面条' % name)
    chopsticks.release()
    print('%s 放下了筷子' % name)


def philosopher2(chopsticks, noodles, name):
    noodles.acquire()
    print('%s 端起了面条' % name)
    time.sleep(1)

    chopsticks.acquire()
    print('%s 拿到了筷子' % name)
    time.sleep(1)

    print('%s开始吃面' % name)
    time.sleep(2)
    chopsticks.release()
    print('%s 放下了筷子' % name)
    noodles.release()
    print('%s 放下了面条' % name)


if __name__ == '__main__':
    chopsticks = RLock()
    noodles = chopsticks
    for i1 in ['亚里士多德', '柏拉图', '卡迪尔']:
        t1 = Thread(target=philosopher1, args=(chopsticks, noodles, i1))
        t1.start()
    for i2 in ['尼采', '苏格拉底', '马克思']:
        t2 = Thread(target=philosopher2, args=(chopsticks, noodles, i2))
        t2.start()

线程队列

同一个进程下多个线程数据是共享的
为什么先同一个进程下还会去使用队列呢?
因为队列是 管道 + 锁
所以用队列还是为了保证数据的安全

线程队列 queque,用法:from queue import Queue。内置方法与进程中的Queue基本一样

先进先出

from queue import Queue

q = Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

后进先出

from queue import LifoQueue

q = LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
输出结果
third
second
first
'''

优先级队列

from queue import PriorityQueue

q = PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((3, 'a'))
q.put((2, 'b'))
q.put((1, 'c'))

print(q.get())
print(q.get())
print(q.get())

Python标准模块——concurrent.futures

concurrent.futures模块提供了高度封装的异步调用接口

ThreadPoolExecutor:线程池,提供异步调用

ProcessPoolExecutor:进程池,提供异步调用

基本方法

submit(fn, *args, **kwargs):异步提交任务

map(func, *iterables, timeout=None, chunksize=1):取代for循环submit的操作

shutdown(wait=True):相当于进程池的pool.close()+pool.join()操作

  • wait=True,等待池内所有任务执行完毕回收完资源后才继续
  • wait=False,立即返回,并不会等待池内的任务执行完毕
  • 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
  • submit和map必须在shutdown之前

result(timeout=None):取得结果

add_done_callback(fn):回调函数

done():判断某一个线程是否完成

cancle():取消某个任务

ProcessPoolExecutor 进程池

from concurrent.futures import ProcessPoolExecutor
import time
import os

# 括号内可以传数字不传的化默认开设当前计算机cpu个数进程
p_pool = ProcessPoolExecutor()
'''
池子造出来之后 里面会固定存几个个进程
这5个进程不会出现重复创建和销毁的过程
'''


def task(n):
    print(n, os.getpid())
    time.sleep(1)
    return n ** n


def call_back(res):
    print(res.result())


if __name__ == '__main__':
    for i in range(1, 25):
        p_pool.submit(task, i).add_done_callback(call_back)
       # 拿到回调结果

    p_pool.shutdown()  # 关闭进程池等进程池中所有任务运行完毕 再往下执行

    print('主')

ThreadPoolExecutor 线程池

#介绍
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
与ProcessPoolExecutor相同

总结

from concurrent.futures import ProcessPoolExecutor
p_pool = PProcessPoolExecutor()
p_pool.submit(task, i).add_done_callback(call_back)

协程

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

需要强调的是:

  1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会*交出cpu执行权限,切换其他线程运行)
  2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

对比操作系统控制线程的切换,用户在单线程内控制协程的切换。

优点如下:

  1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
  2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点如下:

  1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
  2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

总结协程特点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
'''
进程:资源单位
线程:执行单位
协程:单线程下实现并发
			自己在代码层面上检测我们所有的IO操作
			一旦遇到了IO了 我们在代码级别完成切换
			这样给cpu的感觉是你这个程序一直在运行,没有IO
			从而提升程序运行效率
多道技术:		
		切换+保存状态
		cpu两种切换
		1、程序遇到IO
		2、程序长时间占用
代码是如何做到的
		切换+保存状态
		
切换
		切换不一定是提升效率,也有可能是降低效率
		比如计算密集型切换起来可能还会降低效率
	
		
保存状态
		保存上一次执行的状态 暂时挂起 下一次来接着上一次的操作继续往后面执行


'''

gevent模块

from gevent import monkey

monkey.patch_all()
import time
from gevent import spawn

'''
gevent模块本身无法监测常见的一些IO操作
在使用的时候需要额外导入一个
from gevent import monkey
monkey.patch_all()
'''


def eat():
    print('吃了一口')
    time.sleep(2)
    print('又吃了一口')


def play():
    print('玩了一下')
    time.sleep(3)
    print('又玩了一下')


start = time.time()
g1 = spawn(eat)
g2 = spawn(play)
g1.join()
g2.join()
print(time.time()-start)

greenlet模块

from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('nick')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('nick')#可以在第一次switch时传入参数,以后都不需要

# 单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。手动切换
上一篇:平衡树学习笔记之 fhq Treap


下一篇:2021-09-09