175.concurrent.futures异步库使用

concurrent.futures异步库使用:

1.简单函数使用

import time
from concurrent import futures
from concurrent.futures._base import Future
from concurrent.futures.thread import ThreadPoolExecutor


def add(x, y):
    time.sleep(3)
    return x + y


def done(self: Future):  # 其中这里的self表示当前任务Future对象
    print(self.result())
    print("add done")


with ThreadPoolExecutor(max_workers=4) as e:
    result: Future = e.submit(add, 4, 5)
    # result.set_result(100)  # 设置一个默认的结果,做测试使用,如果设置了默认则result.result()一直拿到的都是默认值(测试使用)
    print(result.done())  # 如果任务被删除或者已经执行完成,返回True
    print(result.cancelled())  # 任务是否被删除,如果删除返回True
    # print(type(result), result.result(timeout=1))  # timeout时间内未完成任务,则抛出TimeoutError异常
    # print(result.exception(timeout=4))  # 手动设置抛出异常,如果timeout时间内未完成任务,则抛出TimeoutError异常,完成返回None
    result.add_done_callback(done)  # 为当前任务对象添加回掉函数

--------------------------------------测试-------------------------------------
def wait_on_future():
    print("`````````````")
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    return f.result()


# 如果max_workers=1则会导致程序卡死
executor = ThreadPoolExecutor(max_workers=1)
f = executor.submit(wait_on_future)
print(f.result())

2.添加任务:

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import random


def test_func(s, key):
    print('enter~~{} {}s key={}'.format(threading.current_thread(), s, key))
    threading.Event().wait(s)
    if key == 3:
        raise Exception("{} failed````".format(key))
    return "ok {} ".format(threading.current_thread())


futures = {}


def run(fs):
    print("~~~~~~~~~~~~~~~~~~~~~~~")
    while True:
        time.sleep(1)
        print('-' * 30)
        print(fs)
        # 只要有一个任务没有完成就阻塞,完成一个,执行一次(因为这里会阻塞,如果一个非常耗时的操作在这里进行不是很适合,建议给future添加timeout)
        # 如果内部有异常result()会将这个异常抛出
        # 有异常也算执行完了complete
        # 如果fs中的future任务已经被执行,循环第二次不会重复执行(因为需要删除futures中的任务,重新添加)

        for future in as_completed(fs):  # fs为空也不阻塞
            id = fs[future]
            try:
                print(id, future.result())
            except Exception as e:
                print(e)
                print(id, "failed")


threading.Thread(target=run, args=(futures, )).start()
with ThreadPoolExecutor(max_workers=3) as executor:
    for i in range(7):
        futures[executor.submit(test_func, random.randint(1, 8), i)] = i
上一篇:什么是数据、元数据、主数据?这可能是大多数人没看过的解释


下一篇:迭代器Iterator