C++11异步编程

线程同步是线程间有序访问共享数据。

同步

同步:在调用后没有得到结果,该调用就不返回。一旦调用返回,就意味得到了返回值。由调用者等待调用的结果。

异步

异步:调用后立即返回了,不等待返回结果。由被调用者在得到结果后,通知调用者(回调函数,等)处理这个调用。

异步编程

在< thread >线程库中,没有获得线程执行结果的方法。

在并发编程中,虽然多线程互斥临界区条件变量,可以获取异步任务执行结果,但操作多容易引入bug。还会使用各种回调方法来处理异步返回的结果,让代码分散且难以维护。

< future >库函数

C++ 11新增了< future >库函数为异步编程提供了很大的便利。

< future >库允许不同的线程访问共享数据。
<future> 头文件中包含了以下几个类和函数:

Providers 类:std::promise, std::package_task
Futures 类:std::future, shared_future.
Providers 函数:std::async()
其他类型:std::future_error, std::future_errc, std::future_status, std::launch.

std::promise 类

promise 提供了一种线程同步的手段。promise 对象可以保存类型 T 的值,该值可被在其他线程中的 future 对象读取 。 promise 对象可以和一个std::future(一种共享状态)对象相关联,并可以在相关联的共享状态(std::future)上保存一个类型T 的值。

通过promise的成员函数 get_future 来与 future 对象关联,调用该函数之后,两个对象共享相同的共享状态(shared state)

promise 对象是异步 Provider,它可以在另一个线程通过set_value成员函数设置共享状态的值,使得共享状态标志变为 ready。
future 对象可以接收异步返回的共享状态的值,可以在必要的情况下,用wait函数阻塞调用者,等待共享状态标志变为 ready,后才能获取共享状态的值。


promise与future传递结果

Prmomise和Future提供一种访问异步操作结果的机制,可以在线程之间传递数据和异常信息。

std::promise< T >与std::packaged_task< Func >提供了较丰富的异步编程工具,使用时要创建提供共享状态的对象(promise与packaged_task),还要创建访问共享状态的对象(future与shared_future)。

promise (const promise&) = delete;   拷贝构造函数,被禁用。
promise (promise&& x) noexcept;      有移动构造函数。


std::promise 的 operator= 没有拷贝语义,即 std::promise 普通的赋值操作被禁用,operator= 只有 move 语义,所以 std::promise 对象是禁止拷贝的

 


std::promise<int> pr;
std::thread t([](std::promise<int>& p){ p.set_value_at_thread_exit(9); },std::ref(pr));//这里promise拿到了9
std::future<int> f = pr.get_future();//设置关联的future
auto r = f.get();
 

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
 
void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    Sleep(10);
    accumulate_promise.set_value(sum);  //promise对象设置结果,内部会让共享状态变为就绪,以提醒future
}
 
int use_promise()
{
    //用std::promise<int> 在线程间传递结果。
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise;
    std::future<int> accumulate_future = accumulate_promise.get_future();  //将future 对象和promise 对象关联上
    
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise));  //把promise对象送入耗时work_thread线程内
    accumulate_future.wait();//future对象在wait方法上阻塞,用以等待关联的promise对象设置结果
    std::cout << "result=" << accumulate_future.get() << '\n';
    work_thread.join();//阻塞等待work_thread线程执行完成
   
    return 0;
}
 
std::promise<int> promise1;

void print_global_promise() {
    std::future<int> fut = promise1.get_future();
    int x = fut.get();
    std::cout << "value: " << x << '\n';
}


int use_promise2()
{
    std::thread th1(print_global_promise);
    std::this_thread::sleep_for(std::chrono::seconds(5));

    promise1.set_value(10);
    th1.join();

    promise1 = std::promise<int>();    // promise1 被move赋值为一个新的 promise 对象.

    std::thread th2(print_global_promise);
    std::this_thread::sleep_for(std::chrono::seconds(5));

    promise1.set_value(20);
    th2.join();

    return 0;
}

promise::get_future 函数

返回与 promise 共享状态相关联的 future对象 。future 对象可以访问该 promise 对象设置在共享状态上的值或者异常对象。调用get_future函数后,promise 对象通常会在某个时间点准备好(设置一个值或一个异常对象),如果不设置值或者异常,promise 对象在析构时会自动地设置一个 future_error 异常,来设置其自身的准备状态。

promise::set_value函数

用于设置共享状态的值,通过成员函数set_value可以设置std::promise中保存的值,调用后 promise 的共享状态标志变为 ready。该值最终会被与之关联的std::future::get函数读取到。需要注意的是:set_value只能被调用一次,多次调用会抛出std::future_error异常

理解

promise会做出承诺:会设置T类型的值,future未来从这里获得承诺的T类型的值。


线程A可以对promise执行set_value(),传入对应产出的值,而另一个线程B则可以使用future的get()方法来获取他们共享的值,但这个线程B会阻塞在那,直到获得future与promise共享的值。这里有一个值得注意的地方:Future的一个重要属性在于它只能被赋值一次。

void runner(std::promise<int>* pPromise) {
    std::cout << "do some io task" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(5));
    pPromise->set_value(3);
}

int use_promise3() {
    std::promise<int> _promise;
    std::thread t(runner, &_promise);

    int ret = _promise.get_future().get(); // Wait p.set_value()
    std::cout << ret << std::endl;

    t.join();

    return 0;
}

std::future

std::future提供了一种访问异步操作结果的机制。因为异步操作是不能马上获得结果的,只能在未来某个时候获取,我们以同步等待的方式来获取结果。future_status有三种状态:

deferred:异步操作还没开始
ready:异步操作已经完成
timeout:异步操作超时


获取future结果有三种方式:

get:等待异步操作结束并返回结果
wait:只是等待异步操作完成,没有返回值
wait_for:超时等待返回结果。


//获取状态
status = future.wait_for(std::chrono::seconds(1))
if (status == std::future_status::deferred) {
  std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
  std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
  std::cout << "ready!\n";
}
 

从异步调用的角度来说,future更像是执行函数的返回值,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future对象来代表这个事件。

异步调用往往不知道何时返回,如果需要使用异步调用的结果。这个时候就要用到future。

线程可以周期性的在这个future上等待一小段时间,检查future是否已经ready,如果没有,该线程可以先去做另一个任务,一旦future就绪,该future就无法复位(无法再次使用这个future等待这个事件),所以future代表的是一次性事件。


future的类型

<future>库中声明了两种future,唯一future(std::future)和共享future(std::shared_future)前者的实例是仅有的一个指向其关联事件的实例,后者可以有多个实例指向同一个关联事件,当事件就绪时,所有指向同一事件的std::shared_future实例会变成就绪。



std::future是一个模板,例如std::future<int>,模板参数就是期待返回的类型,future被用于线程间通信。

future使用的时机是当你不需要立刻得到一个结果的时候,你可以开启一个线程帮你去做一项任务,并期待这个任务的返回,但是std::thread并没有提供这样的机制,这就需要用到std::async和std::future(都在<future>头文件中声明)

std::async返回一个std::future对象,而不是给一个确定的值。当你需要使用这个值的时候,对future使用get(),线程就会阻塞直到future就绪,然后返回该值。


int accumulate2(std::vector<int>::iterator first,
    std::vector<int>::iterator last)
{
    int sum = std::accumulate(first, last, 0);
    std::this_thread::sleep_for(std::chrono::seconds(10));
    return sum;
}
 

int use_packaged_task()
{
    //用packaged_task 在线程间传递结果。
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::packaged_task<int(std::vector<int>::iterator, std::vector<int>::iterator)> accumulate_task(accumulate2);
    std::future<int> accumulate_future = accumulate_task.get_future();

    std::thread work_thread(std::move(accumulate_task), numbers.begin(), numbers.end());
    accumulate_future.wait();  //等待结果

    std::cout << "result=" << accumulate_future.get() << '\n';
    work_thread.join();  //阻塞等待线程执行完成

  
    return 0;
}

shared_future

字面意思是:可共享的未来

shared_future对象的行为与future对象类似,但是shared_future对象可以被复制的,多个shared_future对象可以共享所有权。在共享状态的值在准备就绪状态ready后,可以多次获取值。

shared_future对象可以从future对象隐式转换(构造函数),也可以调用future::share显式获得。转换获取后,原future对象无效。

共享状态的生存期持续到与之关联的最后一个对象被销毁为止。

future的get()函数为转移语义数据,只能get一次;shared_future的get()函数为复制数据,可以get多次。

需要用std::shared_future< T > 

std::future< T >提供了一个将future转换为shared_future的方法f.share(),但转换后原future状态失效。使用时需要留心。

使用全局变量传递数据,会使得不同函数间的耦合度较高,不利于模块化编程。后面两种方法分别通过函数参数与返回值来传递数据,可以降低函数间的耦合度,使编程和维护更简单快捷。


  //修改后,使用shared_future的result_s可以get多次
   std::shared_future<int> result_s(std::move(result)); //执行后result为空。

语句执行效果等同于下句:
    //std::shared_future<int> result_s(mypt.get_future()); 
    auto mythreadresult = result_s.get();    //mythreadresult=5
    mythreadresult = result_s.get();        //mythreadresult=5
 

std::packaged_task

std::packaged_task是模板,是任务的封装。好处是不需要修改函数,直接利用return。
std::packaged_task的模板参数是函数签名。
packaged_task包装callable元素,将其作为任务,将callable对象的return结果传输到future对象,并允许异步的获取future对象。

调用packaged_task的get_future()方法,获得它绑定的函数的返回值类型的future。
例如 int add(int a, intb)的函数签名是int(int, int);
std::packaged_task<int(int, int)> task(add); 

packaged_task对象包含两个元素:
        一个是任务,它是可调用对象(如函数指针、指向成员或函数对象的指针)。
        一个是共享状态,能够存储调用已存储任务return的结果,并可通过futrue未来对象在另个线程异步的访问。

通过调用packaged_task的成员函数get_future,将共享状态与future对象关联。使得两个对象共享相同的共享状态:packaged_task对象是异步提供程序,通过调用存储的任务,可以在某个时刻将共享状态设置为ready就绪。

future是异步返回对象,用于检索共享状态的值,在必要时它等待,直到变成准备就绪ready状态。共享状态的生存期持续到与之关联的最后一个对象被销毁为止。

int add(int a, int b)
{
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return a + b;
}

int main()
{
    std::packaged_task<int(int, int)> task(add);
    std::cout << "Hello World 11" << std::endl;
    std::future<int> result = task.get_future();
    //task(1, 1);      //必须要让任务执行,否则在get()获取future的值时会一直阻塞
    std::thread th(std::move(task), 10, 5);
    std::cout << "Hello World 66" << std::endl;
    std::cout << "Hello World 77" << std::endl;

    std::cout << result.get() << std::endl;

    std::cout << "Hello World 88" << std::endl;
    return 0;
}

async

future库封装了更高级别的函数std::async   ,std::async是更简单的异步工具,使异步执行任务更方便。

std::future std::async(std::launch policy, Func, Args…)

std::async函数的返回值是std::futrue对象。可以很方便的获取线程函数的执行结果。
        std::async会自动创建一个线程去调用线程函数,它返回一个std::future,这个future中存储了线程函数返回的结果,当我们需要线程函数的结果时,直接从future中获取。



std::launch::async | std::launch::deferred参数 是默认行为(可省略)。有了这个启动策略,它可以异步运行或不运行,这取决于系统的负载。
继续使用上面的程序示例,改为使用std::async传递结果,修改后的代码如下:

std::async()第一个参数std::launch policy是启动策略,控制std::async的异步行为,可以用三种不同的启动策略来创建std::async:

(1)std::launch::async保证异步执行,即传递函数将在单独的线程中执行;
std::launch::async 不需要get(),新线程也会创建并执行

std::future<int> result = std::async(std::launch::async,&A::mythread2, &a, tempar);  


(2)std::launch::deferred表示线程入口函数,被延迟到std::future的wait()或者get()函数调用时,才执行;当其他线程调用std::future的get()/wait()来访问共享状态时,将执行非异步行为;

std::future<int> result = std::async(std::launch::deferred,&A::mythread2, &a, tempar);   

如果在此之后不调用wait()或者get(),那么上述线程不会被执行。
如果在之后调用get()函数,并没有创建新线程,是在主线程中调用的线程入口函数。

(3)当不指定参数时,默认参数为:std::launch::deferred | std::launch::async,系统会自行选择一种模式。
 

Func是要调用的可调用对象(function, member function, function object, lambda),Args是传递给Func的参数。


使用std::async能在很大程度上简少编程工作量,我们可以使用std::async替代线程的创建,让它成为我们做异步操作的首选。
 

class CA
{
public:
    int funcHaoShi(int param)
    {
        std::cout << "threadid=" << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(5000));
        return param * 3;
    }
};
 

int use_async()
{
    // 演示用 async 在线程间传递结果。
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::cout << "use_async  ... " << std::endl;

    auto accumulate_future = std::async(std::launch::async, accumulate2, numbers.begin(), numbers.end()); 
    std::cout << "result=" << accumulate_future.get() << '\n';

    CA a;
    auto  _future = std::async(&CA::funcHaoShi, &a,  12);  //类的成员函数  //当不指定参数时,默认参数为:std::launch::deferred | std::launch::async,系统会自行选择一种模式
    std::cout << "result=" << _future.get() << '\n';


    return 0;
}
int funUseTime(int i) {
    std::cout << " funUseTime  running" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return i;
}

int use_async2() {
    std::future<int> _future = std::async(std::launch::async, funUseTime, 1);
    std::future_status status;
    do {
        status = _future.wait_for(std::chrono::seconds(1));
        if (status == std::future_status::deferred) {
            std::cout << "deferred\n";
        }
        else if (status == std::future_status::timeout) {
            std::cout << "timeout\n";
        }
        else if (status == std::future_status::ready) {
            std::cout << "ready!\n";
        }
    } while (status != std::future_status::ready);
    std::cout << "result is " << _future.get() << '\n';
    return 0;
}

上一篇:Promise(resolve,reject)的基本使用


下一篇:至真科技校招前端面筋