通过tqdm,可以很方便地查看一项长耗时任务的执行进度。
为了提升效率,有时可以将任务拆分,提交到多个进程上执行,再将结果汇总。那么,利用tqdm是否可以对多进程中的任务进行进度监控呢?
我对此进行了实验。
1、环境及版本
操作系统:macOS Big Sur
python版本:3.7.6
tqdm版本:4.42.1
2、测试代码
2.1 方法一
已经知道,使用tqdm
最直接的方法是:
from tqdm import tqdm
for i in tqdm(range(int(1e6))):
pass
通过查看tqdm
类的源码可以发现,按照上述方式调用时,range(...)
部分对应的形参实际是iterable
,即tqdm(iterable=range(int(1e6)))
。此外,tqdm
还可以接受一个total
参数,指的是需要计算的总次数。
基于上述原理,我们可以使用multiprocessing.Pool
对象的imap(imap_unordered)
方法来实现对多进程任务的进度追踪。
具体地,代码如下:
from multiprocessing import Pool
import tqdm
import time
def worker(i):
# 执行任务的函数
time.sleep(3)
pass
if __name__ == ‘__main__‘:
p = Pool(4)
start_time = time.time()
list(tqdm.tqdm(iterable=(p.imap(worker, range(10))), total=10))
print(f"共耗时{int(time.time() - start_time)}s")
p.close()
p.join()
由于Pool.imap()
返回的也是一个迭代器,且每个进程的worker
完成后就马上返回结果,因此,上述方式可以监控到整体的进度。
实际上,如果不关注返回值的顺序,使用Pool.imap_unordered()
能够获取更准确的进度,这是因为它的返回值是无序的。而imap
的有序将导致如果前面的任务较为耗时,即使后面的任务已经完成,也必须等到前面的完成后才能告诉tqdm更新进度。关于这一点,可以将上述worker
函数改为如下的形式,然后分别使用imap()
和imap_unordered()
进行验证。
...
def worker(i):
# 执行任务的函数
if i == 0:
sleep(5)
else:
sleep(0.1)
...
2.2 方法二
tqdm其实通过concurrent.futures
包装了对多进程的支持,使用起来更加的简单:
from time import sleep
from tqdm.contrib.concurrent import process_map
def worker(i):
# 执行任务的函数
if i == 0:
sleep(5)
else:
sleep(0.1)
return i
results = process_map(worker, range(10), max_workers=4)
results
会存储每个参数对应的计算结果。
不过这种方法的进度是按照参数传入的顺序进行更新的。也就是说,如果执行上述代码,那么进度会卡在第一个位置上5s,然后瞬间变为100%;如果将range(1)
改为range(9, -1, -1)
那么进度的表现为:在不到1s的时间内(0.9s)完成90%,然后再等待(4.1s)完成剩余的10%。