# ### lock (互斥锁)
"""
# 应用在多进程当中
# 互斥锁lock : 互斥锁是进程间的get_ticket互相排斥
进程之间,谁先抢占到资源,谁就先上锁,等到解锁之后,下一个进程在继续使用
"""
lock.acquire()# 上锁
lock.release()# 解锁
#同一时间允许一个进程上一把锁 就是Lock
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行(同步)的修改,没错,速度是慢了,但牺牲速度却保证了数据安全。
#同一时间允许多个进程上多把锁 就是[信号量Semaphore]
信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁
# 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性
# 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
代码如下
lock = Lock()
# 上锁
lock.acquire()
print(1)
# 上锁和解锁是一对,一对一的关系,如果只上锁不解锁,会发生死锁
lock.acquire()
print(2)
lock.release() # 解锁
lock.release()
import json
# 读取票数,更新票数
def wr_info(sign,dic=None):
if sign == "r":
with open("ticket",mode="r",encoding="utf-8") as fp:
dic = json.load(fp)
return dic elif sign == "w":
with open("ticket",mode="w",encoding="utf-8") as fp:
json.dump(dic,fp)
这段代码里函数是根据我所传入的状态来执行并且返回相应的条件,如果我是传入的是一个“r”那么我当前处于读取的模式会把文件里面票数读出来,并且返回给调用处,如果我传入的是“w”模式那么我当前是写的模式把我当前的状态写入票卡文件夹里
进程在创建的时候是异步创建的,在上锁的时候同步进行的
这里介绍下Lock的简写
首先实例化一个lock对象
l=Lock()
with l:
code...
这里相当于传统的写法
l.acquire()
code...
l.release()
信号量 Semaphore
同一时间可以上多把锁即首先实例化一个semaphore对象,然后指定你要上锁的数量由进程调用
# ### 信号量 Semaphore 本质上就是锁,同一时间可以上多把锁
"""
# 语法:
sem = Semaphore(3)
sem.acquire()
sem.release()
""" import time
import random
from multiprocessing import Process,Semaphore
def ktv(person,sem):
sem.acquire()
print("%s进入了ktv,正在唱歌" % (person))
time.sleep(random.randrange(3,6))
print("%s唱完了,离开了ktv" % (person))
sem.release() if __name__ == "__main__":
sem = Semaphore(3)
for i in range(10):
p = Process(target=ktv,args=("person%s" % (i), sem))
p.start() """
lock 多个进程之间,一次只能上一把锁
Semaphore 多个进程之间,可以自定义上锁的数量,不限于一个
"""
用法如上.....
# ### 事件 (Event)
# 阻塞事件 :
e = Event()生成事件对象e
e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的 is_set() [默认返回值是False]
# 如果是True 不加阻塞
# 如果是False 加阻塞
# 控制这个属性的值
# set()方法 将这个属性的值改成True
# clear()方法 将这个属性的值改成False
# is_set()方法 判断当前的属性是否为True (默认上来是False)
事件Event只针对于多进程,在普通的同步进程中是不会出现Event事件的,那么当我使用事件时一个进程中最少需要保持一个子进程去进行修改状态为True另外一个进程去获取状态后在修改False状态,造成的程序也会变成同步,两个进程之间同时保持着紧密的联系
以下通过代码方式实现事件event的过程
def traffic_light(e):
# 默认红灯亮
print("红灯亮")
while True:
if e.is_set():
# 让绿灯亮1秒钟
time.sleep(1)
# 切换成红灯
print("红灯亮")
# 把True 改成False
e.clear()
else:
# 让红灯亮1秒钟
time.sleep(1)
# 切换成绿灯
print("绿灯亮")
# 把默认值从False 改成True
e.set() # e = Event()
# traffic_light(e) def car(e,i):
# 判断如果是红灯亮,就执行下面代码
if not e.is_set():
print("car%s 在等待" % (i))
e.wait()
print("car%s 通行了" % (i)) """
# 方法一
if __name__ == "__main__":
e = Event()
# 创建交通灯对象
p1 = Process(target=traffic_light,args=(e,))
p1.start() # 创建小车
for i in range(20):
time.sleep(random.randrange(0,2))
p2 = Process(target=car,args=(e,i))
p2.start()
""" # 方法二:优化红绿灯代码[当小车执行结束的时候,把红绿灯终止]
if __name__ == "__main__":
lst = []
e = Event()
# 创建交通灯对象
p1 = Process(target=traffic_light,args=(e,))
p1.daemon=True
p1.start() # 创建小车
for i in range(20):
time.sleep(random.randrange(0,2))
p2 = Process(target=car,args=(e,i))
p2.start()
lst.append(p2) # 等所有小车都同行之后,在关闭守护进程
for i in lst:
i.join() print("主程序执行结束... ")
通过程序我们可以看出我们得通过事件的is_set方法来判定当前事件的状态是为False还是为True,为False执行car函数为True执行traffic_light函数
进程之队列
# ### 进程队列 [让进程之间共享资源]
from multiprocessing import Process,Queue
# 线程队列
import queue
"""先进先出"""
(1) 基本语法
q = Queue()
1.用put方法往队列中存值
q.put(111)
2.用get方法从队列中取值
res = q.get()
print(res)
3.当队列中没有值了,在调用get就会发生阻塞
res = q.get()
print(res)
4.get_nowait 在没值的时候,直接报错;存在兼容性问题(不推荐使用,报错报的是线程队列中的空,在取值时存在bug)
res = q.get_nowait()
print(res)
# (了解)
try:
res = q.get_nowait()
print(res)
except queue.Empty:
pass
# (2) 可以适用queue 指定队列长度
q1 = Queue(3)
q1.put(11)
q1.put(22)
q1.put(33)
注意:如果超出了队列的长度,直接阻塞
# q1.put(44)
注意:如果超出了队列的长度,直接报错(不推荐)
# q1.put_nowait(44)
# (3) 多进程之间共享数据
def func(q):
# 2.在子进程中获取数据
res = q.get()
print(res)
# 3.子进程添加数据
q.put("bbb") if __name__ == "__main__":
q2 = Queue()
p1 = Process(target=func,args=(q2,))
p1.start() # 1.在主进程中,添加数据
q2.put("aaa") # 为了能够拿到子进程中添加的队列元素,需要等待子进程执行结束后在获取
p1.join() # 4.主进程获取子进程添加的数据
res = q2.get()
print("主进程执行结束: 值%s" % (res))
生产者与消费者模型
# (1) 基本语法
from multiprocessing import Process,Queue
import time,random
# (1) 优化生产者和消费者模型 [生产者生产多少,对应的就消费多少]
# 消费者模型
def consumer(q,name):
while True:
food = q.get()
if food is None:
break
time.sleep(random.uniform(0.1,1))
print("%s 吃了一个%s" % (name,food)) # 生产者模型
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
print("%s 生产了 %s%s" % (name,food,i))
q.put(food+str(i)) if __name__ == "__main__":
q = Queue()
# 创建生产者
p1 = Process(target=producer,args=(q,"周永玲","便便"))
p1.start() # 创建生产者2号
p2 = Process(target=producer,args=(q,"常远","茶叶"))
p2.start() # 创建消费者
c1 = Process(target=consumer,args=(q,"张龙"))
c1.start() # 创建消费者2号
c2 = Process(target=consumer,args=(q,"林银展"))
c2.start() p1.join()
p2.join() # 在生产完所有数据之后,在队列的最后塞进去一个None,用来表达已经生产完所有数据;
q.put(None) # 便便0 便便1 便便2 便便3 便便4 None
q.put(None) # 便便0 便便1 便便2 便便3 便便4 茶叶1 茶叶2 茶叶3 茶叶4 None None print("主程序执行结束 ... ")
JoinableQueue 阻塞事件
put 存放
get 获取
task_done 队列数据减一
join 阻塞
task_done 与 join 通过一个中间变量统计队列中元素个数
每放入一个值 , 成员中的中间变量值加1
没执行一次task_done,成员中的中间变量值减1
join 会根据中间变量值来确定是阻塞还是放行
如果中间变量是0 意味着放行
如果中间变量不是0 意味着阻塞
1 # (1) 基本语法
"""
jq = JoinableQueue()
jq.put("aabbcc")
print(jq.get())
jq.task_done()
jq.join()
print("finish")
"""
生产者与消费者模型改造
# 消费者模型
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.uniform(0.1,1))
print("%s 吃了一个%s" % (name,food))
q.task_done()
# 生产者模型
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
print("%s 生产了 %s%s" % (name,food,i))
q.put(food+str(i)) if __name__ == "__main__":
# 创建队列
jq = JoinableQueue()
# 消费者进程
c1 = Process(target=consumer,args=(jq,"张晓东"))
c1.daemon = True
c1.start() # 生产者进程
p1 = Process(target=producer,args=(jq,"黄乐锡","大茄子"))
p1.start() # 等待生产者把所有数据放到队列中;
p1.join()
# 直到所有数据被消费完毕之后,放行;
jq.join() print("主进程执行结束 ... ")
# ### Manager list dict 能够实现进程之间的数据共享
如果多个进程同时修改同一份共享数据,这个时候需要加锁,保证数据的准确性
(1) dict list 可以实现进程之间的数据共享
(2) 为了保证数据的准确性,需要加锁
manager实现的原理跟队列的实现原理是一样的,但又有所区别,队列时直接对元素进行操作的,而manager是直接对字典或list进行操作的,本质是一样的,但内容不同
from multiprocessing import Process,Manager,Lock
def work(dic,lock):
# with 语法 可以自动的上锁和解锁
with lock:
dic["count"] -= 1
"""
lock.acquire()
dic["count"] -= 1
lock.release()
""" if __name__ == "__main__":
lst = []
# 创建一把锁
lock = Lock()
m = Manager()
dic = m.dict( {"count":10000} ) for i in range(100):
p = Process(target=work,args=(dic,lock))
p.start()
lst.append(p) for i in lst:
i.join() print(dic,type(dic))