concurrent.futures异步库使用:
1.简单函数使用
import time
from concurrent import futures
from concurrent.futures._base import Future
from concurrent.futures.thread import ThreadPoolExecutor
def add(x, y):
time.sleep(3)
return x + y
def done(self: Future): # 其中这里的self表示当前任务Future对象
print(self.result())
print("add done")
with ThreadPoolExecutor(max_workers=4) as e:
result: Future = e.submit(add, 4, 5)
# result.set_result(100) # 设置一个默认的结果,做测试使用,如果设置了默认则result.result()一直拿到的都是默认值(测试使用)
print(result.done()) # 如果任务被删除或者已经执行完成,返回True
print(result.cancelled()) # 任务是否被删除,如果删除返回True
# print(type(result), result.result(timeout=1)) # timeout时间内未完成任务,则抛出TimeoutError异常
# print(result.exception(timeout=4)) # 手动设置抛出异常,如果timeout时间内未完成任务,则抛出TimeoutError异常,完成返回None
result.add_done_callback(done) # 为当前任务对象添加回掉函数
--------------------------------------测试-------------------------------------
def wait_on_future():
print("`````````````")
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
return f.result()
# 如果max_workers=1则会导致程序卡死
executor = ThreadPoolExecutor(max_workers=1)
f = executor.submit(wait_on_future)
print(f.result())
2.添加任务:
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import random
def test_func(s, key):
print('enter~~{} {}s key={}'.format(threading.current_thread(), s, key))
threading.Event().wait(s)
if key == 3:
raise Exception("{} failed````".format(key))
return "ok {} ".format(threading.current_thread())
futures = {}
def run(fs):
print("~~~~~~~~~~~~~~~~~~~~~~~")
while True:
time.sleep(1)
print('-' * 30)
print(fs)
# 只要有一个任务没有完成就阻塞,完成一个,执行一次(因为这里会阻塞,如果一个非常耗时的操作在这里进行不是很适合,建议给future添加timeout)
# 如果内部有异常result()会将这个异常抛出
# 有异常也算执行完了complete
# 如果fs中的future任务已经被执行,循环第二次不会重复执行(因为需要删除futures中的任务,重新添加)
for future in as_completed(fs): # fs为空也不阻塞
id = fs[future]
try:
print(id, future.result())
except Exception as e:
print(e)
print(id, "failed")
threading.Thread(target=run, args=(futures, )).start()
with ThreadPoolExecutor(max_workers=3) as executor:
for i in range(7):
futures[executor.submit(test_func, random.randint(1, 8), i)] = i