multiprocessing --- 基于进程的并行

概述

原文链接:https://docs.python.org/zh-cn/3/library/multiprocessing.html

其实原文就是中文的,这篇文章甚至谈不上翻译,只是照抄一遍罢了。scrapy用多了,回顾一下requests加多进程如何使用。

提问

有一批固定的代理,假设100个。有一批起始url5000个,每个起始url可以获取到若干个详情url。使用多进程获取所有详情页的内容

想法1:最直接的方法就是创建100个进程,每个进程传入一个代理ip,在创建一个url队列,这100个进程一直在消费队列和往队列存放新的url。当某个进程报错(网络之类的错误)达到一定次数则结束该进程

想法2:能不能使用进程池来操作。但是有个问题,如何给进程池内的每个进程分配一个固定的代理呢?固定虽然做不到,但可以把代理也放入一个队列,获取url的时候从队列中取出一个代理,用完再放回队列。

上面两个想法应该都可以实现。但是我的这100个代理可能都是静态代理,质量比较好,我想把它们压榨干净。所以我一直在纠结能不能给进程池的进程固定分配一个代理。

为什么进程池就不能先分配一个资源呢?一堆工人就不能只用一个顺手的工具来做任务,工具坏了才换一个,我感觉这才合理。于是就只能看官网文档有没有这种方法(代码见最后,如果你有更好的想法或者觉得我的代码可以再优化欢迎给出建议)

简单的例子

创建进程

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

创建进程池

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

打印进程id

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

进程间交换数据

队列

multiprocessing.Queue接口基本同queue.Queue一样,只是缺少了task_done()和join()两个方法

当一个对象被放入队列时,会先被后台线程用pickle序列化,再将序列化的数据通过管道传递给队列。将一个对象放入空队列时,需要一个极小的延迟才能使empty方法返回False,推荐使用get_nowait方法

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

警告:如果一个进程在使用队列期间,被terminate或者kill方法终止了,该队列的数据很可能已经损坏,其他进程操作该队列会触发异常。

管道

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

注意:不能在不同进程同时读写管道的同一端(比如多个进程同时操作parent_conn),这会导致管道内的数据异常。如果每个进程只读写某一端的数据,则不会有问题。也就是说返回的parent_conn, child_conn最好分配给两个进程分别使用

同步锁

多进程时应尽量避免锁的存在,可以使用管道或队列来传输数据

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

共享内存

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Manager

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

Manager()支持的所有类型: list 、 dict 、Namespace 、Lock 、RLock 、Semaphore 、BoundedSemaphore 、 Condition 、Event 、Barrier 、Queue 、Value 和 Array

自定义数据类型

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

建议多使用队列和管道来传输数据,尽可能的避免使用共享内存和Manager,当然,如果你非要用也行。

远程Manager

创建服务端

from multiprocessing.managers import BaseManager
from queue import Queue

queue = Queue()
class QueueManager(BaseManager): 
    pass
QueueManager.register('get_queue', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()

客户端1
往远程队列中存放’hello’

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): 
    pass
QueueManager.register('get_queue')
m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra')
m.connect()
queue = m.get_queue()
queue.put('hello')

客户端2
取出远程队列的数据。这样就实现了两个远程进程间的数据交互

from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): 
    pass
QueueManager.register('get_queue')
m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra')
m.connect()
queue = m.get_queue()
print(queue.get())
# 'hello'

服务端也做客户端
开始创建一个进程往队列里存放数据,然后开启远程服务器。之后用客户端2就可以获取到队列里进程存放的数据了

from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager


class Worker(Process):
    def __init__(self, q):
        self.q = q
        super().__init__()
    def run(self):
        self.q.put('local hello')

class QueueManager(BaseManager): 
        pass

if __name__ == '__main__':
    queue = Queue()
    w = Worker(queue)
    w.start()
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

manager嵌套

manager创建的列表对象和列表基本是一样的,允许相互嵌套使用。当然你可以嵌套其他数据类型,比如字典或者manager.dict()

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

注意:嵌套的可变对象(列表、字典)里的数据不会自动同步到远程。需要在更改后重新赋值给manager

lproxy = manager.list()
lproxy.append({})
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# 重新赋值触发远程同步
lproxy[0] = d

进程池

multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes是进程数,默认为 os.cpu_count()的值。如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)。maxtasksperchild表示寿命(处理多少个任务就自己挂了),默认为None,和pool对象同寿命。context这个没什么软用,不用管。

注意,进程池对象的方法只有创建它的进程能够调用。也就是说无法通过参数传递给其他进程

警告:不要期望垃圾回收机制来回收pool对象,应该手动调用closeterminate。建议使用with

下面是pool对象的一些方法:

  • apply(func[, args[, kwds]]):调用func(args) , 它会返回结果前阻塞。使用apply_async()更好
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):apply() 方法的一个变种,返回一个 AsyncResult 对象。callback为func返回值的回调函数(参数为func返回值),error_callback则是异常回调函数(参数为抛出的异常对象)
  • map(func, iterable[, chunksize]):它会在返回结果前阻塞。会把iterable分割成chunksize块,再提交给进程池,默认math.ceil(len(iterable)/len(processes))。如果任务很大,建议使用imap或者imap_unordered并指定chunksize
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]]):map()方法的一个变种,返回一个 AsyncResult 对象
  • imap(func, iterable[, chunksize]):map() 的延迟执行版本,一般任务量很大时配合chunksize参数使用
  • imap_unordered(func, iterable[, chunksize]):和imap一样,但返回结果无序(map返回的结果会一一对应iterable)
  • starmap(func, iterable[, chunksize]):和 map() 类似,不过 iterable 中的每一项会被解包再作为函数参数。例如[(1,2), (3, 4)] 会转化为等价于 [func(1,2), func(3,4)]。当函数有多个参数时使用
  • starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]):相当于 starmap() 与 map_async() 的结合
  • close():阻止新任务提交,当已提交任务执行完成后,工作进程会退出
  • terminate():不必等待未完成的任务,立即停止工作进程
  • join():调用前需调用close或者terminate,用于回收资源

multiprocessing.pool.AsyncResult

pool对象中包含_async方法返回的对象,比如pool.apply_async()

  • get([timeout]):获取执行结果
  • wait([timeout]):阻塞,直到返回结果,或者 timeout 秒后超时
  • ready():返回执行状态,是否已经完成
  • successful():判断调用是否已经完成并且未引发异常。 如果还未获得结果则将引发 ValueError

进程池添加任务的多种方法

import time
import os
from multiprocessing import Pool, TimeoutError

def f(x, y=666):
    return x*y

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 添加一个任务
        res = pool.apply_async(f, (20,))  
        # 获取执行结果,如果一秒后进程还没有返回结果则抛出TimeoutError异常
        print(res.get(timeout=1)) 

        # 捕获
        res = pool.apply_async(time.sleep, (5,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("获取结果超时,可能是函数没有执行完!")

        # 添加多个任务
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get() for res in multiple_results])

        # map添加最为方便,功能同上
        print("map: ", pool.map(f, range(10)))

        # 和map一样,但会延迟添加任务,类似的函数还有imap。
        # 一般用于大任务量时,配合第三个参数使用更佳(每次添加到池中的块大小,默认1)
        print("imap: ", list(pool.imap(f, range(10))))

        # map和imap都会保证结果的顺序(对应range(4)),而imap_unordered则不会
        print("imap_unordered: ", list(pool.imap_unordered(f, range(10))))

         # 和map一样,传多个参数时建议使用
        print("starmap: ", pool.starmap(f, [(1,1),(2,2),(3,3)]))

        # 前面四个map都是有不阻塞的版本的,以map为例
        pool.map_async(f, range(10), callback=lambda x: print("map_async: ", x))

        # 不再添加任务
        pool.close()
        # 等待所有任务执行完成
        pool.join()

注意:使用进程池时代码必须加上if name == ‘main’:

编程建议

  • 减少不必要的数据共享,尽量使用队列或者管道来进行数据共享
  • 进程参数和共享的数据都保证可以被序列化
  • 使用join避免僵尸进程
  • 继承优于序列化、反序列化:避免使用管道和队列发送共享对象到另外一个进程,而是重新组织代码,对于其他进程创建出来的共享对象,让那些需要访问这些对象的子进程可以直接将这些对象从父进程继承过来 (没看懂,放个原话)
  • 避免杀死进程:Process.terminate 停止一个进程很容易导致这个进程正在使用的共享资源(如锁、信号量、管道和队列)损坏或者变得不可用,无法在其他进程中继续使用
  • join之后不再使用队列,这可能导致死锁。以下是死锁例子,只需要交换最后两行即可避免死锁
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
  • 显式传递资源给子进程:比如下面两段代码建议写成第二段
    第一种
from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

第二种

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

最开始的问题

其实主要是创建Pool的时候的两个参数initializer和initargs。进程会在创建的时候执行一次initializer(*initargs)。另外,想要在其他函数内使用该函数的变量,需要指定global关键字

import requests
import time
import random
import os
import multiprocessing

# 代理列表,代理格式:"http://user:passwd@ip:port"
ips2 = []

# 工作进程(进程池的进程)的任务函数
def get(a, q):
    global ip
    # 可以看到进程id和ip是一一对应的
    print(os.getpid(), os.getppid(), ip)
    try:
        return requests.get('http://www.httpbin.org/get', params={'a':a}, proxies={"http":ip,"https":ip},timeout=5).text #
    except Exception as e:
    	# 报错直接换个代理,只是演示,实际肯定需要报错多次才换
        ip = q.get()
        print(e, ip)
        return 
        
# 工作进程初始化执行的函数
def get_ip(q):
    global ip
    ip = q.get()

if __name__ == "__main__":
    manage = multiprocessing.Manager()
    # 进程池需要使用manage.Queue而不是multiprocessing.Queue
    q = manage.Queue(len(ips2))
    for i in ips2:
        q.put(i)
    print(q.qsize())
    pool = multiprocessing.Pool(processes = 4, initializer=get_ip, initargs=(q,))
    result = pool.starmap(get,[(i, q) for i in range(10)])
    for i in result:
        print(i)
    print(q.qsize())

实际上,所有的工作进程都可以直接访问到ips这个列表。但是应该如果分配给这些工作进程,它们既没有顺序也没有名字,唯一的标识就是pid,拿pid映射到列表的某个索引可不可以呢。估计要解决这个问题,只能看进程池实现的代码了。但是官方的源码不是一般人能看的懂的,我肯定是不太行。

上面只演示了如果给进程池的每个进程分配一个固定的ip,并没有更新任务到队列。实际上,对于列表页和详情页的处理会很不一样,一般需要两个不同的函数来处理,更新任务到队列的处理可能会比较麻烦。

完整演示

以崔佬的练习平台为例:https://ssr1.scrape.center/。获取前5页的的所有电影中的导演。

完整代码如下:

import requests
import time
import random
import os
import multiprocessing
from lxml import etree
from queue import Empty


ips2 = []

def get_index(page, q):
    global ip
    if not ip:
        return
    print("进程ID:", os.getpid(), ",开始采集列表页, 页码:", page, ",使用ip: ", ip)
    try:
        resp = requests.get(f'https://ssr1.scrape.center/page/{page}', proxies={"http":ip,"https":ip}, timeout=5)
    except Exception as e:
        pid = os.getpid()
        print(f"进程({pid})出现异常, 使用ip:{ip}")
        print(e)
        try:
            ip = q.get_nowait()
        except Empty:
            pass
        raise Exception(str(page))
    else:
        html = resp.text
        doc = etree.HTML(html)
        detail_urls = doc.xpath('//a[@class="name"]/@href')
        detail_urls = ["https://ssr1.scrape.center" + url for url in detail_urls if not url.startswith('http')]
        time.sleep(1.5)
        return detail_urls

def get_detail(url, q):
    global ip
    if not ip:
        return
    print("进程ID:", os.getpid(), ",开始采集详情页, url:", url, ",使用ip: ", ip)
    try:
        resp = requests.get(url, proxies={"http":ip,"https":ip}, timeout=5)
    except Exception as e:
        pid = os.getpid()
        print(f"进程({pid})出现异常, 使用ip:{ip}")
        print(e)
        try:
            ip = q.get_nowait()
        except Empty:
            pass
        raise Exception(url)
    else:
        html = resp.text
        doc = etree.HTML(html)
        results = doc.xpath('//div[@class="directors el-row"]//p/text()')
        directors = '|'.join([i.strip() for i in results])
        time.sleep(1.5)
        return url + ',' + directors

def save_result(results):
    print("获取的导演结果:", results)

def get_ip(q):
    global ip
    try:
        ip = q.get_nowait()
    except Empty:
        ip = ''
        print('代理列表为空, 请添加代理后再开始采集!')

def index_callback(detail_urls):
    if not detail_urls:
        return
    for url in detail_urls:
        pool.apply_async(get_detail, args=(url, q), callback=save_result, error_callback=detail_error_callback)

def index_error_callback(e):
    pool.apply_async(get_index, (str(e), q), callback=index_callback, error_callback=index_error_callback)

def detail_error_callback(e):
    pool.apply_async(get_detail, (str(e), q), callback=save_result, error_callback=detail_error_callback)

if __name__ == "__main__":
    manage = multiprocessing.Manager()
    q = manage.Queue(len(ips2))
    for i in ips2:
        q.put(i)
    t = time.time()
    pool = multiprocessing.Pool(processes = 20, initializer=get_ip, initargs=(q,))
    for i in range(5):
        pool.apply_async(get_index, args=(i, q), callback=index_callback, error_callback=index_error_callback)
    while(pool._cache):
        time.sleep(0.1)
    pool.close()
    pool.join()
    print('采集完成,耗时:', time.time() - t)
   

这个代码利用回调函数来不断往进程池加入新任务,如果代码出现问题,可能会出现死循环的情况。所以运行之前注意理清逻辑。另外,回调函数是在主进程中执行的,如果包含阻塞的代码,则后面的回调函数会一直在等待状态。所以不要在回调函数内干耗时的操作,可以将这些操作写在进程函数内。

ips2列表中的代理需大于进程数,不然会出现采集数据为None的情况。如果进程没有拿到代理则不给他分配任务,这个想要实现比较麻烦。暂时没有想要一个好的解决方法。

如果代理只是比进程数少了一点点,对于没有拿到代理的进程拿到任务时,可以通过抛出异常让代理池重新分配给进程(见error_callback两个函数的代码)。这可能会导致死循环,因为代理池可能又把任务分配给了没有拿到代理的进程,又会将任务抛出。所以这种方法只针对代理量相对进程来说只是少了一点点的情况。

如何判断进程池的任务是否执行完了,这个官方文档中没有提到相关的接口,但是我通过查看源码发现了几个属性可以进行判断,不知道是不是最佳的方法,有清楚的还请不吝赐教。

  • bool(pool._cache): 判断进程池任务是否全部执行完
  • pool._inqueue.empty():判断进程池是否没有额外的任务(所有任务要么执行完成,要么已分配给进程池内的进程正在执行 )
上一篇:一个无锁消息队列引发的血案(三)——地:q3.h 与 RingBuffer


下一篇:Python进程模块