一文攻克Python的进程以及如何实现进程间通信(队列)

Python的进程以及如何实现进程间通信

一、进程概念

在了解进程之前,先了解多任务。多任务就是指操作系统能够执行多个任务。例如,使用Window或Linux操作系统可以同时看电影、聊天、查看网页信息等,此时操作系统就是在执行多任务,而每一个任务就是一个进程。

进程(process)是计算机已经运行程序的实体。进程与程序不同,程序本身只是指令、数据及其组织形式的描述,而进程才是(指令和数据)的真正运行实例。

二、创建进程的常用方式

1、使用multiprocessing模块创建进程

multiprocessing模块提供了一个Process类来代表一个进程对象
语法格式:

Process(group,target,name,args,kwargs)
  • group:参数未使用,值始终为None
  • target:表示当前进程启动时执行的可调用对象
  • name:为当前进程实例的别名
  • args:表示传递给target函数的参数元组
  • kwargs:表示传递给target函数的参数字典
from multiprocessing import Process

#执行子进程代码
def test():
    print("当前子进程")

#执行主程序
def main():
    print("主进程开始")
    p=Process(target=test) #实例化Process进程类
    p.start()   #启动子进程
    #p.join()    #这个是等子进程全部运行完再运行主进程
    print("主进程结束") 

if __name__=="__main__":
    main()

发现会优先执行主线程,在执行子线程
一文攻克Python的进程以及如何实现进程间通信(队列)
开启p.join()后
一文攻克Python的进程以及如何实现进程间通信(队列)

先实例化Process类,然后使用p.start()方法启动子进程 开始执行test()函数Process的实例p常用的方法除start()外,还有如下常用方法:
run() 如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法.
is_alive() 判断进程实例是否还在执行。
join([timeout]) 是否等待进程实例执行结束,或等待多少秒。
start() 启动进程实例(创建子进程)。
terminate() 不管任务是否完成,立即终止。
Precess类还有如下常用属性:
name :当前进程实例别名,默认为Process-N,N为从1开始递增的整数。
pid :当前进程实例的PID值。

示例:创建2个子进程,分别使用os模块和time模块输出父进程和子进程的ID以及子进程的时间,并调用Process类的name和 pid属性,代码如下:

from multiprocessing import Process
import time
import os

# os.getpid()可获取当前进程id,返回值为int
# os.getppid()可获取父进程id,返回值为int

#两个子进程将会调用的俩个方法
def child_1(delay):
    print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid()))
    t_start = time.time()   #计时开始
    time.sleep(delay)       #程序将会被挂起delay秒
    t_end = time.time()     #即使结束
    print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start))

def child_2(delay):
    print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid()))
    t_start = time.time()   #计时开始
    time.sleep(delay)       #程序将会被挂起delay秒
    t_end = time.time()     #即使结束
    print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start))

if __name__ == "__main__":
    print("-------父进程开始执行-------")
    print("父进程PID:%s"%os.getppid)         #输出当前程序的PID
    p1=Process(target=child_1,args=(1,))    #实例化进程p1
    p2=Process(target=child_2,name="two",args=(2,)) #实例化进程p2
    p1.start()  #启动进程p1
    p2.start()  #启动进程p2

    #同时父进程仍在往下执行,如果p1、p2进程还在执行,将会返回True
    print("p1.is_alive=%s"%p1.is_alive())
    print("p2.is_alive=%s"%p2.is_alive())
    
    #输入p1和p2进程的别名和PID
    print("p1.name=%s"%p1.name)
    print("p1.pid=%s"%p1.pid)
    print("p2.name=%s"%p2.name)
    print("p2.pid=%s"%p2.pid)

    print("----------等待子进程--------")
    p1.join()        #等待p1进程结束
    p2.join()        #等待p2进程结束
    print("----------父进程执行结束---------")

一文攻克Python的进程以及如何实现进程间通信(队列)

2、使用Process子类创建进程

对于一些简单的小任务,通常使Process(target=test)方式实现多进程。但是如果要处理复杂任务的进程,通常定义一个类,使其继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象。

from multiprocessing import Process
import time 
import os

#继承Process类
class SubProcess(Process):
    #由于Process类本身也有__init__初始化方法,这个子类相当于重写了父类的这个方法
    def __init__(self,delay,name=""):
        Process.__init__(self)   #调用Process父类的初始化方法
        #self.delay相当于全局变量
        self.delay = delay       #接收参数delay
        if name:                 #判断传递的参数是否存在
            self.name = name     #如果传递参数name,则为子进程创建的name属性,否自使用默认属性
    def run(self):
        print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid()))
        t_start = time.time()   #计时开始
        time.sleep(self.delay)       #程序将会被挂起delay秒
        t_end = time.time()     #即使结束
        print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start)) 

if __name__ == "__main__":
    print("-------父进程开始执行-------")
    print("父进程PID:%s"%os.getppid)         #输出当前程序的PID
    p1 = SubProcess(delay=1)
    p2 = SubProcess(delay=2,name = "two")

    #对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法
    p1.start()   #启动进程p1
    p2.start()   #启动进程p2

    #输出p1和p2进程的执行状态,如果真正执行,返回True,否则返回False
    print("p1.is_alive=%s"%p1.is_alive())
    print("p2.is_alive=%s"%p2.is_alive())
    
    #输入p1和p2进程的别名和PID
    print("p1.name=%s"%p1.name)
    print("p1.pid=%s"%p1.pid)
    print("p2.name=%s"%p2.name)
    print("p2.pid=%s"%p2.pid)

    print("----------等待子进程--------")
    p1.join()        #等待p1进程结束
    p2.join()        #等待p2进程结束
    print("----------父进程执行结束---------")

上述代码中,定义了一个SubProcess子类,继承multiprocess.Process 父类。SubProcess子类中定义了2个方法:init()初始化方法和 run()方法。在_init()初识化方法中,调用multiprocess.Process父类的_init()初始化方法,否则父类初始化方法会被覆盖,无法开启进程。此外,在 SubProcess子类中并没有定义start()方法,但在主进程中却调用了start()方法,此时就会自动执行SubPorcess类的run()方法。运行结果如图所示。
一文攻克Python的进程以及如何实现进程间通信(队列)

3、 使用进程池Pool创建进程

Pool进程池。为了更好的理解进程池,可以将进程池比作水池。我们需要完成放满10个水盆的水的任务,而在这个水池中,最多可以安放3个水盆接水,也就是同时可以执行3个任务,即开启3个进程。为更快完成任务,现在打开3个水龙头开始放水,当有一个水盆的水接满时,即该进程完成1个任务,我们就将这个水盆的水倒入水桶中,然后继续接水,即执行下一个任务。如果3个水盆每次同时装满水,那么在放满第9盆水后,系统会随机分配1个水盆接水,另外2个水盆空闲。

接下来,先来了解一下Pool类的常用方法。常用方法及说明如下:

  • apply_async(func[, args[ ,kwds]]):使用非阻塞方式调用func()函数,args为传递给func()函数的参数列表,kwds为传递给func()函数的关键字参数列表。
  • apply(func[,args[,kwds]]):使用阻塞方式调用func()函数(开行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程)。
  • close():关闭Pool,使其不再接受新的任务。
  • terminate():不管任务是否完成,立即终止。
  • join():主进程阻塞,等待子进程的退出,必须在close或terminate之后使用。

关于什么是阻塞调用和非阻塞调用个人理解如下:
如果使用阻塞方式,必须等待上一个进程退出才能执行下一个进程而使用非阻塞方式,则可以并行执行3个进程。

下面通过一个示例演示一下如何使用进程池创建多进程。这里模拟水池放水的场景,定义一个进程池,设置最大进程数为3。然后使用非阻塞方式执行10个任务,查看每个进程执行的任务。具体代码:

from multiprocessing import Pool

import os,time

def task(name):
    print("子进程:%s 执行task:%s " % (os.getpid(),name))
    time.sleep(1)

if __name__ == "__main__":
    print("父进程:%s"%os.getpid())
    p = Pool(3)     #定义一个进程池,最大进程为3
    for i in range(10):
        p.apply_async(task,args=(i,)) #使用非阻塞方式调用task()函数
    print("等待所有子进程结束....")
    p.close()    #关闭进程池,关闭后p不在接收新的请求
    p.join()     #等待子进程结束
    print("所有子进程结束")

一文攻克Python的进程以及如何实现进程间通信(队列)
发现pid:3972执行了四次,而pid:15464和8704只执行了三次

阻塞方式:

from multiprocessing import Pool

import os,time

def task(name):
    print("子进程:%s 执行task:%s " % (os.getpid(),name))
    time.sleep(1)

if __name__ == "__main__":
    print("父进程:%s"%os.getpid())
    p = Pool(3)     #定义一个进程池,最大进程为3
    for i in range(10):
        # p.apply_async(task,args=(i,)) #使用非阻塞方式调用task()函数
        p.apply(task,args=(i,))         #使用阻塞方式调用task()函数
    print("等待所有子进程结束....")
    p.close()    #关闭进程池,关闭后p不在接收新的请求
    p.join()     #等待子进程结束
    print("所有子进程结束")

一文攻克Python的进程以及如何实现进程间通信(队列)

三、通过队列实现进程间通信

1、不通过其他条件,进程之间是否能共享信息?

from multiprocessing import Process
def add():
    print("......子进程1开始......")
    global num
    num += 50
    print("num:%d"%num)
    print("......子进程1结束......")

def sub():
    print("......子进程2开始......")
    global num
    num -= 50
    print("num:%d"%num)
    print("......子进程2结束......")    

num = 100 #定义一个全局变量

if __name__=="__main__":
    print(".....主进程开始.....")
    print("num:%d"%num)
    #实例化进程p1,p2
    p1 = Process(target=add)
    p2 = Process(target=sub)
    #开启进程p1,p2
    p1.start()
    p2.start()
    #阻塞主进程,等待进程结束
    p1.join()
    p2.join()
    print(".....主进程结束.....")

一文攻克Python的进程以及如何实现进程间通信(队列)
上述代码中,分别创建了2个子进程,一个子进程中令num 加上50,另一个子进程令num减去50。但是从运行结果可以看出,num在父进程和2个子进程中的初始值都是100。也就是全局变量num在一个进程中的结果,没有传递到下一个进程中,即进程之间没有共享信息。进程间示意图如图所示。
一文攻克Python的进程以及如何实现进程间通信(队列)
要如何才能实现进程间的通信呢?Python的multiprocessing模块包装了底层的机制,提供了Queue(队列)、Pipes(管道)等多种方式来交换数据。以下通过队列(Queue)来实现进程间
的通信。

2、多线程队列的简单使用

进程之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递。Queue本身是一个消息列队程序,下面介绍一下Queue的使用。
初始化Queue()对象时(例如:q=Queue(num)),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。

Queue的常用方法如下;

  • Queue.qsize() :返回当前队列包含的消息数量。

  • Queue.empty():如果队列为空,返回 True;反之返回False 。

  • Queue.full():如果队列满了,返回 True;反之返回False。

  • Queue.get([block[, timeout]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True。
    如果block使用默认值,且没有设置timeout(单位秒),消息列队为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止。如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常。
    如果block值为False,消息列队为空,则会立刻抛出“Queue.Empty”异常

  • Queue.get_nowait(:相当Queue.get(False)。

  • Oueue.put(item,[block[, timeout]]):将item消息写入队列,block 默认值为True。
    如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout则会等待timeout秒,若还没空间,则抛出“Queue.Full”异常。
    如果block值为False,消息列队没有空间可写入,则会立刻抛出“Queue.Full”异常

  • Oueue.put_nowait(item):相当Queue.put(item, False)。

下面,通过一个例子学习一下如何使用processing.Queue。代码如下:

from multiprocessing import Queue

if __name__=="__main__":
    q = Queue(3)         #初始化一个queue对象,最多可接收三条put消息
    q.put("消息1")
    q.put("消息2")
    print(q.full())   #q.full()  判断当前队列是否满了
    q.put("消息3")
    print(q.full())

    #因为消息队列已满,下面的try都会抛出异常
    #第一个try会等待2秒后在抛出异常,第二个try会立刻抛出异常
    try:
        q.put("消息4",True,2)
    except:
        print("消息队列已满,现有消息数量:%s"%q.qsize())
    
    try:
        q.put_nowait("消息4")
    except:
        print("消息队列已满,现有消息数量:%s"%q.qsize())

    #读取消息时,先判断消息队列是否为空,在读取
    if not q.empty():
        print("----从队列中获取消息----")
        for i in range(q.qsize()):
            print(q.get_nowait())
    
    #先判断消息队列是否已将满,在写入
    if not q.full():
        q.put_nowait("消息4")

    if not q.empty():
        print("----从队列中获取消息----")
        print(q.get_nowait())

一文攻克Python的进程以及如何实现进程间通信(队列)

3、使用队列在进程中通信

我们知道使用multiprocessing.Process 可以创建多进程,使用multiprocessing.Queue可以实现队列的操作。接下来,通过一个示例结合Process和 Queue实现进程间的通信。创建2个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。为保证能够正确从队列中读取数据,设置读取数据的进程等待时间为2秒。如果2秒后仍然无法读取数据,则抛出异常。代码如下:

from multiprocessing import Process,Queue
import time

#向队列写入数据
def write_task(q):
    if not q.full():
        for i in range(5):
            message = "消息" + str(i)
            q.put(message)
            print("写入:%s"%message)

#向队列读取数据
def read_task(q):
    time.sleep(1)
    while not q.empty():
        #等待2秒,如果还没有读取到任何消息,则抛出"Queue.Empty"
        print("读取:%s"%q.get(True,2))  

if __name__=="__main__":
    print("----父进程开始----")
    #父进程创建
    q = Queue()
    pw = Process(target=write_task,args=(q,))  #实例化写入队列的子进程,并且传递队列
    pr = Process(target=read_task,args=(q,))   #实例化读取队列的子进程,并且传递队列
    pw.start()          #启动子进程pw,写入
    pr.start()          #启动子进程pr,读取
    pw.join()           #等待pw结束
    pr.join()           #等待pr结束
    print("----父进程结束----")

一文攻克Python的进程以及如何实现进程间通信(队列)

上一篇:基于规则评分的密码强度检测算法分析及实现(JavaScript)


下一篇:Angular踩坑-ExpressionChangedAfterItHasBeenCheckedError异常