ThreadPoolExecutor内存溢出
情景一:
在数据处理中,使用ThreadPoolExecutor(线程池)处理大量数据情况下,导致内存溢出
机器卡死挂掉;
场景模拟:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from memory_profiler import profile
import queue
from line_profiler import LineProfiler
from functools import wraps
class BoundThreadPoolExecutor(ThreadPoolExecutor):
"""
对ThreadPoolExecutor 进行重写,给队列设置边界
"""
def __init__(self, qsize: int = None, *args, **kwargs):
super(BoundThreadPoolExecutor, self).__init__(*args, **kwargs)
self._work_queue = queue.Queue(qsize)
def timer(func):
@wraps(func)
def decorator(*args, **kwargs):
func_return = func(*args, **kwargs)
lp = LineProfiler()
lp_wrap = lp(func)
lp_wrap(*args, **kwargs)
lp.print_stats()
return func_return
return decorator
def func(num):
print(f"the {num} run...")
time.sleep(0.5)
return num*num
# @timer
@profile
def main():
# with ThreadPoolExecutor(max_workers=2) as t:
# res = [t.submit(func, i) for i in range(100)]
# pool = BoundThreadPoolExecutor(qsize=2, max_workers=2)
pool = ThreadPoolExecutor(max_workers=2)
for i in range(100):
# func(i)
pool.submit(func, i)
print(pool._work_queue.qsize())
pool.shutdown()
if __name__ == '__main__':
main()
未对线程队列限制时,进程将所有对象添加到self._work_queue
中
重写ThreadPoolExecutor
, 限制self._work_queue = queue.Queue(qsize)
队列大小
结果对比
总结
存在内存溢出的情况,原因是ThreadPoolExecutor 线程池使用的是无边界队列,进程在队列中
添加对象时没有对空闲线程进行判断,导致内存消耗过多