一、IPC(进程间通信)机制
进程之间通信必须找到一种介质,该介质必须满足
1、是所有进程共享的
2、必须是内存空间
附加:帮我们自动处理好锁的问题
进程之间通信必须找到一种介质,该介质必须满足
1、是所有进程共享的
2、必须是内存空间
附加:帮我们自动处理好锁的问题
a、from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
b、IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
c、IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
#d. 文件,共享,硬盘,需要自己解决锁的问题
a、用Manager
from multiprocessing import Process,Manager,Lock
import time
b、IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
c、IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
#d. 文件,共享,硬盘,需要自己解决锁的问题
a、用Manager
from multiprocessing import Process,Manager,Lock
import time
mutex=Lock()
def task(dic,lock):
lock.acquire()
temp=dic['num']
time.sleep(0.1)
dic['num']=temp-1
lock.release()
def task(dic,lock):
lock.acquire()
temp=dic['num']
time.sleep(0.1)
dic['num']=temp-1
lock.release()
if __name__ == '__main__':
m=Manager()
dic=m.dict({'num':10})
m=Manager()
dic=m.dict({'num':10})
l=[]
for i in range(10):
p=Process(target=task,args=(dic,mutex))
l.append(p)
p.start()
for i in range(10):
p=Process(target=task,args=(dic,mutex))
l.append(p)
p.start()
for p in l:
p.join()
print(dic)
p.join()
print(dic)
b、用队列Queue
1)共享的空间
2)是内存空间
3)自动帮我们处理好锁定问题
1)共享的空间
2)是内存空间
3)自动帮我们处理好锁定问题
from multiprocessing import Queue
q=Queue(3) #设置队列中maxsize个数为三
q.put('first')
q.put({'second':None})
q.put('三')
# q.put(4) #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
print(q.get())
print(q.get())
print(q.get())
q=Queue(3) #设置队列中maxsize个数为三
q.put('first')
q.put({'second':None})
q.put('三')
# q.put(4) #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
print(q.get())
print(q.get())
print(q.get())
强调:
1、队列用来存成进程之间沟通的消息,数据量不应该过大
2、maxsize的值超过的内存限制就变得毫无意义
了解:
q=Queue(3)
q.put('first',block=False)
q.put('second',block=False)
q.put('third',block=False)
q.put('fourth',block=False) #报错 queue.Full
1、队列用来存成进程之间沟通的消息,数据量不应该过大
2、maxsize的值超过的内存限制就变得毫无意义
了解:
q=Queue(3)
q.put('first',block=False)
q.put('second',block=False)
q.put('third',block=False)
q.put('fourth',block=False) #报错 queue.Full
q.put('first',block=True)
q.put('second',block=True)
q.put('third',block=True)
q.put('fourth',block=True,timeout=3) #等待3秒后若还进不去报错。注意timeout不能和block=False连用
q.put('second',block=True)
q.put('third',block=True)
q.put('fourth',block=True,timeout=3) #等待3秒后若还进不去报错。注意timeout不能和block=False连用
q.get(block=False)
q.get(block=False)
q.get(block=False)
q.get(block=False) #报错 queue.Empty
q.get(block=False)
q.get(block=False)
q.get(block=False) #报错 queue.Empty
q.get(block=True)
q.get(block=True)
q.get(block=True)
q.get(block=True,timeout=2) #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
q.get(block=True)
q.get(block=True)
q.get(block=True,timeout=2) #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
二、生产者消费者模型
该模型中包含两类重要的角色:
1、生产者:将负责造数据的任务比喻为生产者
2、消费者:接收生产者造出的数据来做进一步的处理,该类任务被比喻成消费者
该模型中包含两类重要的角色:
1、生产者:将负责造数据的任务比喻为生产者
2、消费者:接收生产者造出的数据来做进一步的处理,该类任务被比喻成消费者
实现生产者消费者模型三要素
1、生产者
2、消费者
3、队列
1、生产者
2、消费者
3、队列
什么时候用该模型:
程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
该模型的好处:
1、实现了生产者与消费者解耦和
2、平衡了生产者的生产力与消费者的处理数据的能力
1、实现了生产者与消费者解耦和
2、平衡了生产者的生产力与消费者的处理数据的能力
import time
import random
from multiprocessing import Process,Queue
import random
from multiprocessing import Process,Queue
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
if __name__ == '__main__':
#1、共享的盆
q=Queue()#
#1、共享的盆
q=Queue()#
#2、生产者们
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
#3、消费者们
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p2.start()
p3.start()
c1.start()
c2.start()
生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。
三、守护进程与应用
#如果父进程将子进程设置为守护进程,那么在主进程 代码运行完毕 后守护进程就立即被回收
注意:代码运行完 和 结束(进程死了)的区别
import time
import random
from multiprocessing import Process,Queue
#如果父进程将子进程设置为守护进程,那么在主进程 代码运行完毕 后守护进程就立即被回收
注意:代码运行完 和 结束(进程死了)的区别
import time
import random
from multiprocessing import Process,Queue
方法一:
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
if __name__ == '__main__':
#1、共享的盆
q=Queue()
#1、共享的盆
q=Queue()
#2、生产者们
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
#3、消费者们
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p2.start()
p3.start()
c1.start()
c2.start()
# 在生产者生产完毕后,往队列的末尾添加一个结束信号None
p1.join()
p2.join()
p3.join()
# 有几个消费者就应该放几个结束信号
q.put(None)
q.put(None)
p1.join()
p2.join()
p3.join()
# 有几个消费者就应该放几个结束信号
q.put(None)
q.put(None)
方法二:
import time
import random
from multiprocessing import Process,JoinableQueue
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
q.task_done() #拿一个减一个,与q.join()有联系
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
q.task_done() #拿一个减一个,与q.join()有联系
def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
for i in range(5):
time.sleep(random.randint(1,2))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
if __name__ == '__main__':
#1、共享的盆
q=JoinableQueue()
#1、共享的盆
q=JoinableQueue()
#2、生产者们
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
p1=Process(target=producer,args=('egon',q,'包子'))
p2=Process(target=producer,args=('刘清政',q,'泔水'))
p3=Process(target=producer,args=('杨军',q,'米饭'))
#3、消费者们
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
c1.daemon=True # c1.daemon=True 必须在c1.start() 前
c2.daemon=True
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('梁书东',q))
c1.daemon=True # c1.daemon=True 必须在c1.start() 前
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p2.start()
p3.start()
c1.start()
c2.start()
# 确定生产者确确实实已经生产完毕
p1.join()
p2.join()
p3.join()
# 在生产者生产完毕后,拿到队列中元素的总个数,然后直到元素总数变为0,q.join()这一行代码才算运行完毕
q.join()
#一旦结束意味着队列确实被取空,消费者已经确确实实把数据都取干净了
print('主进程结束')
p1.join()
p2.join()
p3.join()
# 在生产者生产完毕后,拿到队列中元素的总个数,然后直到元素总数变为0,q.join()这一行代码才算运行完毕
q.join()
#一旦结束意味着队列确实被取空,消费者已经确确实实把数据都取干净了
print('主进程结束')