Python开发之路 - 锁、信号量 、队列

1.并发&并行

并发:是指系统具有处理多个任务(动作)的能力---CPU切换速度快 但任务不在同一时间节点上跑

并行:是指系统具有同时处理多个任务(动作)的能力 --- 多核时 任务在同一时间节点上跑

并行是并发的子集

2.同步&异步

同步:当程序执行到一个IO(等待外部数据)的时候,你------等:同步  打电话

异步:当程序执行到一个IO(等待外部数据)的时候,你------不等:一直等到数据接收成功,再回来处理  发短信

3.GIL(Global Interprete Lock)全局解释器锁

'''
定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''
结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

任务分两种:

1.IO密集型 (accept,recv) CPU会有空闲的时候 time.sleep()也等同于IO操作

2.计算密集型

对于IO密集型的任务,python的多线程是有意义的

可以采用多进程+协程利用多核

对于IO密集型的任务,python的多线程不推荐,python就不适用了

4.同步锁(也叫互斥锁)

三个需要注意的点:
#1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来

#2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

#3. 一定要看本小节最后的GIL与互斥锁的经典分析

Python开发之路 - 锁、信号量 、队列

import threading,time
sum = 100

def sub():
    global sum
    temp = sum
    time.sleep(0.1) #此时会发生时间轮循,有些线程来不及进行下一步就被解释器要求释放GIL锁
    sum = temp - 1


l = []
for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)

for a in l:
    a.join()
print(sum)

 

此时输出结果是99

那么问题来了,怎么让sub函数在执行过程中不被CPU切换呢?------答案 同步锁

用锁把需要一次性由CPU执行的部分用acquire和release包起来

Python开发之路 - 锁、信号量 、队列
#同步锁
import threading,time
sum = 100

def sub():
    global sum
    lock.acquire() #同步锁
    temp = sum
    time.sleep(0.1) #此时会发生时间轮循
    sum = temp - 1
    lock.release()


l = []
lock = threading.Lock()
for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)

for a in l:
    a.join()
print(sum)
同步锁解决

 

因此以后在多个线程共享一个数据时,一定要酌情添加同步锁

5.死锁现象与递归锁

进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

Python开发之路 - 锁、信号量 、队列
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''
死锁

 解决方法:递归锁 RLock 内部有计数器 每创建一把锁就加一,如下:

Python开发之路 - 锁、信号量 、队列
import threading
import time
class Mythread(threading.Thread):
    def actionA(self):
        r_lock.acquire() # 递归锁 内部有计数器 count = 1
        print(self.name,'got LockA ',time.ctime())  #--> self.name 为线程的名字
        time.sleep(2)

        r_lock.acquire() # count = 2
        print(self.name, 'got LockB ', time.ctime())
        time.sleep(1)

        r_lock.release() #count = 1
        r_lock.release() #count = 0
    def actionB(self):
        r_lock.acquire()
        print(self.name, 'got LockB ', time.ctime())
        time.sleep(2)

        r_lock.acquire()
        print(self.name, 'got LockA ', time.ctime())
        time.sleep(1)

        r_lock.release()
        r_lock.release()

    def run(self):
        self.actionA()
        self.actionB()


if __name__ == '__main__':
    # A = threading.Lock()
    # B = threading.Lock()
    r_lock = threading.RLock()

    for i in range(5):
        t = Mythread()
        t.start()
        l = []
        l.append(t)

    for i in l:
        i.join()

    print('ending')
递归锁

 6.同步条件 同进程中 event对象在多个线程里状态是一致的 因为大家用的是一个共同的event对象

flag标志位

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

 

import threading,time

class Boss(threading.Thread):
    def run(self):
        print('Boss:今晚大家要加班到22:00!')
        print(event.isSet()) #False
        event.set()
        time.sleep(5)
        print('Boss:<22:00>了 可以下班了')
        print(event.isSet())
        event.set()

class Worker(threading.Thread):
    def run(self):
        event.wait()  #工人在这里阻塞着,什么时候条件满足了再往下走 一旦event被设定,此处等同于pass
        print('Worker:哎。。。命苦啊!')
        time.sleep(1)
        event.clear()
        event.wait()
        print('Worker:Yeah!')

if __name__ == '__main__':
    event = threading.Event()

    l = []
    for i in range(5):
        l.append(Worker())
    l.append(Boss())

    for i in l:
        i.start()
    for t in l:
        t.join()

    print('ending')

 

7.信号量Semaphore

同进程的一样,类似于停车场

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

import threading,time

class Mythread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()

if __name__ == '__main__':
    semaphore = threading.Semaphore(5)
    l = []
    for i in range(100):
        l.append(Mythread())
    for t in l:
        t.start()

 

此时就是5个线程5个线程的进出

8.多线程利器 - 队列(queue) !!!重点  用于保护线程数据安全 

先进先出:first in first out

import queue #线程队列
q = queue.Queue() # 三种顺序 1.FIFO(默认) 先进先出 2.先进后出
q.put(12)
q.put('hello')
q.put({'name':'chris'})
q.put(42,False) # 此时队列满了不会进入同步状态 而是报错--
while 1:
    data = q.get()
    print(data)
    print('----')

 

先进后出:Lifo later in first out

Python开发之路 - 锁、信号量 、队列
import queue #线程队列
q = queue.LifoQueue() # 后进先出
q.put(12)
q.put('hello')
q.put({'name':'chris'})

while 1:
    data = q.get()
    print(data)
    print('----')
先进后出

 

优先级模式:

Python开发之路 - 锁、信号量 、队列
import queue #线程队列
q = queue.PriorityQueue()
q.put([7,12])
q.put([3,'hello'])
q.put([6,{'name':'chris'}])

while 1:
    data = q.get()
    print(data[1])
    print('----')
优先级模式

 

更多方法:

Python开发之路 - 锁、信号量 、队列
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完毕

更多方法说明
其他队列的方法

9.生产者消费者模型

 为什么要使用生产者消费者模型:

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等消费者处理完,才能继续生产数据,反之也是,为了解决这个问题,于是引入了生产者消费者模型。

什么是生产者消费者模式:

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者之间不直接通信,而通过阻塞队列来通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列(阻塞队列:生产者产出时容器已满,此时阻塞;或者消费要取出数据时,容器为空,此时也阻塞)相当于一个缓冲区,平衡了生产者和消费者的处理能力。

通过队列来承担容器进行解耦:

Python开发之路 - 锁、信号量 、队列
import threading,random
import time,queue

q = queue.Queue() #队列对象

def producer(name):
    count = 0
    while q.qsize()< 10:
        print('making......')
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has making %s 个包子' %(name,count))
        count += 1
        q.task_done() #向队列发送信息
        print('OK')

def consumer(name):
    count = 0

    while count < 10:
        time.sleep(random.randrange(2))
        if not q.empty():
            data = q.get()
            # print(data)
            q.join() #如果生产者那边没有发送task_done 信号 就不往下面执行
            print('\033[32:1m Consumer %s has eating %s 个包子...\033[0m' %(name,data))

        else:
            print('no more baozi....')
p1 = threading.Thread(target=producer,args=('liuzong',))
c1 = threading.Thread(target=consumer,args=('chris',))

p1.start()
c1.start()
生产者消费者模型

q.task_done() 与 q.join() 作为一对信号通信 一端未发送q.task_done() 另一端的join() 就会卡住  直到信号传输过来

上一篇:【Python学习笔记八】用threading.Thread实现多线程执行


下一篇:Java核心技术读书笔记6-5 Java代理