@(python)
目录
引言
Executor和Future
使用submit来操作线程池/进程池
add_done_callback实现回调函数
引言
Python标准库为我们提供了threading
和multiprocessing
模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。但从Python3.2开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类,实现了对threading
和multiprocessing
的进一步抽象,对编写线程池/进程池提供了直接的支持。
Executor和Future
concurrent.futures
模块的基础是Exectuor
,Executor
是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor
和ProcessPoolExecutor
却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue
来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future
这个概念相信有java和nodejs下编程经验的朋友肯定不陌生了,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get
的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future
的引入帮助我们在等待的这段时间可以完成其他的操作。
p.s:如果你依然在坚守Python2.x,请先安装futures模块。
pip install futures
使用submit来操作线程池/进程池
我们先通过下面这段代码来了解一下线程池的概念:
# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task
print(future1.done()) # 判断task1是否结束
time.sleep(3)
print(future2.done()) # 判断task2是否结束
print(future1.result()) # 查看task1返回的结果
print(future2.result()) # 查看task2返回的结果
执行结果:
False
True
hello
world
解析:我们根据运行结果来分析一下。我们使用
submit
方法来往线程池中加入一个task,submit返回一个Future
对象,对于Future
对象可以简单地理解为一个在未来完成的操作。在第一个print语句中很明显因为time.sleep(2)
的原因我们的future1没有完成,因为我们使用time.sleep(3)
暂停了主线程,所以到第二个print语句的时候我们线程池里的任务都已经全部结束。
上面的代码我们也可以改写为进程池形式,api和线程池如出一辙,我就不罗嗦了。
# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())
add_done_callback
实现回调函数
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url):
response = requests.get(url)
if response.status_code == 200:
return response
def download(futures):
response = futures.result() #会得到一个返回值,这个返回值就是task函数的返回值
content = response.text
tmp_list = response.url.split("/")
filename = tmp_list[len(tmp_list)-1]
print("正在下载:%s" %response.url)
with open(filename,"w",encoding="utf-8") as f:
f.write("%s\n%s" %(response.url,content))
print("下载完成")
url_list = [
"http://www.cnblogs.com/wupeiqi/articles/5713330.html",
"http://blog.csdn.net/anzhsoft/article/details/19563091",
"http://blog.csdn.net/anzhsoft/article/details/19570187"
]
thread_pool = ThreadPoolExecutor(max_workers=2) #生一个线程池对象,最大线程数为2个
for url in url_list:
futures = thread_pool.submit(task,url) #会得到一个Future对象
#回调函数,会将futures本身当作参数传给download函数
futures.add_done_callback(download)