python – Pymongo多处理

我必须在MongoDB上进行大量的插入和更新.

我正在尝试测试多处理来完成这些任务.为此,我创建了这个简单的代码.我的虚拟数据是:

documents = [{"a number": i} for i in range(1000000)]

没有多处理:

time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
    col.insert_one(doc)
time1f = time.time()
print(time1f-time1s)

我有150秒.

通过多处理,我根据需要定义了以下工作函数,并在Pymongo’s FAQs中进行了描述.

def insert_doc(document):
    client = MongoClient()
    db = client.mydb
    col = db.mycol
    col.insert_one(document)

但是,当我运行我的代码时:

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)

我收到一个错误:

pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno
99] Cannot assign requested address

在提出错误之前,共处理了26447份文件.虽然遇到该错误的人没有使用多处理,但是在here中解释了此错误.解决方案是只打开一个MongoClient,但是当我想进行多处理时,这是不可能的.有没有解决方法?谢谢你的帮助.

解决方法:

您的代码为示例中的每个文档创建一个新的MongoClient(就像您链接的问题一样).这要求您为每个新查询打开一个新套接字.这会破坏PyMongo的连接池,除了速度极慢之外,它还意味着你比TCP堆栈更快地打开和关闭套接字:你在TIME_WAIT状态下留下太多套接字,这样你最终会耗尽端口.

如果您为每个客户端插入大量文档,则可以创建更少的客户端,从而打开更少的套接字:

import multiprocessing as mp
import time
from pymongo import MongoClient

documents = [{"a number": i} for i in range(1000000)]

def insert_doc(chunk):
    client = MongoClient()
    db = client.mydb
    col = db.mycol
    col.insert_many(chunk)

chunk_size = 10000

def chunks(sequence):
    # Chunks of 1000 documents at a time.
    for j in range(0, len(sequence), chunk_size):
        yield sequence[j:j + chunk_size]

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
上一篇:python – MongoClient在fork之前打开.创建MongoClient


下一篇:python – 是否有可能从pymongo ping mongodb