谁挂起了我的Python多进程

最近使用Python的多进程multiprocessing模块时遇到了一个进程挂起问题。特在此记录一下。

先给出一个多进程应用的最小代码。

import multiprocessing as mp

def produce(q):
    """生产者"""
    for i in [1, 2, 3, 4, 5, 6, 7, 8, 9]:
        q.put(i)
    print(f"Producer quit.")


def consume(q):
    """消费者"""
    while True:
        r = q.get()
        if r == 9:
            break
    print("Consumer quit.")


if __name__ == "__main__":
    q = mp.Queue()

    consumer = mp.Process(target=consume, args=(q,))
    consumer.start()

    producer = mp.Process(target=produce, args=(q,))
    producer.start()

    producer.join()
    consumer.join()

    print("Main quit.")

这段代码中,主进程构建了一个队列,然后派生两个子进程:一个生产者与一个消费者。生产者进程按顺序将一个包含从1到9的数字列表中的元素逐个填充到队列中。消费者进程逐个将队列中的元素取出,并在读取到最后一个元素的时候退出。这段代码运行的输出如下:

Producer quit.
Consumer quit.
Main quit.

实际情况中,为了安全起见会限制队列的最大容量,以避免内存被占尽。在队列初始化的时候可以通过 maxsize 参数指定允许的最大尺寸。

# 生产者与消费者的函数定义无变化,这里省略。

if __name__ == "__main__":
    q = mp.Queue(maxsize=3)  # 指定队列允许存储的最大元素数量

    consumer = mp.Process(target=consume, args=(q,))
    consumer.start()

    producer = mp.Process(target=produce, args=(q,))
    producer.start()

    producer.join()
    consumer.join()

    print("Main quit.")

此时运行全部代码,输出如下:

Producer quit.
Consumer quit.
Main quit.

在限制队列大小后,应用依旧可以正常退出。

但是,如果你不小心忘记了队列存在大小限制,然后无意间修改了消费者进程的代码:

# 生产者代码无变动,省略。

def consume(q):
    """消费者"""
    while True:
        r = q.get()
        if r == 3:  # 当读取到3时,停止消费队列。
            break
    print("Consumer quit.")
    
# 主进程代码无变动,省略。

该修改会使消费者进程在读取到数字3时就终止退出。此时运行全部代码,输出如下:

Consumer quit.

消费者进程已经正常退出。但是生产者与主进程处于挂起状态。程序卡死了。

为了找到问题的原因,尝试把程序运行的过程打印出来。分别在生产者与消费者进程中打印当前读取到的元素。

def produce(q):
    """生产者"""
    for i in [1, 2, 3, 4, 5, 6, 7, 8, 9]:
        q.put(i)
        print(f"P: {i}")  # 打印存入队列中的元素
    print(f"Producer quit.")


def consume(q):
    """消费者"""
    while True:
        r = q.get()
        print(f"C: {r}")  # 打印从队列中取出的元素
        if r == 3:
            break
    print("Consumer quit.")

程序的输入如下:

P: 1
P: 2
P: 3
C: 1
P: 4
C: 2
C: 3
Consumer quit.
P: 5
P: 6

从输出可以看到,消费者进程在读取到3之后退出。生产者进程继续向队列中填充元素,最后一次填充的是6,然后就没有了下文。由于队列中最后一次被取出的元素是3,而生产者是按顺序从1到9填充,所以此时队列中存在三个元素: 4, 5, 6 。而回想一下,之前将队列的最大尺寸设定为3,很显然,队列已满。此时的 put 方法在耐心地等待队列空间中。由于消费者进程已经退出,所以队列永远不会有空间。进程就卡在这里了。

要解决这个问题,有不同的方案。

第一种:避免消费者进程早于生产者进程退出。只要消费者不退出,那么队列总会迎来空闲之日。

第二种:将队列填充操作设定为非阻塞。将 put 方法的 block 参数设定为 False ,这样如果填充队列失败,程序会报队列满的错误,然后继续执行。

第三种:允许队列填充阻塞,但是规定允许的最长等待时间。队列默认的 put 方法超时为 None ,意味着一直等待直到有空闲为止。为 put 方法设定超时参数,当等待足够长时间队列依旧没有空间,则报队列满的错误。程序继续运行。

这三种方法各有长短,使用时需要根据实际情况选择。理想情况下应当通过代码逻辑将消费者作为最后退出的进程。如果由于特殊原因消费者进程行为无法控制,那么就可以考虑后两种方法,然后通过 try...except 捕捉队列满的错误,进一步结合重试来避免数据丢失。

上一篇:图像旋转-xdoj


下一篇:06 MPLS VPN与EIGRP(SOO)