1.为什么用线程池
1.启动一个新线程的消耗较高且涉及与操作系统的交互,尤其是程序中需要创建大量生存期很短暂的线程,而使用线程池可以很好地提升性能
2.线程池则是创建指定线程数量等待执行事件,当该事件执行结束后该线程并不会死亡,而是回到线程池中变成空闲状态等待执行下一个事件
3.当系统中包含有大量的并发线程时,会导致系统性能急剧下降甚至导致解释器崩溃,而使用线程池可以有效地控制系统中并发线程的数量
2.线程池的使用步骤
1.threadpool 模块实现线程池不推荐继续使用,此处推荐是用 concurrent.futures 模块中的 ThreadPoolExecutor 类实现线程池
2.调用 ThreadPoolExecutor 类的构造器创建一个线程池,定义一个普通函数作为线程任务
3.调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池
3.语法概述
from concurrent.futures import ThreadPoolExecutor thread_pool = ThreadPoolExecutor(max_workers=5) # 创建指定进程数量进程池并返回进程池对象 future = thread_pool.submit(fn, *args, **kwargs) # 将fn函数提交给线程池,并返回一个 future 对象 参数: *args 代表传给 fn 函数的参数 *kwargs 代表以关键字参数的形式为 fn 函数传入参数 # 该函数类似于内建函数 map(func, *iterables) 只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理 thread_pool.map(func, *iterables, timeout=None, chunksize=1) # 提交多个任务给池中,等效for + submit thread_pool.shutdown(wait=True) # 等待池内所有任务执行完毕后关闭线程池 future.cancel() # 取消该future代表的线程任务,如果该任务正在执行不可取消则该方法返回False,否则程序会取消该任务并返回True future.cancelled() # 返回future代表的线程任务是否被成功取消 future.running() # 如果该future代表的线程任务正在执行不可被取消,该方法返回True future.done() # 如果该funture代表的线程任务被成功取消或执行完成,则该方法返回True future.result(timeout=None) # 获取该future代表的线程任务最后返回的结果,如果future代表的线程任务还未完成该方法将会阻塞当前线程 参数: timeout 指定最多阻塞等待多少秒 future.exception(timeout=None) # 获取该future代表的线程任务所引发的异常,如果该任务成功完成没有异常则该方法返回None future.add_done_callback(fn) # 为该future代表的线程任务注册一个回调函数,当该任务成功完成时程序会自动触发该fn函数
4.线程池的基本使用
import time from concurrent.futures import ThreadPoolExecutor def func(num): time.sleep(1) print("num is %s" % num) return num * num def main(): ret_list = list() executor = ThreadPoolExecutor(max_workers=5) # 指定线程池中的线程数量 for i in range(5): future = executor.submit(func, i) # 异步提交任务,返回一个未来对象future ret_list.append(future) executor.shutdown(True) # 等待池内所有任务执行完毕回收完资源后才继续 for ret in ret_list: ret = ret.result() # 获取该future代表的线程任务最后返回的结果 print(ret) if __name__ == "__main__": main() """执行结果 num is 1 num is 0 num is 3 num is 2 num is 4 0 1 4 9 16 """
5.线程池的多任务提交
from concurrent.futures import ThreadPoolExecutor import time def func(num): sum = 0 for i in range(num): sum += i ** 2 print(sum) t = ThreadPoolExecutor(20) start = time.time() t.map(func, range(1000)) # 提交多个任务给池中,等效于 for + submit t.shutdown() print(time.time() - start)
6.线程池的返回值
from concurrent.futures import ThreadPoolExecutor import time def func(num): sum = 0 # time.sleep(5) # print(num) # 异步的效果 for i in range(num): sum += i ** 2 return sum t = ThreadPoolExecutor(20) # 下列代码是用map的方式提交多个任务,对应拿结果的方法是__next__()返回的是一个生成器对象 res = t.map(func, range(1000)) t.shutdown() print(res.__next__()) print(res.__next__()) print(res.__next__()) print(res.__next__()) # 下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是result t = ThreadPoolExecutor(20) res_l = [] for i in range(1000): re = t.submit(func, i) res_l.append(re) t.shutdown() [print(i.result()) for i in res_l] # 在Pool进程池中拿结果,是用get方法,在ThreadPoolExecutor里边拿结果是用result方法
7.线程池中子线程调用回调函数
from threading import current_thread from concurrent.futures import ThreadPoolExecutor # 在线程池中,回调函数是子线程调用的,和父线程没有关系 from concurrent.futures import ProcessPoolExecutor # 不管是ProcessPoolExecutor的进程池 还是Pool的进程池,回调函数都是父进程调用的,和子进程没有关系 import os def func(num): sum = 0 for i in range(num): sum += i ** 2 print('这是在子线程中', current_thread()) # current_thread()查看线程标识,类似于进程中的getpid() return sum def call_back_fun(res): # print(res.result(), os.getpid()) print('这是在回调函数中', res.result(), current_thread()) # print(os.getpid()) if __name__ == '__main__': print(os.getpid()) t = ThreadPoolExecutor(20) # 线程池 # t = ProcessPoolExecutor(20) # 进程池 for i in range(10): t.submit(func, i).add_done_callback(call_back_fun) t.shutdown() print('这是在主线程中', current_thread())
8.进程池和线程池效率对比
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from multiprocessing import Pool # concurrent.futures 这个模块是异步调用的机制 # concurrent.futures 提交任务都是用submit # for + submit 多个任务的提交 # shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务 # from multiprocessing import Pool.apply / apply_async import time def func(num): sum = 0 for i in range(num): for j in range(i): for x in range(j): sum += x ** 2 # print(sum) if __name__ == '__main__': # pool的进程池的效率演示 p = Pool(5) start = time.time() for i in range(100): p.apply_async(func, args=(i,)) p.close() p.join() print('Pool进程池的效率时间是%s' % (time.time() - start)) # 0.51 # 多进程的效率演示 tp = ProcessPoolExecutor(5) start = time.time() for i in range(100): tp.submit(func, i) tp.shutdown() # 等效于进程池中的 close + join print('ProcessPoolExecutor进程池的消耗时间为%s' % (time.time() - start)) # 0.49 # 多线程的效率 tp = ThreadPoolExecutor(20) start = time.time() for i in range(100): tp.submit(func, i) tp.shutdown() # 等效于 进程池中的 close + join print('ThreadPoolExecutor线程池的消耗时间为%s' % (time.time() - start)) # 1.40 # 结果: 针对计算密集的程序来说 # 不管是Pool的进程池还是ProcessPoolExecutor()的进程池,执行效率相当 # ThreadPoolExecutor 的效率要差很多 # 所以当计算密集时,使用多进程