week6 day3 并发编程之多进程 代码

week6 day3 并发编程之多进程 代码

一、multiprocessing模块介绍

python中的多线程无法利用多核优势,如果想充分地使用多核CPU的资源(用os.cpu_count()查看),在python中大部分情况需要用到多进程。python提供了multiprocessing。multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。multiprocessing模块的功能众多:支持子进程、通信和共享数据,执行不同形式的同步,提供了Process,Queue,Pipe,Lock等组件。需要再次强调的一点是,与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程的名称空间内

二、Process类的介绍

2.1 创建进程的类

创建进程的类(自定义进程的类,自定义进程的类必须要有run内置方法):

from multiprocessing import Process
Process([group [, target [, name [, args [, kwargs]]]]])

强调:

  1. 需要使用关键字的方式来指定参数
  2. args指定地为传给target函数的位置参数,是一个元组,必须有逗号

2.2 参数介绍

group参数未使用,值始终为None;
target表示调用对象,即子进程要执行的任务;
args表示调用对象的位置参数元组args=(1,2,'egon',)
kwargs表示调用对象的关键字参数字典,kwargs={'name':'egon','age':18}
name为子进程的名字。

2.3 方法介绍

内置方法 功能
p.start() 启动进程,并调用该子进程中的p.run()
p.run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要定义该方法
p.terminate() 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成为了僵尸进程,使用该方法要特别小心这种情况。如果p还保存了一个锁那么也不会被释放,进而导致死锁
p.is_alive() 如果p子进程仍然运行,返回True
p.join([timeout]) 主进程等待p终止(注意:是主进程处于等的状态,而p是处于运行态)。timeout是可选的超时时间。需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。

2.4 属性介绍

内置方法 功能
p.daemon=True 默认值为False,如果为True,代表p为后台运行的守护进程,当p的父进程代码运行完毕时,p也随之终止
p.name 进程的名称
p.pid 进程的pid
p.exitcode 进程在运行时为None,如果为-N,表示信号被N结束(了解即可)
p.authkey 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

三、Process类的使用

在windows中Process()必须放到if __name__=='__main__':

详细解释:Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

3.1 创建并开启子进程的两种方式

# 开启进程的第一种方式:
from multipleprocessing import Process
import time

def task(name):
	print('%s is running'%name)
	time.sleep(3)
	print('%s is running'%name)
'''
windows操作系统下,创建进程一定要在main内创建
因为windows下创建进程类似于模块的导入方式,会从上往下执行代码。
linux中则是将代码完整的拷贝一份
'''
if __name__='__main__':
	# 创建进程的代码
	# 1.创建一个进程
	p = Process(target=task,args=('jason',)) # 只有需要给定义的函数传参时才需要传参,也可以使用kwargs关键字传参
	# 容器里面哪怕只有一个元素,建议要用逗号隔开
	# 2.启动进程
	p.start() # 告诉操作系统帮你创建一个进程
# 第二种方式:类的继承
from multiprocessing import Porcess

class MyProcess(Process): # 自定义类,继承Process
	def run(self): # run如果定义时候不能放参数,可以使用这种方法,包在一个类里面,把参数定义成类的属性,直接调用
		print('hello world')

if __name__ == '__main__':
	p=MyProcess() # 如果有参数的话,在实例化类的时候传参
	p.start()
	print('主')

3.2 进程之间的内存空间是隔离的

from multiprocessing import Process

n=100

def work():
    global n
    n=1
    print('子进程内:',n)

if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主',n)

3.3 练习:把之前所学的socket通信编程并发的形式

# 服务端
from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start进程一定要写到这下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
# 客户端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

3.4 Process对象的join方法

join:主进程等待子进程结束

# join例子一
from multiprocessing import Process
import time

def task(n):
	time.sleep(n)
	
if __name__=='__main':
	s1=Process(target=task,args=(1,))
	s2=Process(target=task,args=(2,))
	s3=Process(target=task,args=(3,))
	start=time.time()
	s1.start()
	s2.start()
	s3.start()
	s1.join()
	s2.join()
	s3.join()
	print(time.time()-start)-----># 这个时间会是多少呢?

# join例子二
from multiprocessing import Process
import time

class MyRequest(Process):
	def __init__(self,n):
		super().__init__()
		self.n=n
	def run(self):
		time.sleep(self.n)

if __name__=='__main__':
	s1=MyRequest(1)
	s2=MyRequest(2)
	s3=MyRequest(3)
	start=time.time()
	s1.start()
	s1.join()
	s2.start()
	s2.join()
	s3.start()
	s3.join()
	print(time.time()-start)-----># 这个答案又是多少呢?例一和例二有什么区别呢

接下来思考一个问题,有了join,程序不就是串行了吗?

from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)
if __name__ == '__main__':

    p1=Process(target=piao,args=('egon',))
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('yuanhao',))
    p4=Process(target=piao,args=('wupeiqi',))

    p1.start()
    p2.start()
    p3.start()
    p4.start()

#有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
#当然不是了,必须明确:p.join()是让谁等?
#很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

#详细解析如下:
#进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
#而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
# 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print('主线程')

#上述启动进程与join进程可以简写为
# p_l=[p1,p2,p3,p4]
#
# for p in p_l:
#     p.start()
#
# for p in p_l:
#     p.join()

3.5 Process对象的其他方法或者属性(了解)

terminateis_alive

#进程对象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) #结果为True

print('开始')
print(p1.is_alive()) #结果为False

namepid

from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        #为我们开启的进程设置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

p=Piao('egon')
p.start()
print('开始')
print(p.pid) #查看pid

总结


创建进程p.start()就是应用程序徐向操作系统发送请求,操作系统接受到请求后,在内存中开辟一块名称空间,同时将父进程名称空间中的内容一模一样的copy给子进程的名称空间进程之间是相互隔离的。一个进程对应内存中的一块独立的内存空间,多个进程对应在内存空间中就是多块独立的名称空间。进程与进程之间数据默认情况下是无法直接交互的,如果想让进程交互,需要借助第三方模块。

3.6 僵尸进程与孤儿进程

3.6.1 僵尸进程

我们知道在unix/linux中,正常情况下,子进程是通过进程创造的,子进程再创造新的进程。子进程的结束和父进程的运行是一个异步过程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程是什么时候结束的。当一个进程完成了它的工作终止后,它的父进程需要调用wait()或者waitpid()系统调用取得子进程的终止状态。linux系统中所有的子进程死后都会进入僵尸进程状态僵尸进程本身并没有害,只有当产生了大量的僵尸进程并且僵尸进程没有被回收的情况下才会出现内存还有空间,但是无法开启新进程的情况。

僵尸进程就是死了但是没有死透。当你启动进程之后,该进程死后会释放重型资源(CPU,内存,打开的文件),但是仍会占据进程号PID等状态信息。因为我要让父进程能够查看到它开设的子进程的基本信息:占用的PID号,运行时间。所有的进程运行完毕后都会步入僵尸进程

避免大量僵尸进程的方法:

  1. 从设计者角度来说,设计定期回收机制

  2. 从使用者角度来说:
    (1)父进程直到僵尸进程的子进程但是没有回收,通知父进程回收子进程的PID,具体命令kill-9父进程的PID
    (2)如果父进程里面没有设置回收僵尸进程的子进程机制。杀死他爹,让僵尸进程变成孤儿进程。孤儿进程被init进程(进程号为1)所接管

3.6.2 孤儿进程

一个父进程的结束,但是它的一个或多个子进程还在进行,那么子进程将称为孤儿进程。孤儿进程被init进程(进程号为1)所收养,并由init进程对它们的完成状态收集工作。

# 思考题
from multiprocessing import Process
import time,os

def task():
	print('%s 正在运行'%os.getpid())
	time.sleep(3)
if __name=='__main__':
	p=Process(target=task)
	p.start()
	p.join() # 等进程p结束后,join函数内部会发送系统调用wait(),去告诉操作系统回收掉进程p的id号
	print(p.pid) # 此时还能否看到子进程p的id号
	print('主')

即便我们不调用join方法,python解释器也会定期在后台自动回收僵尸进程,如果我们开启一个死循环,不停地开子进程,子进程很快结束,父进程不死,会产生一堆僵尸子进程,但这些僵尸进程并不会累积。因为python解释器会自动将它们回收,但如果我们在迅速开启了一堆子进程后,让父进程time.sleep(100)阻塞在了原地,那么父进程就是整体阻在原地了,此时子进程陆续进入僵尸状态,并不会被回收

可以。

p,join()是向操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以了,刺史府进程内还可以看到p.pid,但是此时的p.pid是一个无意义的id号,因此操作系统已经将该编号回收。
打个比方:我党相当于操作系统,控制着整个中国的硬件,每个人相当于一个进程,每个人都需要跟我党申请一个身份证号,人持有的id号就相当于进程的pid,人死后应该到我党那里注销身份证号,p.join()就相当于要求我党回收身份证号,但p的家人(相当于主进程)仍然持有p的身份证,但此刻的身份证已经没有任何意义。

四、守护进程

随着主进程开始而开始,当主进程的代码运行完毕即停止运行(注意不是主进程运行完毕,因为主进程可能等待开启的非守护进程的子进程运行完毕)。

from multiprocessing import Process,current_process
import time
def task(name):
    print('%s 总管正在活着'%name)
    time.sleep(3)
    print('%s 总管正在死亡'%name)

if __name__ == '__main__':
    s=Process(target=task,args=('egon',))
    s.daemon = True # 将p设置成守护进程,这一句一定放在start方法上面才有效,否则直接报错
    s.start()
    time.sleep(3) # 如果没有睡的话,刚给操作系统发送请求,主程序就结束了。子进程甚至还没有启动起来。
    print(123) # 这是主进程的代码体
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止

五、进程同步(锁)

进程之间内存是相互隔离的,但是硬盘是共享的,共享一套文件系统,所以访问一个文件,或者一个打印终端,是没有问题的。但是共享带来的就是竞争,竞争带来的就是错乱,如果控制,就是加锁控制。例如下面模拟的火车票抢票的例子。

  • part1:多个进程共享同一打印终端

  • 并发运行,效率高,但竞争同一打印终端,带来了打印错乱。

    from multiprocessing import Process
    import os,time
    
    def work():
    	print('$s is running'%os.getpid())
    	time,sleep(2)
    	print('%s is done'$os.pid())
    	
    if __name__=='__main__':
    	for i in range(3):
    		p=Process(target=work)
    		p.start()
    
  • 加锁:由并发变成了同时只能一个进程进行操作(不是串行!!!串行有顺序),牺牲了运行效率,但避免了竞争。但是锁需要抢,抢到锁的进程可以进行操作。

    from multiprocessing import Process,Lock
    import os,time
    
    def work(lock):
    	# 抢锁
    	lock.acquire()
    	print('%s is running'%os.getpid())
    	time.sleep(2)
    	print('%s is done'%os.pid())
    	# 释放锁
    	lock.release()
    	
    if __name__=='__main__':
    	lock=Lock()
    	for i in range(3):
    		p=Process(target=work,args=(lock,))	
    		p.start()
    
  • part2:多个进程共享同一文件
    文件当数据库,模拟抢票

    from multiprocessing import Process
    import time,json,random
    def check(i): # 查票的过程只有查票操作
        with open('data','rt',encoding='utf-8') as f:
            dic=json.load(f)
            res=dic.get('ticketnum')
            return res,dic
    
    def buy(i): # 买票的过程包含查票和买票操作。先查后买
        res,dic=check(i)
        time.sleep(random.randint(0,3))
        if res>0:
            print('购票成功')
            with open('data','wt',encoding='utf-8') as f:
                dic['ticketnum']=0
                json.dump(dic,f)
        else:
            print('购票失败')
    
    def run(i):
        res,dic=check(i)
        print(res)
        buy(i)
    
    if __name__ == '__main__':
        for i in range(10):
            p=Process(target=run,args=(i,))
            p.start()
    

    会发现由很多人都抢到票了,这种情况在现实生活中是并不被允许的。针对多个进程操作同一份数据的时候,会出现数据错乱的问题。针对上述问题,解决方式就是加锁处理:将并发变成抢锁,抢到锁的人运行,牺牲效率但是保证了数据的安全。

    需要在每个用户买票操作的时候,保证这个过程其他人是不可干预的。所以,给每个过程上锁。而这个锁是需要抢的,抢到了锁的进程才可以进行操作。

    from multiprocessing import Process,Lock
    import time,json,random
    def check(i):
        with open('data','rt',encoding='utf-8') as f:
            dic=json.load(f)
            res=dic.get('ticketnum')
            return res,dic
    
    def buy(i):
        res,dic=check(i)
        time.sleep(random.randint(0,1))
        if res>0:
            print('购票成功')
            with open('data','wt',encoding='utf-8') as f:
                dic['ticketnum']=0
                json.dump(dic,f)
        else:
            print('购票失败')
    
    def run(i,mutex):
        res,dic=check(i)
        print(res)
        # 给买票环节加锁处理
        # 先抢锁
        mutex.acquire()
        buy(i)
        # 释放锁
        mutex.release()
    
    if __name__ == '__main__':
        # 让主进程中生成一把锁,让所有的进程抢,谁先抢到谁先买票
        mutex=Lock()
        for i in range(10):
            p=Process(target=run,args=(i,mutex))
            p.start()
    
    # 上面的例子并没有利用之前学过的高并发知识,现在把高并发知识拿过来
    # 需要开多进程的是服务端,子进程为服务于每个人的进程,并且买票的过程需要加锁
    # 服务端 update version
    from multiprocessing import Process
    import socket,json
    
    IP_ADDR=('127.0.0.1',9000)
    MAX_SIZE=1024
    BAXK_LOG=5
    CODING='utf-8'
    phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    phone.bind(IP_ADDR)
    phone.listen(BAXK_LOG)
    
    def check():
        with open('data', 'r', encoding=CODING) as f:
            dic = json.load(f)
            return dic
    
    def buy():
        dic= check()
        if dic.get('ticket_num') > 0:
            dic['ticket_num'] -= 1
            with open('data', 'w', encoding=CODING) as f:
                json.dump(dic, f)
            return '已购票,还剩{}张票'.format(dic.get('ticket_num'))
        else:
            return '票已售完'
            
    def task(conn,mutex):
        while True:
            try:
                msg=conn.recv(MAX_SIZE).decode(CODING)
                if 'check' in msg:
                    res=check()
                    num=res.get('ticket_num')
                    res='还剩 {} 张票'.format(num)
                elif 'buy' in msg:
                    mutex.acquire()
                    res=buy()
                    mutex.release()
                conn.send(res.encode(CODING))
                print(111)
    
            except Exception:
                break
                
    if __name__ == '__main__':
        while True:
            conn,addr=phone.accept()
            mutex=Lock()
            s=Process(target=task,args=(conn,mutex))
            s.start()
            
            
    # 客户端 update version
    import socket
    IP_ADDR=('127.0.0.1',9000)
    MAX_SIZE=1024
    CODING='utf-8'
    
    phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    phone.connect(IP_ADDR)
    while True:
        msg=input('>>>:').strip()
        if not msg:continue
        phone.send(msg.encode(CODING))
        data=phone.recv(MAX_SIZE)
        print(data.decode(CODING))
    phone.close()
    

互斥锁和信号量的区别:
链接: 互斥锁和信号量的区别.

总结


加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改。牺牲了效率但是保证了数据的安全性。虽然可以用文件共享数据实现进程间的通信,但问题是:

  1. 效率低
  2. 需要自己加锁处理

可不可以找到一种既能高效率(多个进程共享一块内存的数据)又帮我们处理好锁的问题呢?

这就是multiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道

  1. 队列和管道都是将数据存放在内存中的

  2. 队列又是基于【管道+锁】来实现的,可以让我们从复杂的锁问题中解脱出来。我们应该尽量避免共享数据,尽可能使用传递和队列,避免处理复杂的同步和锁的问题,而且在线程数目增多时,往往可以获得更好的扩展性

六、队列(推荐使用)(queue模块Queue)

进程彼此之间相互隔离,要实现进程间通信IPC机制,multiprocessing模块支持两种格式:队列和管道,这两种方式都是使用消息传递的。

6.1 队列模块queue的介绍

  • 创建队列的类(底层就是以管道和锁定的方式实现):
    Queue([maxsize]) # 创建共享的进程队列,Queue是多进程俺去那的队列,可以使用Queue实现多进程之间的数据传递。
    
  • 参数介绍:
    maxsize # 是队列中允许最大项数,省略则无大小限制。
    
  • 方法介绍:
内置方法 用途
q.put() 插入数据到队列中,put方法还有两个可选参数,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get() 可以从队列中读取并且删除一个元素,同样,get方法有两个参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_wait() q.get(False)
q.putwait() q.put(False)
q.empty() 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full() 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize() 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
  • 其他方法(了解)

    管道subprocesstdoutstderr
    队列=管道+锁

    队列:先进先出
    堆栈:先进后出

from queue import Queue

# 创建一个队列
q=Queue(5) # 括号内可以传数字,表示生成的队列最大可以同时存放的数据量
# 往队列中存数据
q.put(1)
q.put(2)
q.put(3)
# print(q.full())
# print(q.empty())
q.put(4)
q.put(5) # 当队列数据放满了之后,如果还有数据要放程序会阻塞,直到有位置让出来
# print(q.full())
# q.put(6)
'''
千方百计地存
想方设法地取
'''
# 去队列中取数据
v1=q.get()
v2=q.get()
v3=q.get()
v4=q.get()
v5=q.get()
# print(q.empty())
# v6=q.get() # 队列中如果已经没有数据地话,get方法会原地阻塞
# v6=q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错 queue.Empty
try:
    v6=q.get(timeout=3)
    print(v6)
except Exception as e:
    print('一滴都没有了')
# v6=q.get_nowait() # 队列中如果已经没有数据的话,get方法会原地阻塞
# print(v1,v2,v3,v4,v5)

'''
q.full()
q.empty()
q.get_nowait()
在多进程的情况下是不精确的
'''

6.2 子进程/子进程 以及 子进程/父进程 之间通信

再谈IPC机制。

'''
研究思路:
    1.子进程和主进程借助于队列通信
    2.子进程和子进程借助于队列通信
'''
from multiprocessing import Process, Queue

def producer(q):
    q.put(123)
    print(12345)
    
def consumer(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue(5)
    s1 = Process(target=producer, args=(q,))
    s2=Process(target=consumer,args=(q,))
    s1.start()
    s2.start()

6.3 生产者消费者模型

生产者消费者模型:


在并发编程中使用生产者和消费者模式能够解决大多数并发问题。该模式通过平衡生产线程和消费线程之间的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者消费者模型?


在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者处理数据的能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引出了生产者/消费者模型。

什么是生产者消费者模型?


生产者消费者模型是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者也不找生产者要数据,而是直接从阻塞队列里面取数据,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

# 版本一:
from multiprocessing import Queue, Process
import time, random

def producer(q, name, food):
    for i in range(3):
        res = '{}{}'.format(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('{}生产了{}'.format(name, res))
        
def consumer(q, name):
    while True:
        res = q.get()
        if not res: break # 消费者吃光队列中的数据,终止
        time.sleep(random.randint(0, 2))
        print('{}吃了{}'.format(name, res))
        
if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q, '厨师1', '包子'))
    p2 = Process(target=producer, args=(q, '厨师2', '烧卖'))
    p3 = Process(target=producer, args=(q, '厨师3', '泔水'))

    c1 = Process(target=consumer, args=(q, 'hxx'))
    c2 = Process(target=consumer, args=(q, 'lxx'))
    
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    # 等待生产者生产完毕之后,往队列中添加特定的结束符号
    q.put(None) # 肯定在所有生产者生产的数据的末尾
    q.put(None) # 有多少个消费者就需要往队列末尾添加几个结束符None
    print('主')
# 版本二:
from multiprocessing import Process,JoinableQueue
import time, random

def producer(q, name, food):
    for i in range(3):
        res = '{}{}'.format(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('{}生产了{}'.format(name, res))
    q.join() # 生产者在原地等消费者把自己放入队列的数据取干净了才结束。
            # 消费者每次拿完东西生产者会收到信号,当收到的信号数量等于生产者生产的数量时,即说明此消费者生产的数据已经全部被取走
def consumer(q, name):
    while True:
        res = q.get()
        time.sleep(random.randint(0, 2))
        print('{}吃了{}'.format(name, res))
        q.task_done()  # 每次拿完数据发送一个信号

if __name__ == '__main__':
    q = JoinableQueue() # 可以等待的Queue。生产者那端等待消费者的信号

    p1 = Process(target=producer, args=(q, '厨师1', '包子'))
    p2 = Process(target=producer, args=(q, '厨师2', '烧卖'))
    p3 = Process(target=producer, args=(q, '厨师3', '泔水'))

    c1 = Process(target=consumer, args=(q, 'hxx'))
    c2 = Process(target=consumer, args=(q, 'lxx'))

    c1.daemon = True # 消费者1是主进程的守护进程,而主进程需要等待生产者进程运行完毕,生产者在等待消费者取完数据发送信号
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join() # 此时p1.join代表着p1生产的数据全部被拿空了
    p2.join()
    p3.join() # 此时p3.join代表着p3生产的数据全部被拿空了。即队列中没有任何数据了,从而进程结束

    print('主')
    '''
    JoinableQueue 每当你往该队列中存入数据的时候,内部会有一个计数器+1
    每当你调用task_done的时候 计数器-1
    q.join() 当计数器为0的时候,才往后运行
    '''
    # 只要q.join()执行完毕,说明消费者已经处理完数据了,消费者就没必要存在了----->守护进程

视频讲解生产者消费者模型

七、管道(不推荐使用,了解即可)

7.1 介绍

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

7.2 基于管道实现进程间的通信(与队列的方式类似,队列就是管道加锁实现的)

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序

from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主进程')
#注意:send()和recv()方法使用pickle模块对对象进行序列化。

八、共享数据

展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合。通过消息队列交换数据,这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。进程间通信应该尽量避免使用本节所讲的共享数据的方式。

进程间数据是独立的,可以借助队列或者管道实现通信,二者都是基于消息传递的。
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止此。
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

from multiprocessing import Manager,Process,Lock
import os
def work(d,lock):
    # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
        #{'count': 94}

进程之间操作共享的数据

九、信号量(了解)

互斥锁同时只允许一个线程更改数据,而semophore是同时允许一定数量的线程更改数据,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁。

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 站到了一个茅坑'%user)
    time.sleep(random.randint(0,2))
    print('%s 离开了茅坑'%user)
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(10):
        p=Process(target=go_wc,args=(sem,f'user {i}'))
        p.start()
        p_l.append(p)

    for p in p_l:
        p.join()
    print(11111111111)

十、事件(了解)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True

#_*_coding:utf-8_*_
#!/usr/bin/env python

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m红灯亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m红灯亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')

十一、进程池

在python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一。需要注意的问题是:

  1. 很明显需要并发执行的任务通常要远大于核数

  2. 一个操作系统不可能无限开启进程,通常是有几个核就开几个进程

  3. 进程开启过多,效率反而会下降(开启进程需要占用系统资源的,而且开启多余核的数目的进程也无法做到并行)

例如,当操作对象数目不大时,可以直接利用multiprocessing中的Process动态生成多个进程,十几个还好,但如果是上百个,上千个。手动去限制进程的数量又太繁琐,此时可以发挥进程池的功效。

我们可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数。

ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,
那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

11.1 创建进程池的类

创建进程池的类:如果numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行任务,不会开启其他进程。

Pool([numprocess  [,initializer [, initargs]]]):创建进程池 
  1. 参数介绍
参数名 功能
numprocess 要创建的进程数,如果省略,将默认使用cpu_count()的值
initializer 是每个工作进程启动时要执行的可调用对象,默认为None
initargs 是要传给initializer的参数组
  1. 方法介绍
方法名 功能
p.apply(func [, args [, kwargs]]) 在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并发执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]) 在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将立即传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
p.close() 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.join() 等待所有工作进程退出。此方法只能在close()或teminate()之后调用
  1. 其他方法(了解)
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

11.2 应用

  1. 同步调用apply

    from multiprocessing import Pool
    import os,time
    def work(n):
        print('%s run'%os.getpid())
        time.sleep(3)
        return n**2
    
    if __name__ == '__main__':
        p=Pool(3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply(work,args=(i,)) # #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,
                            # 但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限
            res_l.append(res)
        print(res_l)
    
  2. 异步调用apply_async

    from multiprocessing import Pool
    import os,time
    def work(n):
        print('%s run'%os.getpid())
        time.sleep(3)
        return n**2
    if __name__ == '__main__':
        p=Pool(3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply_async(work,args=(i,)) # 同步运行,阻塞,直到本次任务执行完毕拿到res
            res_l.append(res)
        # 异步apply_async的用法:如果使用异步提交任务,主进程需要使用join,等待进程池内任务都处理完,
        # 然后可以用get来收集结果。否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        p.close()
        p.join()
        for res in res_l:
            print(res.get()) # 使用get来获取apply_async的结果,如果是apply,则没有get方法,
                            # 因为apply是同步执行,立刻获取结果,也根本无序get
    
  3. 详解apply和apply_async

    #一:使用进程池(异步调用,apply_async)
    #coding: utf-8
    from multiprocessing import Process,Pool
    import time
    
    def func(msg):
    	print( "msg:", msg)
    	time.sleep(1)
    	return msg
    
    if __name__ == "__main__":
        pool = Pool(processes = 3)
    	res_l=[]
        for i in range(10):
    	    msg = "hello %d" %(i)
        	res=pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        	res_l.append(res)
    	print("==============================>") #没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了
    
        pool.close() #关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
        pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    
        print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
        for i in res_l:
    	    print(i.get()) #使用get来获取apply_async的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
    
    #二:使用进程池(同步调用,apply)
    #coding: utf-8
    from multiprocessing import Process,Pool
    import time
    
    def func(msg):
        print( "msg:", msg)
        time.sleep(0.1)
        return msg
    
    if __name__ == "__main__":
      	pool = Pool(processes = 3)
      	res_l=[]
      	for i in range(10):
          	msg = "hello %d" %(i)
         	res=pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
            res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
        print("==============================>")
      	pool.close()
      	pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到	pool,join函数等待所有子进程结束
    
        print(res_l) #看到的就是最终的结果组成的列表
        for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
          	print(i)
    

11.3 练习:使用进程池维护固定数目的进程(重写练习一)

# 练习2:使用进程池维护固定数目的进程(重写练习1)
# 服务端
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('进程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool()
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
        
# 客户端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

11.4 回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。

我们可以把耗时间(阻塞)的任务放到进程池内,然后制定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了IO的过程,直接拿到任务的结果。

from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

'''
打印结果:
<进程3388> get https://www.baidu.com
<进程3389> get https://www.python.org
<进程3390> get https://www.openstack.org
<进程3388> get https://help.github.com/
<进程3387> parse https://www.baidu.com
<进程3389> get http://www.sina.com.cn/
<进程3387> parse https://www.python.org
<进程3387> parse https://help.github.com/
<进程3387> parse http://www.sina.com.cn/
<进程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''
上一篇:机器学习---吴恩达---Week6_1(机器学习改进方法)


下一篇:【Python编程】运行前检测并安装脚本的依赖库