python ctrl-c 无法终止 multiprocessing pool

python ctrl-c 无法终止 multiprocessing pool

github pages: perfectnewer.gitub.io
gitee pages: perfectnewer.gitee.io


文章目录


前言

最近对一个线上脚本进行优化,因为时间紧逻辑比较多,后面还有一堆事情,所以决定先改成多进程的方式。此处记录一下遇到的问题。

问题1:在multiprocessing中ctrl-c 无法终止运行

一开始我是质疑这些答案的,毕竟我的代码可以正常接收ctrl-c,但是最后发现本质原因是一样的,方案也是通用。那么对根因的探索祥见另外一篇python2 multiprocess pool源码简单解读

先说解决方案:

参考链接

此处贴上stack overflow的代码

注意:

  1. 除了resul .get会block以外,pool.join也会block信号
  2. 我的代码和他的不一样,我的脚本是长期运行的。我是能收到信号的。但是整个进程也没有退出,原因是worker异常退出导致。虽然原因不同,但是这个方案是可行的。因为worker不会因为ctrl-c异常退出了
#!/bin/env python
from __future__ import print_function

import multiprocessing
import os
import signal
import time

def run_worker(delay):
    print("In a worker process", os.getpid())
    time.sleep(delay)

def main():
    print("Initializng 2 workers")
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = multiprocessing.Pool(2)
    signal.signal(signal.SIGINT, original_sigint_handler)
    try:
        print("Starting 2 jobs of 5 seconds each")
        res = pool.map_async(run_worker, [5, 5])
        print("Waiting for results")
        res.get(60) # Without the timeout this blocking call ignores all signals.
    except KeyboardInterrupt:
        print("Caught KeyboardInterrupt, terminating workers")
        pool.terminate()
    else:
        print("Normal termination")
        pool.close()
    pool.join()

if __name__ == "__main__":
    main()

此处放上我的代码逻辑

def mgr():
    import multiprocessing
    multiprocessing.log_to_stderr()

    logger.info("start")
    origin_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    pool = Pool(3)
    stopper = Stopper()
    signal.signal(signal.SIGINT, origin_handler)
    signal.signal(signal.SIGTERM, functools.partial(stop, stopper))
    idx = 1
    working_process = []
    while not stopper.is_stop():
        try:
            v = r.blpop("test", 120)
            ...
            logger.info(v)
            rst = pool.apply_async(worker, args=(idx, ))
            working_process.append(rst)
        except KeyboardInterrupt:
            logger.info("bk error ======== parent {}".format(os.getpid()))
            break
        except Exception:
            pass

        while len(working_process) >= 3:
            logger.info("wait exit")
            readys = []
            for idx, rst in enumerate(working_process):
                rdy = rst.ready()
                print('1: ', idx, ' ', rdy)
                if rdy:
                    readys.append(idx)

            if not readys:
                time.sleep(10)
            else:
                for idx in readys:
                    working_process[idx] = None
            working_process = filter(lambda x: x is not None, working_process)
        idx += 1
    logger.info("exit")
    pool.close()
    pool.join()

根本原因:Condition.wait

根本原因其实取决于你的具体代码逻辑。目前我遇到的有两个

  • threading.Condition执行condition.wait()的时候,会阻止信号的处理。这个是python2.7的bug,并且不会改变这个行为。在python3中已经修复。详见:threading.Condition.wait() is not interruptible in Python 2.7
  • pool worker异常退出(使用Exception无法捕捉的异常),导致result没有被设置。pool一直等待worker的任务完成

问题2:pool中放入了过多的待处理任务

这个问题的原因是对进程池的理解不到位。multiprocessing.Pool只是限制了执行任务的进程的个数,并没有限制用户放入的任务数量。虽然它保证只有规定个数的进程运行。但是你可以一直往pool里添加task。

解决方案就是自己需要对处理的任务数量统计计数。code

问题3:内存泄漏

这个一开始就能预料到了。这个主要是因为我们业务代码有进程缓存,只要定期清理就可以了。那么方案也有两个

  1. 自己定期清理
  2. multiprocessing有个maxtasksperchild参数,表示一个进程多少次任务后就销毁,重新创建新的进程
上一篇:CentOS 系统升级系统内核版本


下一篇:spark解剖--master、worker、driver、executor