一. 基本定义
互斥锁(英语:英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。
二. 互斥锁的作用
加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。
三. 互斥锁例子
互斥锁常见的应用常见就是我们抢票的时候,10个同时查看到还有2张剩余票数.然后开始提交购票请求,但是能买到票的只有2个人.也就是说查看票数是并行操作,但是购票却是串行操作.
如果都是并行操作,那么数据就会造成10个人同时都在对剩余票数进行操作,将会导致2张票买给了10个人.所以,我们必须保证,串行操作,且下一个人操作的数据,是在上一个人操作后,已经更新的数据.
from multiprocessing import Process,Lock
import time
import json def search(name):
time.sleep(1)
dic = json.load(open("db.txt","r",encoding="utf-8"))
print("<%s> 剩余票数:%s"%(name,dic["count"])) def get(name,mutex):
mutex.acquire() # 互斥锁加在这里,保证同时只能有一个人在购票.
time.sleep(1)
dic = json.load(open("db.txt","r",encoding="utf-8"))
if dic["count"]>0:
print("开始购票")
time.sleep(2)
dic["count"]-=1
json.dump(dic,open("db.txt","w",encoding="utf-8"))
print("<%s>购票成功."%name) else:
print("<%s>购票失败."%name)
mutex.release() # 释放互斥锁 def task(name,mutex):
search(name)
get(name,mutex)
互斥锁_模拟抢票
四. 互斥锁与join
既然互斥锁的将原本并行执行才程序,变成了串行执行,那么我们之前学习的jion方法,不就可以完成这个操作吗?为什么还要用互斥锁呢?
我们把刚刚的例子改成用jion的方法,看看会出现什么问题.
from multiprocessing import Process,Lock
import time
import json def search(name):
time.sleep(1)
dic = json.load(open("db.txt","r",encoding="utf-8"))
print("<%s> 剩余票数:%s"%(name,dic["count"])) def get(name):
# mutex.acquire() # 互斥锁加在这里,保证同时只能有一个人在购票.
time.sleep(1)
dic = json.load(open("db.txt","r",encoding="utf-8"))
if dic["count"]>0:
print("开始购票")
time.sleep(2)
dic["count"]-=1
json.dump(dic,open("db.txt","w",encoding="utf-8"))
print("<%s>购票成功."%name) else:
print("<%s>购票失败."%name)
# mutex.release() # 释放互斥锁 def task(name):
search(name)
get(name) if __name__ == '__main__':
# mutex = Lock() #生成锁
for i in range(10):
p = Process(target=task,args=("路人%s"%i,))
p.start()
p.join()
join方法_模拟购票
<路人0> 剩余票数:2
开始购票
<路人0>购票成功.
<路人1> 剩余票数:1
开始购票
<路人1>购票成功.
<路人2> 剩余票数:0
<路人2>购票失败.
<路人3> 剩余票数:0
<路人3>购票失败.
<路人4> 剩余票数:0
<路人4>购票失败.
<路人5> 剩余票数:0
<路人5>购票失败.
<路人6> 剩余票数:0
<路人6>购票失败.
<路人7> 剩余票数:0
<路人7>购票失败.
<路人8> 剩余票数:0
<路人8>购票失败.
<路人9> 剩余票数:0
<路人9>购票失败.
发现使用join将并发改成穿行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否,此时join与互斥锁的区别就显而易见了,join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行.
五. 总结
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1、效率低(共享数据基于文件,而文件是硬盘上的数据)
2、需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
六. 补充知识---Process对象的其他属性
p.daemon :守护进程(必须在开启之前设置守护进程):如果父进程死,子进程p也死了
p.join:父进程等p执行完了才运行主进程,是父进程阻塞在原地,而p仍然在后台运行。
terminate:强制关闭。(确保p里面没有其他子进程的时候关闭,如果里面有子进程,你去用这个方法强制关闭了就会产生僵尸进程(打个比方:如果你老子挂了,你还没挂,那么就没人给你收尸了,啊哈哈))
is_alive:关闭进程的时候,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
p.join():父进程在等p的结束,是父进程阻塞在原地,而p仍然在后台运行
p.name:查看名字
p.pid :查看id
七. 队列Queue
队列介绍:
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
参数介绍: maxsize是队列中允许最大项数,省略则无大小限制。
但需要明确:
1、队列内存放的是消息而非大数据
2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
主要方法介绍: q.put方法用以插入数据到队列中。
q.full() 判断队列是否已经满了
q.get方法可以从队列读取并且删除一个元素。
q.empty() 判断队列是否已经没有数据了
from multiprocessing import Queue q=Queue(3) q.put(1)
q.put(2)
q.put(3)
print(q.full()) #满了
# q.put(4) #再放就阻塞住了 print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
队列例子