池concurrent.futrues
- 什么是池?
- 要在程序开始的时候,还没提交任务,先创建几个线程或者进程放在一个池子里,这就是池
- 为什么要用池?
- 如果先开好进程/或线程,那么有任务之后就可以直接使用这个池中的数据了
- 开好的进程或线程会一直存在池中,可以被多个任务反复利用,这样极大的减少了开启,关闭,调度线程、进程的时间开销
- 池中的进程/线程个数控制了操作系统需要调度的任务个数,控制池中的单位,有利于提高操作系统的效率, 减轻操作系统的负担
线程池
-
线程池里面推荐的线程个数(一般根据IO的比例定制):cpu_count
*
5 -
代码:
-
from threading import current_thread from concurrent.futures import ThreadPoolExecutor import time import random def func(): print(current_thread().ident,'start') #从内部获取开启进程的个数 time.sleep(random.randint(1,4)) print(current_thread().ident, 'end') tp=ThreadPoolExecutor(4) #起了4个线程 for i in range(10): tp.submit(func) #submit向池中提交任务 #得: 6876 start 13340 start 11760 start 6080 start 6876 end 6876 start 6080 end 6080 start 11760 end 11760 start 6876 end 6876 start 13340 end 13340 start 11760 end 11760 start 11760 end 6080 end
-
带参数
-
from threading import current_thread from concurrent.futures import ThreadPoolExecutor import time import random def func(a,b): print(current_thread().ident,'start',a,b) #从内部获取开启进程的个数 time.sleep(random.randint(1,4)) print(current_thread().ident, 'end') tp=ThreadPoolExecutor(4) #起了4个线程 for i in range(10): tp.submit(func,i,i+1) #得: 8624 start 0 1 7884 start 1 2 7844 start 2 3 11832 start 3 4 7844 end 7844 start 4 5 8624 end 8624 start 5 6 7884 end 7884 start 6 7 11832 end 11832 start 7 8 7844 end 7844 start 8 9 7884 end 7884 start 9 10 11832 end 8624 end 7884 end 7844 end
-
进程池
-
运用场景:高计算的时候,没有io(没有文件操作,没有数据库操作,没有网络操作,没有input)
-
进程池里面推荐的进程个数:cpu_count
*
1<进程个数<cpu_count*
2 -
代码:
-
import os from concurrent.futures import ProcessPoolExecutor import time import random def func(a, b): print(os.getpid(), 'start', a, b) # 从内部获取开启进程的个数 time.sleep(random.randint(1, 4)) print(os.getpid(), 'end') if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 for i in range(10): tp.submit(func, i, i + 1) #得 7836 start 0 1 8888 start 1 2 11916 start 2 3 14660 start 3 4 11916 end 11916 start 4 5 7836 end 7836 start 5 6 14660 end 14660 start 6 7 7836 end 8888 end 7836 start 7 8 8888 start 8 9 14660 end 14660 start 9 10 11916 end 7836 end 8888 end 14660 end
-
获取任务结果
from concurrent.futures import ProcessPoolExecutor def func(a, b): return a*b if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 future_l={} for i in range(10): #异步非阻塞 ret=tp.submit(func, i, i + 1) future_l[i]=ret # print(ret) #Future未来对象 # print(ret.result()) #缺点:慢,效率低 for key in future_l: #同步阻塞 print(key,future_l[key].result())
-
回调函数(add_done_callback)
-
回调函数(异步阻塞),给ret对象绑定一个回调函数。等ret对应的任务有了结果之后,立即调用print_func这个函数,就可以 立即对函数进行处理,而不是按照顺序接受结果,处理结果
-
代码
-
from concurrent.futures import ProcessPoolExecutor import os import time import random def func(a, b): print(os.getpid(), 'start', a, b) # 从内部获取开启进程的个数 time.sleep(random.randint(1, 4)) print(os.getpid(), 'end') return a*b def print_func(ret): print(ret.result()) if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 for i in range(10): #异步非阻塞 ret=tp.submit(func, i, i + 1) ret.add_done_callback(print_func) #异步阻塞,回调函数,给ret对象绑定一个回调函数。 # 等ret对应的任务有了结果之后,立即调用print_func这个函数 #就可以立即对函数进行处理,而不是按照顺序接受结果,处理结果 #得: 15284 start 0 1 7540 start 1 2 544 start 2 3 6880 start 3 4 7540 end 7540 start 4 5 2 15284 end 544 end 544 start 5 6 6 15284 start 6 7 0 6880 end 6880 start 7 8 12 15284 end 15284 start 8 9 42 7540 end 7540 start 9 10 20 544 end 30 7540 end 90 15284 end 6880 end 72 56
-
map
- map函数只适合传递简单的参数,并且必须是一个可迭代的数据类型作为参数
from concurrent.futures import ProcessPoolExecutor
def func(a):
b=a+1
return a*b
if __name__ == '__main__':
tp = ProcessPoolExecutor(4) # 起了4个线程
tm=tp.map(func,range(20)) #使用map将一个可迭代对象的元素依次传入函数
print(list(tm))
#得:
[0, 2, 6, 12, 20, 30, 42, 56, 72, 90, 110, 132, 156, 182, 210, 240, 272, 306, 342, 380]