我必须在MongoDB上进行大量的插入和更新.
我正在尝试测试多处理来完成这些任务.为此,我创建了这个简单的代码.我的虚拟数据是:
documents = [{"a number": i} for i in range(1000000)]
没有多处理:
time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
col.insert_one(doc)
time1f = time.time()
print(time1f-time1s)
我有150秒.
通过多处理,我根据需要定义了以下工作函数,并在Pymongo’s FAQs中进行了描述.
def insert_doc(document):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_one(document)
但是,当我运行我的代码时:
time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
我收到一个错误:
pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno
99] Cannot assign requested address
在提出错误之前,共处理了26447份文件.虽然遇到该错误的人没有使用多处理,但是在here中解释了此错误.解决方案是只打开一个MongoClient,但是当我想进行多处理时,这是不可能的.有没有解决方法?谢谢你的帮助.
解决方法:
您的代码为示例中的每个文档创建一个新的MongoClient(就像您链接的问题一样).这要求您为每个新查询打开一个新套接字.这会破坏PyMongo的连接池,除了速度极慢之外,它还意味着你比TCP堆栈更快地打开和关闭套接字:你在TIME_WAIT状态下留下太多套接字,这样你最终会耗尽端口.
如果您为每个客户端插入大量文档,则可以创建更少的客户端,从而打开更少的套接字:
import multiprocessing as mp
import time
from pymongo import MongoClient
documents = [{"a number": i} for i in range(1000000)]
def insert_doc(chunk):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_many(chunk)
chunk_size = 10000
def chunks(sequence):
# Chunks of 1000 documents at a time.
for j in range(0, len(sequence), chunk_size):
yield sequence[j:j + chunk_size]
time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)