服务器开启的进程数或线程数都会随着并发的客户端数目地增多而增多,
这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,
于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,
这就是进程池或线程池的用途,例如进程池,就是用来存放进程的池子,
本质还是基于多进程,只不过是对开启进程的数目加上了限制
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
基本方法:
submit(func,*args,*kwargs) :异步提交任务
map(func,*iterable,timeout = None,chunksize=1) : 取代for循环submit操作
shutdown(wait=True) : 相当于进程池的pool.close() + pool.join()操作
wait = True 等待池内所有任务执行完毕回收资源后才继续
wait = False 立即返回,并不会等待池内的任务执行完毕
但是,不管wait值为多少,整个程序都会等到所有任务执行完毕
result(timeout=None) : 取得结果
add_done_callback(func) : 回调函数
1、进程池
from concurrent.futures import ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) # 将对象添加到列表 executor.shutdown(True) print('+++>') for future in futures: print(future.result()) # 打印每个对象返回的结果
2、线程池
from concurrent.futures import ThreadPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
3、map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) li = [] # for i in range(11): # future=executor.submit(task,i) ret = executor.map(task,range(1,12)) #map取代了for+submit,返回的是一个迭代器 executor.shutdown(True) # 等待线程池里的任务都运行完再进行下一步 for i in ret: # 这里的结果与上面的结果是一样的 print(i)
4、回调函数
什么是回调函数:可以为进程池或线程池里的每一个进程或现场绑定一个函数,该函数在进程或线程任务执行完毕后自动触发,并接受任务的返回值当做参数,该函数成为回调函数
可以理解为:进程或线程执行完返回结果后再在外面进行调用函数进行处理
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import requests import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=ProcessPoolExecutor(3) for url in urls: ret = p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果