Trouble with a multiprocessing Lock and Pool: PicklingError

I am building a Python module to extract tags from a large corpus of text. While the quality of its results is high, execution is very slow. I tried to speed things up with multiprocessing, and that worked too, until I introduced a lock so that only one process at a time connects to our database. I cannot for the life of me figure out how to make this work: despite much searching and tweaking, I still get a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here is the offending code; it worked fine until I tried to pass a lock object as an argument to f.

from multiprocessing import Manager
from functools import partial

def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize 
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock) 

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

Solution:

Your problem is that Lock objects are not picklable. In this case I can see two possible solutions.

> To avoid this, you can make the lock variable global. Then you can reference it directly as a global inside the pool worker function, and you no longer need to pass it as an argument. This works because Python uses the OS fork mechanism when creating the pool processes, so each pool process gets a copy of the full contents of the process that created it. This is the only way to pass a lock to Python processes created with the multiprocessing package. Incidentally, there is no need to use the Manager class just for this lock. With this change, your code would look like this:

import multiprocessing
from functools import partial

lock = None  # Global definition of lock
pool = None  # Global definition of pool


def get_more_tags(tag, max_tags=2):
    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an
    # external database, which needs the global lock to avoid simultaneous
    # queries. It takes a single tag (string) and returns a set of new tags.
    global lock
    with lock:
        pass  # run the database query here
    return set()


def make_network(initial_tag, max_tags=2, max_iter=3):
    global lock
    global pool
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(8)

    # note that lock is no longer passed as an argument
    f = partial(get_more_tags, max_tags=max_tags)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

In your real code, the lock and pool variables could be class instance variables.

> The second solution, which avoids using a lock entirely but may have slightly higher overhead, is to create another process with multiprocessing.Process and connect it to every pool process through a multiprocessing.Queue. This process is responsible for running the database queries. The pool processes use the queue to send query arguments to the process managing the database. Since all pool processes share the same queue, access to the database is automatically serialized. The extra overhead comes from pickling/unpickling the query arguments and responses. Note that you can pass a multiprocessing.Queue object to a pool process as an argument. Note also that the multiprocessing.Lock-based solution does not work on Windows, where processes are not created with fork semantics.
