因此,我编写了一个工具,该工具可以接收项目列表,将其拆分为给定数量的列表(比方说10个),然后获取这10个列表,并生成10个线程,“ EvaluationThreads”(扩展threading.thread),以及每个这些线程中的任何一个都会评估提供给他们评估的内容.当我启动每个线程时,我将它们全部放入列表中,并在生成它们后得到以下代码:
for th in threadList:
th.join()
someTotal = th.resultsAttribute
这就是我如何等待所有线程完成并收集其信息的方式.虽然这是等待所有内容完成然后收集结果的一种有效方法,但我觉得必须有一种更优雅的方法,因为这些线程很可能在不同的时间结束,并且如果第一个开始完成则最后较早完成的线程必须等待该线程完成才能加入.有没有办法获取这些线程的信息并在完成时加入它们,而不是按照它们启动的顺序来加入它们?我本来以为我会在线程中使用某种回调或其他方法,但是我不确定是否有更可接受的解决方案.
谢谢你的帮助.
编辑:为澄清起见,我的评估功能不受CPU限制,并且我不尝试在线程之间分发文档以使其尽快完成,每个线程都有固定的偶数个作业.
解决方法:
对于您的主要问题:
如果您要执行的操作比这还要复杂,或者特别是要反复执行,则可能需要“线程组”类.其中有数十种是预制的,但是如果您不喜欢其中任何一个,那么自己写一个就很简单了.
然后,代替此:
threadList = []
for argchunk in splitIntoChunks(values, 10):
threadList.append(threading.Thread(target=myThreadFunc, args=argchunk))
...
someTotal = 0
for th in threadList:
th.join()
someTotal += th.resultsAttribute
你可以这样做:
threadGroup = ThreadGroup.ThreadGroup()
for argchunk in splitIntoChunks(values, 10):
threadGroup.newThread(myThreadFunc, argchunk)
threadGroup.join()
someTotal = sum(th.resultsAttribute for th in threadGroup)
或者,甚至更好的是,完整的线程池库,因此您可以执行以下操作:
pool = ThreadPool(10)
for argchunk in splitIntoChunks(values, 100):
pool.putRequest(myThreadFunc, argchunk)
pool.wait()
这样做的好处是您可以轻松地在10个线程上适当地调度100个作业,而不是每个线程10个作业,而无需维护队列等所有工作.缺点是您不能仅迭代线程要获得返回值,您必须迭代作业-理想情况下,您不希望将作业保留到最后,以便可以对其进行迭代.
这带给我们第二个问题,即如何从线程(或作业)中获取价值.有很多很多方法可以做到这一点.
您所做的工作.您甚至不需要任何锁定.
如您建议的那样,使用回调也可以.但是请记住,回调将在工作线程而不是主线程上运行,因此,如果它正在访问某些全局对象,则将需要某种同步.
如果仍然要进行同步,则回调可能没有任何好处.例如,如果您要做的就是将一堆值求和,则只需设置total = [0],然后让每个线程在锁内只执行total [0] = myValue. (当然,在这种情况下,仅在主线程中进行求和并避免锁定可能更有意义,但是如果将结果合并在一起的工作更为繁琐,则选择可能不会那么简单.)
您也可以使用某种原子对象,而不是显式锁定.例如,标准的Queue.Queue和collections.deque都是原子的,因此每个线程只能设置q = Queue.Queue(),然后每个线程通过执行q.push(myValue)来推送其结果,然后在加入您之后迭代并汇总队列的值.
实际上,如果每个线程恰好一次推送到队列中,您只需要对队列本身进行10次阻塞,然后便知道group.join()或pool.wait()或其他任何将很快返回的对象.
或者,您甚至可以将回调作为作业推送到队列中.同样,您可以在队列上进行10次阻塞获取,每次执行结果.
如果每个线程可以返回多个对象,则它们可以在完成后将一个哨兵值或回调发送到队列中,并且主线程一直弹出,直到读取10个哨兵.