《C++ Concurrency in Action》笔记

《C++ Concurrency in Action》笔记

英文书籍《C++ Concurrency in Action》国内有不同人翻译,有一版是4个译者翻译的,豆瓣知乎风评一般。陈晓伟翻译的不错,下面是我对该译者书籍的读书笔记。

1 你好,C++的并发世界

1.1 何谓并发

最简单和最基本的并发,是指两个或更多独立的活动同时发生。
并发在生活中随处可见,我们可以一边走路一边说话,也可以两只手同时作不同的动作,还有我们每个人都过着相互独立的生活——当我在游泳的时候,你可以看球赛,等等。

1.1.1 计算机系统中的并发

计算机领域的并发指的是在单个系统里同时执行多个独立的任务,而非顺序的进行一些活动。
系统通过任务调度,在时间片微粒度下,做任务切换,让用户感觉不到间隙,这种也被称为并发。
多处理器计算机可实现硬件并发。
《C++ Concurrency in Action》笔记
图1.2显示了四个任务在双核处理器上的任务切换,仍然是将任务整齐地划分为同等大小块的理想情况。实际上,许多因素会使得分割不均和调度不规则。
《C++ Concurrency in Action》笔记

1.1.2 并发的途径

途径一:一个进程内含一个线程,交互是进程间通信;
途径二:一个进程内含多个线程,交互是线程间通信。

多进程并发

如图1.3所示,独立的进程可以通过进程间常规的通信渠道传递讯息(信号、套接字、文件、管道等等)。不过,这种进程之间的通信通常不是设置复杂,就是速度慢,这是因为操作系统会在进程间提供了一定的保护措施,以避免一个进程去修改另一个进程的数据。还有一个缺点是,运行多个进程所需的固定开销:需要时间启动进程,操作系统需要内部资源来管理进程,等等。
当然,以上的机制也不是一无是处:操作系统在进程间提供附加的保护操作和更高级别的通信机制,意味着可以更容易编写安全的并发代码。实际上,在类似于Erlang的编程环境中,将进程作为并发的基本构造块。
使用多进程实现并发还有一个额外的优势———可以使用远程连接(可能需要联网)的方式,在不同的机器上运行独立的进程。虽然,这增加了通信成本,但在设计精良的系统上,这可能是一个提高并行可用行和性能的低成本方式。
《C++ Concurrency in Action》笔记

多线程并发

并发的另一个途径,在单个进程中运行多个线程。线程很像轻量级的进程:每个线程相互独立运行,且线程可以在不同的指令序列中运行。但是,进程中的所有线程都共享地址空间,并且所有线程访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。虽然,进程之间通常共享内存,但是这种共享通常是难以建立和管理的。因为,同一数据的内存地址在不同的进程中是不相同。图1.4展示了一个进程中的两个线程通过共享内存进行通信。
《C++ Concurrency in Action》笔记
地址空间共享,以及缺少线程间数据的保护,使得操作系统的记录工作量减小,所以使用多线程相关的开销远远小于使用多个进程。不过,共享内存的灵活性是有代价的:如果数据要被多个线程访问,那么程序员必须确保每个线程所访问到的数据是一致的(在本书第3、4、5和8章中会涉及,线程间数据共享可能会遇到的问题,以及如何使用工具来避免这些问题)。问题并非无解,只要在编写代码时适当地注意即可,这同样也意味着需要对线程通信做大量的工作。

1.2 为什么使用并发?

主要原因有两个:关注点分离(SOC)和性能

1.2.1 为了分离关注点

编写软件时,分离关注点是个好主意;通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。即使一些功能区域中的操作需要在同一时刻发生的情况下,依旧可以使用并发分离不同的功能区域;若不显式地使用并发,就得编写一个任务切换框架,或者在操作中主动地调用一段不相关的代码。

1.2.2 为了性能

多处理器系统,现在越来越普遍。如果想要利用日益增长的计算能力,那就必须设计多任务并发式软件。
两种方式利用并发提高性能。一是业务逻辑并行,即任务并行(task parallelism),二是数据并行(data parallelism),如图像处理。

1.2.3 什么时候不使用并发

基本上,不使用并发的唯一原因就是,收益比不上成本。并发代码增加开发成本,更高复杂度增加出错风险。除非潜在的性能增益足够大或关注点分离地足够清晰,否则,别用并发。
此外,线程是有限的资源。如果让太多的线程同时运行,则会消耗很多操作系统资源,从而使得操作系统整体上运行得更加缓慢。不仅如此,因为每个线程都需要一个独立的堆栈空间,所以运行太多的线程也会耗尽进程的可用内存或地址空间。对于一个可用地址空间为4GB(32bit)的平坦架构的进程来说,这的确是个问题:如果每个线程都有一个1MB的堆栈(很多系统都会这样分配),那么4096个线程将会用尽所有地址空间,不会给代码、静态数据或者堆数据留有任何空间。即便64位(或者更大)的系统不存在这种直接的地址空间限制,但其他资源有限:如果你运行了太多的线程,最终也是出会问题的。尽管线程池(参见第9章)可以用来限制线程的数量,但这也并不是什么灵丹妙药,它也有自己的问题。
客户端/服务器(C/S)应用在服务器端为每一个链接启动一个独立的线程,对于少量的链接是可以正常工作的,但当同样的技术用于需要处理大量链接的高需求服务器时,也会因为线程太多而耗尽系统资源。在这种场景下,使用线程池可以对性能产生优化(参见第9章)。

1.3 C++中的并发和多线程

通过多线程为C++并发提供标准化支持是件新鲜事。只有在C++11标准下,才能编写不依赖平台扩展的多线程代码。了解C++线程库中的众多规则前,先来了解一下其发展的历史。

1.3.1 C++多线程历史

C++98(1998)标准不承认线程的存在,并且各种语言要素的操作效果都以顺序抽象机的形式编写。不仅如此,内存模型也没有正式定义,所以在C++98标准下,没办法在缺少编译器相关扩展的情况下编写多线程应用程序。
当然,编译器供应商可以*地向语言添加扩展,添加C语言中流行的多线程API———POSIX标准中的C标准和Microsoft Windows API中的那些———这就使得很多C++编译器供应商通过各种平台相关扩展来支持多线程。所以C++本身不支持并行语法时,程序员们已经编写了大量的C++多线程程序了。

1.3.2 新标准支持并发

C++11新标准中不仅有了一个全新的线程感知内存模型,C++标准库也扩展了:包含了用于管理线程(参见第2章)、保护共享数据(参见第3章)、线程间同步操作(参见第4章),以及低级原子操作(参见第5章)的各种类。
新C++线程库很大程度上,是基于上文提到的C++类库的经验积累。特别是,Boost线程库作为新类库的主要模型,很多类与Boost库中的相关类有着相同名称和结构。随着C++标准的进步,Boost线程库也配合着C++标准在许多方面做出改变,因此之前使用Boost的用户将会发现自己非常熟悉C++11的线程库。
新的C++标准直接支持原子操作,允许程序员通过定义语义的方式编写高效的代码,从而无需了解与平台相关的汇编指令。

1.3.3 C++线程库的效率

为了效率,C++类整合了一些底层工具。这样就需要了解相关使用高级工具和使用低级工具的开销差,这个开销差就是抽象代价(abstraction penalty)。该类库在大部分主流平台上都能实现高效(带有非常低的抽象代价)。

1.3.4 平台相关的工具

在C++线程库中提供一个native_handle()成员函数,允许通过使用平台相关API直接操作底层实现。就其本质而言,任何使用native_handle()执行的操作都是完全依赖于平台的。

1.4 开始入门

#include <iostream>
#include <thread>  //①
void hello()  //②
{
  std::cout << "Hello Concurrent World\n";
}
int main()
{
  std::thread t(hello);  //③
  t.join();  //④
}

2 线程管理

2.1 线程管理的基础

2.1.1 启动线程

void do_some_work();
std::thread my_thread(do_some_work);
class background_task
{
public:
  void operator()() const
  {
    do_something();
    do_something_else();
  }
};

background_task f;
std::thread my_thread(f);

有件事需要注意,当把函数对象传入到线程构造函数中时,需要避免“最令人头痛的语法解析”(C++’s most vexing parse, 中文简介)。如果你传递了一个临时变量,而不是一个命名的变量;C++编译器会将其解析为函数声明,而不是类型对象的定义。

例如:

std::thread my_thread(background_task());

这里相当与声明了一个名为my_thread的函数,这个函数带有一个参数(函数指针指向没有参数并返回background_task对象的函数),返回一个std::thread对象的函数,而非启动了一个线程。
使用在前面命名函数对象的方式,或使用多组括号①,或使用新统一的初始化语法②,可以避免这个问题。

如下所示:

std::thread my_thread((background_task()));  // 1
std::thread my_thread{background_task()};    // 2

使用lambda表达式也能避免这个问题。lambda表达式是C++11的一个新特性,它允许使用一个可以捕获局部变量的局部函数(可以避免传递参数,参见2.2节)。想要具体的了解lambda表达式,可以阅读附录A的A.5节。之前的例子可以改写为lambda表达式的类型:

std::thread my_thread([]{
  do_something();
  do_something_else();
});

启动了线程,你需要明确是要等待线程结束(加入式——参见2.1.2节),还是让其自主运行(分离式——参见2.1.3节)。如果std::thread对象销毁之前还没有做出决定,程序就会终止(std::thread的析构函数会调用std::terminate())。因此,即便是有异常存在,也需要确保线程能够正确的_加入_(joined)或_分离_(detached)。

清单2.1 函数已经结束,线程依旧访问局部变量

struct func
{
  int& i;
  func(int& i_) : i(i_) {}
  void operator() ()
  {
    for (unsigned j=0 ; j<1000000 ; ++j)
    {
      do_something(i);           // 1. 潜在访问隐患:悬空引用
    }
  }
};

void oops()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread my_thread(my_func);
  my_thread.detach();          // 2. 不等待线程结束
}                              // 3. 新线程可能还在运行

这个例子中,已经决定不等待线程结束(使用了detach()②),所以当oops()函数执行完成时③,新线程中的函数可能还在运行。如果线程还在运行,它就会去调用do_something(i)函数①,这时就会访问已经销毁的变量。如同一个单线程程序——允许在函数完成后继续持有局部变量的指针或引用;当然,这从来就不是一个好主意——这种情况发生时,错误并不明显,会使多线程更容易出错。

处理这种情况的常规方法:使线程函数的功能齐全,将数据复制到线程中,而非复制到共享数据中。如果使用一个可调用的对象作为线程函数,这个对象就会复制到线程中,而后原始对象就会立即销毁。但对于对象中包含的指针和引用还需谨慎,例如清单2.1所示。使用一个能访问局部变量的函数去创建线程是一个糟糕的主意(除非十分确定线程会在函数完成前结束)。此外,可以通过join()函数来确保线程在函数完成前结束。

2.1.2 等待线程完成

如果需要等待线程,相关的std::thread实例需要使用join()。在这种情况下,因为原始线程在其生命周期中并没有做什么事,使得用一个独立的线程去执行函数变得收益甚微,但在实际编程中,原始线程要么有自己的工作要做;要么会启动多个子线程来做一些有用的工作,并等待这些线程结束。
这意味着,只能对一个线程使用一次join();一旦已经使用过join(),std::thread对象就不能再次加入了,当对其使用joinable()时,将返回false。

2.1.3 特殊情况下的等待

如果想要分离一个线程,可以在线程启动后,直接使用detach()进行分离。如果打算等待对应线程,通常,当倾向于在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免应用被抛出的异常所终止。
清单 2.2 等待线程完成

struct func; // 定义在清单2.1中
void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  try
  {
    do_something_in_current_thread();
  }
  catch(...)
  {
    t.join();  // 1
    throw;
  }
  t.join();  // 2
}

代码使用了try/catch块确保访问本地状态的线程退出后,函数才结束。

一种方式是使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),并且提供一个类,在析构函数中使用join(),如同下面清单中的代码。看它如何简化f()函数。

清单 2.3 使用RAII等待线程完成

class thread_guard
{
  std::thread& t;
public:
  explicit thread_guard(std::thread& t_):
    t(t_)
  {}
  ~thread_guard()
  {
    if(t.joinable()) // 1
    {
      t.join();      // 2
    }
  }
  thread_guard(thread_guard const&)=delete;   // 3
  thread_guard& operator=(thread_guard const&)=delete;
};

struct func; // 定义在清单2.1中

void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  thread_guard g(t);
  do_something_in_current_thread();
}    // 4

2.1.4 后台运行线程

使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。也就是说,不会等待这个线程结束;如果线程分离,那么就不可能有std::thread对象能引用它,分离线程的确在后台运行,所以分离线程不能被加入。不过C++运行库保证,当线程退出时,相关资源的能够正确回收,后台线程的归属和控制C++运行库都会处理。
通常称分离线程为_守护线程_(daemon threads),UNIX中守护线程是指,没有任何显式的用户接口,并在后台运行的线程。这种线程的特点就是长时间运行;线程的生命周期可能会从某一个应用起始到结束,可能会在后台监视文件系统,还有可能对缓存进行清理,亦或对数据结构进行优化。

std::thread t(do_background_work);
t.detach();
assert(!t.joinable());

清单2.4 使用分离线程去处理其他文档

void edit_document(std::string const& filename)
{
  open_document_and_display_gui(filename);
  while(!done_editing())
  {
    user_command cmd=get_user_input();
    if(cmd.type==open_new_document)
    {
      std::string const new_name=get_filename_from_user();
      std::thread t(edit_document,new_name);  // 1
      t.detach();  // 2
    }
    else
    {
       process_user_input(cmd);
    }
  }
}

2.2 向线程函数传递参数

std::thread构造函数中的可调用对象,或函数传递一个参数很简单。需要注意的是,默认参数要拷贝到线程独立内存中,即使参数是引用的形式,也可以在新线程中进行访问。

void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

代码创建了一个调用f(3, “hello”)的线程。注意,函数f需要一个std::string对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *类型。之后,在线程的上下文中完成字面值向std::string对象的转化。需要特别要注意,当指向动态变量的指针作为参数传递给线程的情况,代码如下:

void f(int i,std::string const& s);
void oops(int some_param)
{
  char buffer[1024]; // 1
  sprintf(buffer, "%i",some_param);
  std::thread t(f,3,buffer); // 2
  t.detach();
}

这种情况下,buffer②是一个指针变量,指向本地变量,然后本地变量通过buffer传递到新线程中②。并且,函数有很有可能会在字面值转化成std::string对象之前崩溃(oops),从而导致一些未定义的行为。并且想要依赖隐式转换将字面值转换为函数期待的std::string对象,但因std::thread的构造函数会复制提供的变量,就只复制了没有转换成期望类型的字符串字面值。

解决方案就是在传递到std::thread构造函数之前就将字面值转化为std::string对象:

void f(int i,std::string const& s);
void not_oops(int some_param)
{
  char buffer[1024];
  sprintf(buffer,"%i",some_param);
  std::thread t(f,3,std::string(buffer));  // 使用std::string,避免悬垂指针
  t.detach();
}

还可能遇到相反的情况:期望传递一个引用,但整个对象被复制了。当线程更新一个引用传递的数据结构时,这种情况就可能发生,比如:

void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
  widget_data data;
  std::thread t(update_data_for_widget,w,data); // 2
  display_status();
  t.join();
  process_widget_data(data); // 3
}

虽然update_data_for_widget①的第二个参数期待传入一个引用,但是std::thread的构造函数②并不知晓;构造函数无视函数期待的参数类型,并盲目的拷贝已提供的变量。当线程调用update_data_for_widget函数时,传递给函数的参数是data变量内部拷贝的引用,而非数据本身的引用。因此,当线程结束时,内部拷贝数据将会在数据更新阶段被销毁,且process_widget_data将会接收到没有修改的data变量③。对于熟悉std::bind的开发者来说,问题的解决办法是显而易见的:可以使用std::ref将参数转换成引用的形式,从而可将线程的调用改为以下形式:

std::thread t(update_data_for_widget,w,std::ref(data));

在这之后,update_data_for_widget就会接收到一个data变量的引用,而非一个data变量拷贝的引用。

如果你熟悉std::bind,就应该不会对以上述传参的形式感到奇怪,因为std::thread构造函数和std::bind的操作都在标准库中定义好了,可以传递一个成员函数指针作为线程函数,并提供一个合适的对象指针作为第一个参数:

class X
{
public:
  void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work,&my_x); // 1

这段代码中,新线程将my_x.do_lengthy_work()作为线程函数;my_x的地址①作为指针对象提供给函数。也可以为成员函数提供参数:std::thread构造函数的第三个参数就是成员函数的第一个参数,以此类推(代码如下,译者自加)。

class X
{
public:
  void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);

有趣的是,提供的参数可以移动,但不能拷贝。"移动"是指:原始对象中的数据转移给另一对象,而转移的这些数据就不再在原始对象中保存了(译者:比较像在文本编辑的"剪切"操作)。std::unique_ptr就是这样一种类型(译者:C++11中的智能指针),这种类型为动态分配的对象提供内存自动管理机制(译者:类似垃圾回收)。同一时间内,只允许一个std::unique_ptr实现指向一个给定对象,并且当这个实现销毁时,指向的对象也将被删除。移动构造函数(move constructor)和移动赋值操作符(move assignment operator)允许一个对象在多个std::unique_ptr实现中传递(有关"移动"的更多内容,请参考附录A的A.1.1节)。使用"移动"转移原对象后,就会留下一个空指针(NULL)。移动操作可以将对象转换成可接受的类型,例如:函数参数或函数返回的类型。当原对象是一个临时变量时,自动进行移动操作,但当原对象是一个命名变量,那么转移的时候就需要使用std::move()进行显示移动。下面的代码展示了std::move的用法,展示了std::move是如何转移一个动态对象到一个线程中去的:

void process_big_object(std::unique_ptr<big_object>);

std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));

std::thread的构造函数中指定std::move(p),big_object对象的所有权就被首先转移到新创建线程的的内部存储中,之后传递给process_big_object函数。

标准线程库中和std::unique_ptr在所属权上有相似语义类型的类有好几种,std::thread为其中之一。虽然,std::thread实例不像std::unique_ptr那样能占有一个动态对象的所有权,但是它能占有其他资源:每个实例都负责管理一个执行线程。执行线程的所有权可以在多个std::thread实例中互相转移,这是依赖于std::thread实例的可移动不可复制性。不可复制保性证了在同一时间点,一个std::thread实例只能关联一个执行线程;可移动性使得程序员可以自己决定,哪个实例拥有实际执行线程的所有权。

2.3 转移线程所有权

假设要写一个在后台启动线程的函数,想通过新线程返回的所有权去调用这个函数,而不是等待线程结束再去调用;或完全与之相反的想法:创建一个线程,并在函数中转移所有权,都必须要等待线程结束。总之,新线程的所有权都需要转移。
这就是移动引入std::thread的原因,C++标准库中有很多_资源占有_(resource-owning)类型,比如std::ifstream,std::unique_ptr还有std::thread都是可移动,但不可拷贝。这就说明执行线程的所有权可以在std::thread实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并且在std::thread实例之间(t1,t2和t3)转移所有权:

void some_function();
void some_other_function();
std::thread t1(some_function);            // 1
std::thread t2=std::move(t1);            // 2
t1=std::thread(some_other_function);    // 3
std::thread t3;                            // 4
t3=std::move(t2);                        // 5
t1=std::move(t3);                        // 6 赋值操作将使程序崩溃

首先,新线程开始与t1相关联。当显式使用std::move()创建t2后②,t1的所有权就转移给了t2。之后,t1和执行线程已经没有关联了;执行some_function的函数现在与t2关联。

然后,与一个临时std::thread对象相关的线程启动了③。为什么不显式调用std::move()转移所有权呢?因为,所有者是一个临时对象——移动操作将会隐式的调用。

t3使用默认构造方式创建④,与任何执行线程都没有关联。调用std::move()将与t2关联线程的所有权转移到t3中⑤。因为t2是一个命名对象,需要显式的调用std::move()。移动操作⑤完成后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。

最后一个移动操作,将some_function线程的所有权转移⑥给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。

std::thread支持移动,就意味着线程的所有权可以在函数外进行转移,就如下面程序一样。

清单2.5 函数返回std::thread对象

std::thread f()
{
  void some_function();
  return std::thread(some_function);
}

std::thread g()
{
  void some_other_function(int);
  std::thread t(some_other_function,42);
  return t;
}

当所有权可以在函数内部传递,就允许std::thread实例可作为参数进行传递,代码如下:

void f(std::thread t);
void g()
{
  void some_function();
  f(std::thread(some_function));
  std::thread t(some_function);
  f(std::move(t));
}

std::thread对象的容器,如果这个容器是移动敏感的(比如,标准中的std::vector<>),那么移动操作同样适用于这些容器。了解这些后,就可以写出类似清单2.7中的代码,代码量产了一些线程,并且等待它们结束。

清单2.7 量产线程,等待它们结束

void do_work(unsigned id);

void f()
{
  std::vector<std::thread> threads;
  for(unsigned i=0; i < 20; ++i)
  {
    threads.push_back(std::thread(do_work,i)); // 产生线程
  } 
  std::for_each(threads.begin(),threads.end(),
  std::mem_fn(&std::thread::join)); // 对每个线程调用join()
}

2.4 运行时决定线程数量

std::thread::hardware_concurrency()在新版C++标准库中是一个很有用的函数。这个函数将返回能同时并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。

2.5 标识线程

线程标识类型是std::thread::id,可以通过两种方式进行检索。第一种,可以通过调用std::thread对象的成员函数get_id()来直接获取。如果std::thread对象没有与任何执行线程相关联,get_id()将返回std::thread::type默认构造值,这个值表示“无线程”。第二种,当前线程中调用std::this_thread::get_id()(这个函数定义在<thread>头文件中)也可以获得线程标识。

std::thread::id对象可以*的拷贝和对比,因为标识符就可以复用。如果两个对象的std::thread::id相等,那它们就是同一个线程,或者都“无线程”。如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有线程。

线程库不会限制你去检查线程标识是否一样,std::thread::id类型对象提供相当丰富的对比操作;比如,提供为不同的值进行排序。这意味着允许程序员将其当做为容器的键值,做排序,或做其他方式的比较。按默认顺序比较不同值的std::thread::id,所以这个行为可预见的:当a<bb<c时,得a<c,等等。标准库也提供std::hash<std::thread::id>容器,所以std::thread::id也可以作为无序容器的键值。

std::thread::id master_thread;
void some_core_part_of_algorithm()
{
  if(std::this_thread::get_id()==master_thread)
  {
    do_master_thread_work();
  }
  do_common_work();
}

3 线程间共享数据

3.1 共享数据带来的问题

当涉及到共享数据时,问题很可能是因为共享数据修改所导致。如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须小心谨慎,才能确保一切所有线程都工作正常。
不变量(invariants)的概念对程序员们编写的程序会有一定的帮助——对于特殊结构体的描述;比如,“变量包含列表中的项数”。不变量通常会在一次更新中被破坏,特别是比较复杂的数据结构,或者一次更新就要改动很大的数据结构。
双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。其中不变量就是节点A中指向“下一个”节点B的指针,还有前向指针。为了从列表中删除一个节点,其两边节点的指针都需要更新。当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。

从一个列表中删除一个节点的步骤如下图:

1. 找到要删除的节点N
2. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点
3. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点
4. 删除节点N

《C++ Concurrency in Action》笔记
图中b和c在相同的方向上指向和原来已经不一致了,这就破坏了不变量。线程间潜在问题就是修改共享数据,致使不变量遭到破坏。这就是并行代码常见错误:条件竞争。

3.1.1 条件竞争

C++标准中也定义了数据竞争这个术语,一种特殊的条件竞争:并发的去修改一个独立对象,数据竞争是(可怕的)未定义行为的起因。
恶性条件竞争通常发生于完成对多于一个的数据块的修改时,例如,对两个连接指针的修改。因为操作要访问两个独立的数据块,独立的指令将会对数据块将进行修改,并且其中一个线程可能正在进行时,另一个线程就对数据块进行了访问。因为出现的概率太低,条件竞争很难查找,也很难复现。

3.1.2 避免恶性条件竞争

这里提供一些方法来解决恶性条件竞争,最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。C++标准库提供很多类似的机制。
另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。不过,这种方式很难得到正确的结果。如果到这个级别,无论是内存模型上的细微差异,还是线程访问数据的能力,都会让工作变的复杂。
另一种处理条件竞争的方式是,使用事务的方式去处理数据结构的更新(这里的"处理"就如同对数据库进行更新一样)。所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”。理论研究中,这是一个很热门的研究领域。这个概念将不会在本书中再进行介绍,因为在C++中没有对STM进行直接支持。但是,基本思想会在后面提及。

保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量。

3.2 使用互斥量保护共享数据

互斥量是C++中一种最通用的数据保护机制,但它不是“银弹”;精心组织代码来保护正确的数据,并在接口内部避免竞争条件是非常重要的。但互斥量自身也有问题,也会造成死锁,或是对数据保护的太多(或太少)。

3.2.1 C++中使用互斥量

C++中通过实例化std::mutex创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。不过,不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。下面的程序清单中,展示了如何在多线程程序中,使用std::mutex构造的std::lock_guard实例,对一个列表进行访问保护。std::mutexstd::lock_guard都在<mutex>头文件中声明。
清单3.1 使用互斥量保护列表

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;    // 1
std::mutex some_mutex;    // 2

void add_to_list(int new_value)
{
  std::lock_guard<std::mutex> guard(some_mutex);    // 3
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
  std::lock_guard<std::mutex> guard(some_mutex);    // 4
  return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。
当然,也不是总是那么理想,聪明的你一定注意到了:当其中一个成员函数返回的是保护数据的指针或引用时,会破坏对数据的保护。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。互斥量保护的数据需要对接口的设计相当谨慎,要确保互斥量能锁住任何对保护数据的访问,并且不留后门。

3.2.2 精心组织代码来保护共享数据

使用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个std::lock_guard对象那么简单;一个迷失的指针或引用,将会让这种保护形同虚设。
清单3.2 无意中传递了保护数据的引用

class some_data
{
  int a;
  std::string b;
public:
  void do_something();
};

class data_wrapper
{
private:
  some_data data;
  std::mutex m;
public:
  template<typename Function>
  void process_data(Function func)
  {
    std::lock_guard<std::mutex> l(m);
    func(data);    // 1 传递“保护”数据给用户函数
  }
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
  unprotected=&protected_data;
}

data_wrapper x;
void foo()
{
  x.process_data(malicious_function);    // 2 传递一个恶意函数
  unprotected->do_something();    // 3 在无保护的情况下访问保护数据
}

例子中process_data看起来没有任何问题,std::lock_guard对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function传递进去②,在没有锁定互斥量的情况下调用do_something()

3.2.3 发现接口内在的条件竞争

清单3.3 std::stack容器的实现

template<typename T,typename Container=std::deque<T> >
class stack
{
public:
  explicit stack(const Container&);
  explicit stack(Container&& = Container());
  template <class Alloc> explicit stack(const Alloc&);
  template <class Alloc> stack(const Container&, const Alloc&);
  template <class Alloc> stack(Container&&, const Alloc&);
  template <class Alloc> stack(stack&&, const Alloc&);
  
  bool empty() const;
  size_t size() const;
  T& top();
  T const& top() const;
  void push(T const&);
  void push(T&&);
  void pop();
  void swap(stack&&);
};

虽然empty()和size()可能在被调用并返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以*地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。

特别地,当栈实例是非共享的,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:

stack<int> s;
if (! s.empty()){    // 1
  int const value = s.top();    // 2
  s.pop();    // 3
  do_something(value);
}

以上是单线程安全代码:对一个空栈使用top()是未定义行为。对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
怎么解决呢?问题发生在接口设计上,所以解决的方法也就是改变接口设计。
表3.1 一种可能执行顺序

Thread A Thread B
if (!s.empty);
if(!s.empty);
int const value = s.top();
int const value = s.top();
s.pop();
do_something(value); s.pop();
do_something(value);

当线程运行时,调用两次top(),栈没被修改,所以每个线程能得到同样的值。值已被处理了两次。这种条件竞争,因为看起来没有任何错误,就会让这个Bug很难定位。
这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()和pop()。
选项1: 传入一个引用
第一个选项是将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:

std::vector<int> result;
some_stack.pop(result);

大多数情况下,这种方式还不错,但有明显的缺点:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看,都是不划算。对于其他的类型,这样也不总能行得通,因为构造函数需要的一些参数,在代码的这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。一个有用的选项可以限制对线程安全的栈的使用,并且能让栈安全的返回所需的值,而不会抛出异常。
选项3:返回指向弹出值的指针
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是*拷贝,并且不会产生异常。缺点就是返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于选择这个方案的接口,使用std::shared_ptr是个不错的选择;不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当你已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。
例:定义线程安全的堆栈
清单3.4 线程安全的堆栈类定义(概述)

#include <exception>
#include <memory>  // For std::shared_ptr<>

struct empty_stack: std::exception
{
  const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
public:
  threadsafe_stack();
  threadsafe_stack(const threadsafe_stack&);
  threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除

  void push(T new_value);
  std::shared_ptr<T> pop();
  void pop(T& value);
  bool empty() const;
};

削减接口可以获得最大程度的安全,甚至限制对栈的一些操作。使用std::shared_ptr可以避免内存分配管理的问题,并避免多次使用new和delete操作。
清单3.5 扩充(线程安全)堆栈

#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception
{
  const char* what() const throw() {
	return "empty stack!";
  };
};

template<typename T>
class threadsafe_stack
{
private:
  std::stack<T> data;
  mutable std::mutex m;
  
public:
  threadsafe_stack()
	: data(std::stack<T>()){}
  
  threadsafe_stack(const threadsafe_stack& other)
  {
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; // 1 在构造函数体中的执行拷贝
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  }
  
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空
	
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
    data.pop();
    return res;
  }
  
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();
	
    value=data.top();
    data.pop();
  }
  
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
};

之前对top()和pop()函数的讨论中,恶性条件竞争已经出现,因为锁的粒度太小,需要保护的操作并未全覆盖到。不过,锁住的颗粒过大同样会有问题。还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。在第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。虽然这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。

3.2.4 死锁:问题描述及解决方案

试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。例如,一个玩具鼓,需要一个鼓锤和一个鼓才能玩。现在有两个小孩,他们都很喜欢玩这个玩具。当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。当另一孩子想要玩,他就得等待另一孩子玩完才行。再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。之后,他们就去玩具箱里面找这个鼓。其中一个找到了鼓,并且另外一个找到了鼓锤。现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子;但当他们都紧握着自己所有的部分而不给予,那么这个鼓谁都没法玩。
现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。
std::lock——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序清单中,就来看一下怎么在一个简单的交换操作中使用std::lock
清单3.6 交换操作中使用std::lock()std::lock_guard

// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;
    std::lock(lhs.m,rhs.m); // 1
    std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
    std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
    swap(lhs.some_detail,rhs.some_detail);
  }
};

使用std::lock去锁lhs.m或rhs.m时,可能会抛出异常;这种情况下,异常会传播到std::lock之外。当std::lock成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以std::lock要么将两个锁都锁住,要不一个都不锁。
虽然std::lock可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。

3.2.5 避免死锁的进阶指导

虽然锁是产生死锁的一般原因,但也不排除死锁出现在其他地方。无锁的情况下,仅需要每个std::thread对象调用join(),两个线程就能产生死锁。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人。以下提供一些个人的指导建议,如何识别死锁,并消除其他线程的等待。
避免嵌套锁
第一个建议往往是最简单的:一个线程已获得一个锁时,再别去获取第二个。如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。当你需要获取多个锁,使用一个std::lock来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用用户提供的代码
:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)。
使用固定顺序获取锁
当硬性条件要求你获取两个以上(包括两个)的锁,并且不能使用std::lock单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们获取它们(锁)。
为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。一旦下一个节点上的锁被获取,那么第一个节点的锁就可以释放了,因为没有持有它的必要性了。
这种“手递手”锁的模式允许多个线程访问列表,为每一个访问的线程提供不同的节点。但是,为了避免死锁,节点必须以同样的顺序上锁:如果两个线程试图用互为反向的顺序,使用“手递手”锁遍历列表,他们将执行到列表中间部分时,发生死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并且试图获取节点A上的锁——经典的死锁场景。
当A、C节点中的B节点正在被删除时,如果有线程在已获取A和C上的锁后,还要获取B节点上的锁时,当一个线程遍历列表的时候,这样的情况就可能发生死锁。这样的线程可能会试图首先锁住A节点或C节点(根据遍历的方向),但是后面就会发现,它无法获得B上的锁,因为线程在执行删除任务的时候,已经获取了B上的锁,并且同时也获取了A和C上的锁。

这里提供一种避免死锁的方式,定义遍历的顺序,所以一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁发生的可能性,在不允许反向遍历的列表上。类似的约定常被用来建立其他的数据结构。
使用锁的层次结构
清单3.7 使用层次锁来避免死锁

hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000);  // 2

int do_low_level_stuff();

int low_level_func()
{
  std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 3
  return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func()
{
  std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 4
  high_level_stuff(low_level_func()); // 5
}

void thread_a()  // 6
{
  high_level_func();
}

hierarchical_mutex other_mutex(100); // 7
void do_other_stuff();

void other_stuff()
{
  high_level_func();  // 8
  do_other_stuff();
}

void thread_b() // 9
{
  std::lock_guard<hierarchical_mutex> lk(other_mutex); // 10
  other_stuff();
}

例子也展示了另一点,std::lock_guard<>模板与用户定义的互斥量类型一起使用。虽然hierarchical_mutex不是C++标准的一部分,但是它写起来很容易;一个简单的实现在列表3.8中展示出来。尽管它是一个用户定义类型,它可以用于std::lock_guard<>模板中,因为它的实现有三个成员函数为了满足互斥量操作:lock(), unlock() 和 try_lock()。虽然你还没见过try_lock()怎么使用,但是其使用起来很简单:当互斥量上的锁被一个线程持有,它将返回false,而不是等待调用的线程,直到能够获取互斥量上的锁为止。在std::lock()的内部实现中,try_lock()会作为避免死锁算法的一部分。
列表3.8 简单的层级互斥量实现

class hierarchical_mutex
{
  std::mutex internal_mutex;
  
  unsigned long const hierarchy_value;
  unsigned long previous_hierarchy_value;
  
  static thread_local unsigned long this_thread_hierarchy_value;  // 1
  
  void check_for_hierarchy_violation()
  {
    if(this_thread_hierarchy_value <= hierarchy_value)  // 2
    {
      throw std::logic_error(“mutex hierarchy violated”);
    }
  }
  
  void update_hierarchy_value()
  {
    previous_hierarchy_value=this_thread_hierarchy_value;  // 3
    this_thread_hierarchy_value=hierarchy_value;
  }
  
public:
  explicit hierarchical_mutex(unsigned long value):
      hierarchy_value(value),
      previous_hierarchy_value(0)
  {}
  
  void lock()
  {
    check_for_hierarchy_violation();
    internal_mutex.lock();  // 4
    update_hierarchy_value();  // 5
  }
  
  void unlock()
  {
    this_thread_hierarchy_value=previous_hierarchy_value;  // 6
    internal_mutex.unlock();
  }
  
  bool try_lock()
  {
    check_for_hierarchy_violation();
    if(!internal_mutex.try_lock())  // 7
      return false;
    update_hierarchy_value();
    return true;
  }
};
thread_local unsigned long
     hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);  // 8

超越锁的延伸扩展
当代码已经能规避死锁,std::lock()std::lock_guard能组成简单的锁覆盖大多数情况,但是有时需要更多的灵活性。在这些情况,可以使用标准库提供的std::unique_lock模板。如std::lock_guard,这是一个参数化的互斥量模板类,并且它提供很多RAII类型锁用来管理std::lock_guard类型,可以让代码更加灵活。

3.2.6 std::unique_lock——灵活的锁

std::unqiue_lock使用更为*的不变量,这样std::unique_lock实例不会总与互斥量的数据类型相关,使用起来要比std:lock_guard更加灵活。首先,可将std::adopt_lock作为第二个参数传入构造函数,对互斥量进行管理;也可以将std::defer_lock作为第二个参数传递进去,表明互斥量应保持解锁状态。这样,就可以被std::unique_lock对象(不是互斥量)的lock()函数的所获取,或传递std::unique_lock对象到std::lock()中。清单3.6可以轻易的转换为清单3.9,使用std::unique_lockstd::defer_lock①,而非std::lock_guardstd::adopt_lock。代码长度相同,几乎等价,唯一不同的就是:std::unique_lock会占用比较多的空间,并且比std::lock_guard稍慢一些。保证灵活性要付出代价,这个代价就是允许std::unique_lock实例不带互斥量:信息已被存储,且已被更新。
清单3.9 交换操作中std::lock()std::unique_lock的使用

class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}
  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;
    std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); // 1 
    std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); // 1 std::def_lock 留下未上锁的互斥量
    std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
    swap(lhs.some_detail,rhs.some_detail);
  }
};

列表3.9中,因为std::unique_lock支持lock(), try_lock()和unlock()成员函数,所以能将std::unique_lock对象传递到std::lock()②。这些同名的成员函数在低层做着实际的工作,并且仅更新std::unique_lock实例中的标志,来确定该实例是否拥有特定的互斥量,这个标志是为了确保unlock()在析构函数中被正确调用。如果实例拥有互斥量,那么析构函数必须调用unlock();但当实例中没有互斥量时,析构函数就不能去调用unlock()。这个标志可以通过owns_lock()成员变量进行查询。

可能如你期望的那样,这个标志被存储在某个地方。因此,std::unique_lock对象的体积通常要比std::lock_guard对象大,当使用std::unique_lock替代std::lock_guard,因为会对标志进行适当的更新或检查,就会做些轻微的性能惩罚。当std::lock_guard已经能够满足你的需求,那么还是建议你继续使用它。当需要更加灵活的锁时,最好选择std::unique_lock,因为它更适合于你的任务。你已经看到一个递延锁的例子,另外一种情况是锁的所有权需要从一个域转到另一个域。

3.2.7 不同域中互斥量所有权的传递

std::unique_lock是可移动,但不可赋值的类型。

std::unique_lock<std::mutex> get_lock()
{
  extern std::mutex some_mutex;
  std::unique_lock<std::mutex> lk(some_mutex);
  prepare_data();
  return lk;  // 1
}
void process_data()
{
  std::unique_lock<std::mutex> lk(get_lock());  // 2
  do_something();
}

std::unique_lock的灵活性同样也允许实例在销毁之前放弃其拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock的成员函数提供类似于锁定和解锁互斥量的功能。std::unique_lock实例在销毁前释放锁的能力,当锁没有必要在持有的时候,可以在特定的代码分支对其进行选择性的释放。这对于应用性能来说很重要,因为持有锁的时间增加会导致性能下降,其他线程会等待这个锁的释放,避免超越操作。

3.2.8 锁的粒度

选择粒度对于锁来说很重要,为了保护对应的数据,保证锁有能力保护这些数据也很重要。
如果很多线程正在等待同一个资源,当有线程持有锁的时间过长,这就会增加等待的时间。在可能的情况下,锁住互斥量的同时只能对共享数据进行访问;试图对锁外数据进行处理。特别是做一些费时的动作,比如:对文件的输入/输出操作进行上锁。文件输入/输出通常要比从内存中读或写同样长度的数据慢成百上千倍,所以除非锁已经打算去保护对文件的访问,要么执行输入/输出操作将会将延迟其他线程执行的时间,这很没有必要(因为文件锁阻塞住了很多操作),这样多线程带来的性能效益会被抵消。

这能表示只有一个互斥量保护整个数据结构时的情况,不仅可能会有更多对锁的竞争,也会增加锁持锁的时间。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。
列表3.10 比较操作符中一次锁住一个互斥量

class Y
{
private:
  int some_detail;
  mutable std::mutex m;
  int get_detail() const
  {
    std::lock_guard<std::mutex> lock_a(m);  // 1
    return some_detail;
  }
public:
  Y(int sd):some_detail(sd){}

  friend bool operator==(Y const& lhs, Y const& rhs)
  {
    if(&lhs==&rhs)
      return true;
    int const lhs_value=lhs.get_detail();  // 2
    int const rhs_value=rhs.get_detail();  // 3
    return lhs_value==rhs_value;  // 4
  }
};

3.3 保护共享数据的替代设施

互斥量是最通用的机制,但其并非保护共享数据的唯一方式。这里有很多替代方式可以在特定情况下,提供更加合适的保护。
一个特别极端(但十分常见)的情况就是,共享数据在并发访问和初始化时(都需要保护),但是之后需要进行隐式同步。这可能是因为数据作为只读方式创建,所以没有同步问题;或者因为必要的保护作为对数据操作的一部分,所以隐式的执行。任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程(这是没有必要的),并且这会给性能带来不必要的冲击。出于以上的原因,C++标准提供了一种纯粹保护共享数据初始化过程的机制。

3.3.1 保护共享数据的初始化过程

假设你与一个共享源,构建代价很昂贵,可能它会打开一个数据库连接或分配出很多的内存。
延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:

std::shared_ptr<some_resource> resource_ptr;
void foo()
{
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 1
  }
  resource_ptr->do_something();
}

清单 3.11 使用一个互斥量的延迟初始化(线程安全)过程

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo()
{
  std::unique_lock<std::mutex> lk(resource_mutex);  // 所有线程在此序列化 
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 只有初始化过程需要保护 
  }
  lk.unlock();
  resource_ptr->do_something();
}

这段代码相当常见了,也足够表现出没必要的线程化问题,很多人能想出更好的一些的办法来做这件事,包括声名狼藉的双重检查锁模式:

void undefined_behaviour_with_double_checked_locking()
{
  if(!resource_ptr)  // 1
  {
    std::lock_guard<std::mutex> lk(resource_mutex);
    if(!resource_ptr)  // 2
    {
      resource_ptr.reset(new some_resource);  // 3
    }
  }
  resource_ptr->do_something();  // 4
}

指针第一次读取数据不需要获取锁①,并且只有在指针为NULL时才需要获取锁。然后,当获取锁之后,指针会被再次检查一遍② (这就是双重检查的部分),避免另一的线程在第一次检查后再做初始化,并且让当前线程获取锁。
这个模式为什么声名狼藉呢?因为这里有潜在的条件竞争,未被锁保护的读取操作①没有与其他线程里被锁保护的写入操作③进行同步。因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的some_resource实例,然后调用do_something()④后,得到不正确的结果。这个例子是在一种典型的条件竞争——数据竞争,C++标准中这就会被指定为“未定义行为”。这种竞争肯定是可以避免的。可以阅读第5章,那里有更多对内存模型的讨论,包括数据竞争的构成。
C++标准委员会也认为条件竞争的处理很重要,所以C++标准库提供了std::once_flagstd::call_once来处理这种情况。比起锁住互斥量,并显式的检查指针,每个线程只需要使用std::call_once,在std::call_once的结束时,就能安全的知道指针已经被其他的线程初始化了。使用std::call_once比显式使用互斥量消耗的资源更少,特别是当初始化完成后。

std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;  // 1

void init_resource()
{
  resource_ptr.reset(new some_resource);
}

void foo()
{
  std::call_once(resource_flag,init_resource);  // 可以完整的进行一次初始化
  resource_ptr->do_something();
}

清单3.12 使用std::call_once作为类成员的延迟初始化(线程安全)

class X
{
private:
  connection_info connection_details;
  connection_handle connection;
  std::once_flag connection_init_flag;

  void open_connection()
  {
    connection=connection_manager.open(connection_details);
  }
public:
  X(connection_info const& connection_details_):
      connection_details(connection_details_)
  {}
  void send_data(data_packet const& data)  // 1
  {
    std::call_once(connection_init_flag,&X::open_connection,this);  // 2
    connection.send_data(data);
  }
  data_packet receive_data()  // 3
  {
    std::call_once(connection_init_flag,&X::open_connection,this);  // 2
    return connection.receive_data();
  }
};

值得注意的是,std::mutexstd::one_flag的实例就不能拷贝和移动,所以当你使用它们作为类成员函数,如果你需要用到他们,你就得显示定义这些特殊的成员函数。

3.3.2 保护很少更新的数据结构

试想,为了将域名解析为其相关IP地址,我们在缓存中的存放了一张DNS入口表。通常,给定DNS数目在很长的一段时间内保持不变。虽然,在用户访问不同网站时,新的入口可能会被添加到表中,但是这些数据可能在其生命周期内保持不变。所以定期检查缓存中入口的有效性,就变的十分重要了;但是,这也需要一次更新,也许这次更新只是对一些细节做了改动。
使用std::mutex来保护数据结构,显的有些反应过度,比起使用std::mutex实例进行同步,不如使用boost::shared_mutex来做同步。
清单3.13 使用boost::shared_mutex对数据结构进行保护

#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>

class dns_entry;

class dns_cache
{
  std::map<std::string,dns_entry> entries;
  mutable boost::shared_mutex entry_mutex;
public:
  dns_entry find_entry(std::string const& domain) const
  {
    boost::shared_lock<boost::shared_mutex> lk(entry_mutex);  // 1
    std::map<std::string,dns_entry>::const_iterator const it=
       entries.find(domain);
    return (it==entries.end())?dns_entry():it->second;
  }
  void update_or_add_entry(std::string const& domain,
                           dns_entry const& dns_details)
  {
    std::lock_guard<boost::shared_mutex> lk(entry_mutex);  // 2
    entries[domain]=dns_details;
  }
};

3.3.3 嵌套锁

当一个线程已经获取一个std::mutex时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。之所以可以,是因为C++标准库提供了std::recursive_mutex类。互斥量锁住其他线程前,你必须释放你拥有的所有锁,所以当你调用lock()三次时,你也必须调用unlock()三次。正确使用std::lock_guard<std::recursive_mutex>std::unique_lock<std::recursive_mutex>可以帮你处理这些问题。

4 同步并发操作

4.1 等待一个事件或其他条件

晚间在列车上等待凌晨到站出站,要么一直不睡等着,要么等待被列车员唤醒。
第二个选择是在等待线程在检查间隙,使用std::this_thread::sleep_for()进行周期性的间歇(详见4.3节):

bool flag;
std::mutex m;

void wait_for_flag()
{
  std::unique_lock<std::mutex> lk(m);
  while(!flag)
  {
    lk.unlock();  // 1 解锁互斥量
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 2 休眠100ms
    lk.lock();   // 3 再锁互斥量
  }
}

这里博主理解为是一个轮询。

4.1.1 等待条件达成

清单4.1 使用std::condition_variable处理数据等待

std::mutex mut;
std::queue<data_chunk> data_queue;  // 1
std::condition_variable data_cond;

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(data);  // 2
    data_cond.notify_one();  // 3
  }
}

void data_processing_thread()
{
  while(true)
  {
    std::unique_lock<std::mutex> lk(mut);  // 4
    data_cond.wait(
         lk,[]{return !data_queue.empty();});  // 5
    data_chunk data=data_queue.front();
    data_queue.pop();
    lk.unlock();  // 6
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

4.1.2 使用条件变量构建线程安全队列

清单4.2 std::queue接口

template <class T, class Container = std::deque<T> >
class queue {
public:
  explicit queue(const Container&);
  explicit queue(Container&& = Container());
  template <class Alloc> explicit queue(const Alloc&);
  template <class Alloc> queue(const Container&, const Alloc&);
  template <class Alloc> queue(Container&&, const Alloc&);
  template <class Alloc> queue(queue&&, const Alloc&);

  void swap(queue& q);

  bool empty() const;
  size_type size() const;

  T& front();
  const T& front() const;
  T& back();
  const T& back() const;

  void push(const T& x);
  void push(T&& x);
  void pop();
  template <class... Args> void emplace(Args&&... args);
};

当你忽略构造、赋值以及交换操作时,你就剩下了三组操作:1. 对整个队列的状态进行查询(empty()和size());2.查询在队列中的各个元素(front()和back());3.修改队列的操作(push(), pop()和emplace())。这就和3.2.3中的栈一样了,因此你也会遇到在固有接口上的条件竞争。因此,你需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。与清单4.1中的代码不同的是:当使用队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里我们提供pop()函数的两个变种:try_pop()和wait_and_pop()。try_pop() ,尝试从队列中弹出数据,总会直接返回(当有失败时),即使没有值可检索;wait_and_pop(),将会等待有值可检索的时候才返回。当你使用之前栈的方式来实现你的队列,你实现的队列接口就可能会是下面这样:
清单4.3 线程安全队列的接口

#include <memory> // 为了使用std::shared_ptr

template<typename T>
class threadsafe_queue
{
public:
  threadsafe_queue();
  threadsafe_queue(const threadsafe_queue&);
  threadsafe_queue& operator=(
      const threadsafe_queue&) = delete;  // 不允许简单的赋值

  void push(T new_value);

  bool try_pop(T& value);  // 1
  std::shared_ptr<T> try_pop();  // 2

  void wait_and_pop(T& value);
  std::shared_ptr<T> wait_and_pop();

  bool empty() const;
};

清单4.4 从清单4.1中提取push()和wait_and_pop()

#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }
};
threadsafe_queue<data_chunk> data_queue;  // 1

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    data_queue.push(data);  // 2
  }
}

void data_processing_thread()
{
  while(true)
  {
    data_chunk data;
    data_queue.wait_and_pop(data);  // 3
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

清单4.5 使用条件变量的线程安全队列(完整版)

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;  // 1 互斥量必须是可变的 
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  threadsafe_queue()
  {}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue=other.data_queue;
  }

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=data_queue.front();
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

4.2 使用期望等待一次性事件

5 C++内存模型和原子类型操作

6 基于锁的并发数据结构设计

7 无锁并发数据结构设计

8 并发代码设计

9 高级线程管理

10 多线程程序的测试和调试

重点

线程std::thread对象必须调用join或detach

Destroys the thread object. If *this has an associated thread (joinable() == true), std::terminate() is called.
创建的std::thread对象会在构造函数结束时销毁,因为它是一个局部变量。如果调用了std::thread的析构函数,而该线程是joinable,则调用std::terminate。即程序会调用Abort(),不进行任何清理工作,然后abnormal program termination(程序异常终止)。

shared_mutex也是基于操作系统底层的读写锁pthread_rwlock_t的封装

参考

1、《C++ Concurrency in Action》[陈晓伟 译]
2、github 翻译地址
3、gitbook 在线阅读
4、书中源码
5、学习C++11/14
6、CPP-Concurrency-In-Action-2ed
7、thread析构
8、C++11多线程 - Part 2: 连接和分离线程
9、Starting thread causing abort()
10、Boost Mutex详细解说

上一篇:selenium的等待方法


下一篇:解决selenium控制webdriver总是被网站检测的问题