C++11 CAS无锁函数compare_exchange_weak的使用

 

#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector

// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);

void append (int val) {     // append an element to the list
  Node* oldHead = list_head;
  Node* newNode = new Node {val,oldHead};

  // what follows is equivalent to: list_head = newNode, but in a thread-safe way:
  while (!list_head.compare_exchange_weak(oldHead,newNode)) {
    newNode->next = oldHead;
  }
}

int main ()
{
  // spawn 10 threads to fill the linked list:
  std::vector<std::thread> threads;
  for (int i=0; i<30; ++i) threads.push_back(std::thread(append,i));
  for (auto& th : threads) th.join();

  // print contents:
  for (Node* it = list_head; it!=nullptr; it=it->next)
    std::cout << ' ' << it->value;
  std::cout << '\n';

  // cleanup:
  Node* it; while (it=list_head) {list_head=it->next; delete it;}

  return 0;
}
#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector

// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);

void append (int val) {     // append an element to the list
  Node* oldHead = list_head;
  Node* newNode = new Node {val,oldHead};

  // what follows is equivalent to: list_head = newNode, but in a thread-safe way:
  while (!list_head.compare_exchange_weak(oldHead,newNode)) {
    newNode->next = oldHead;
  }
}

int main ()
{
  // spawn 10 threads to fill the linked list:
  std::vector<std::thread> threads;
  for (int i=0; i<30; ++i) threads.push_back(std::thread(append,i));
  for (auto& th : threads) th.join();

  // print contents:
  for (Node* it = list_head; it!=nullptr; it=it->next)
    std::cout << ' ' << it->value;
  std::cout << '\n';

  // cleanup:
  Node* it; while (it=list_head) {list_head=it->next; delete it;}

  return 0;
}

 

 

root@ubuntu:~/c++# g++ -std=c++11  weak.cpp -o weak -pthread
root@ubuntu:~/c++# ./weak
 29 28 27 23 26 25 24 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0
root@ubuntu:~/c++# 

 

 

在看c++11的CAS用法的时候,主要是产生了两个问题:

  1. compare_swap_strong 与 compare_swap_weak 有啥区别?
  2. c++11 CAS原语系列后面还有两个memory_order参数,有什么作用?
/*
* @brief:compare & swap(CAS)。如果等于expect则swap,否则就返回--是否交换成功, 注意expect如果不相等,会把当前值写入到expected里面。
* 相比于strong,weak可能会出现[spurious wakeup](<http://en.wikipedia.org/wiki/Spurious_wakeup>).
* @param          若x等于expect,则设置为desired 返回true,
*                 否则最新值写入expect,返回false
*/
class atomic {
bool compare_exchange_strong(T& expect /*用来比较的值*/, T desired/*用来设置的值*/)
bool compare_exchange_weak(T& expect, T desired)
}

compare_swap_strong 与 compare_swap_weak

看一下compare_swap_strong()的实现是如何的?

首先介绍一下几个定义好的与操作,实际上就是几个enum

// 几个与操作是经过了重载的。
/// Enumeration for memory_order
typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
} memory_order;

enum __memory_order_modifier {
    __memory_order_mask          = 0x0ffff,
    __memory_order_modifier_mask = 0xffff0000,
    __memory_order_hle_acquire   = 0x10000,
    __memory_order_hle_release   = 0x20000
};

// & operator
constexpr memory_order
operator&(memory_order __m, __memory_order_modifier __mod) {
  return memory_order(__m & int(__mod));
}

可以简单地理解为几个常量在操作。接下来看真正的实现部分。

/c++/atomic 头文件
compare_exchange_weak(__pointer_type& __p1,
                     __pointer_type __p2,
                     memory_order __m1,
                     memory_order __m2) noexcept {
    return _M_b.compare_exchange_strong(__p1, __p2, __m1, __m2);
}

// atomic_base
_GLIBCXX_ALWAYS_INLINE bool
compare_exchange_strong(__pointer_type& __p1, __pointer_type __p2,
                        memory_order __m1,
                        memory_order __m2) noexcept
{
  memory_order __b2 = __m2 & __memory_order_mask;
  memory_order __b1 = __m1 & __memory_order_mask;
  __glibcxx_assert(__b2 != memory_order_release);
  __glibcxx_assert(__b2 != memory_order_acq_rel);
  __glibcxx_assert(__b2 <= __b1);

  return __atomic_compare_exchange_n(&_M_p, &__p1, __p2, 0, __m1, __m2);
}

最后发现__atomic_compare_exchange_n是由GCC内置的函数。

gcc 内置

Built-in Function:

bool __atomic_compare_exchange_n(
    type *ptr,              // 需要进行比较的ptr
    type *expected,         // 旧的值,会返回ptr里面的值
    type desired,           // 想要设置的新的值
    bool weak,              // 强一致,还是弱一致
    int success_memorder,   // 成功时的内存序
    int failure_memorder    // 失败时的内存序
)

weak与strong

参考文档gcc内置CAS说明

详细的说明如下,如果有耐心那么可以直接看完。如果没有耐心,可以直接看一下我自己的结论。

  • weak = true的时候,对应c++11的compare_exchange_weak函数。
  • weak = false的时候,对应的是compare_exchange_strong函数。

区别这两个函数的区别在于,weak在有的平台上(注意,是有的平台,这里不包括x86)会存在失败的可能性。即,当*ptr == *expected依然有可能什么都不做而返回false

所以,在x86平台来说,这两者可以说是没什么区别。只是如果想要代码可移值性好,那么采用compare_exchange_weak并且使用循环来判断,那么是一种比较好的办法。

结论就是:

  • 想要性能,使用compare_exchange_weak+循环来处理。
  • 想要简单,使用compare_exchange_strong
  • 如果是x86平台,两者没区别
  • 如果想在移值的时候,拿到高性能,用compare_exchange_weak

详细的说明

需要注意的是,weak = true表示弱CAS,在这种情况下,就是交换成功,也有可能返回失败。

在某些平台上,即使 atomic的值和expected一樣,weak 版仍有可能会失败。但是绝大多数情況不会失敗,且 weak 版的运行比strong版快。所以若本来就需要在循环里执行 compare-and-swap,用 weak 版较有效率;反之,只执行一次的話,用 strong 版可以省去不必要的循环。

所以weak和strong的区别在于,weak仍然在*ptr == expected的时候,执行依然会有小概率失败。也就是说, 即使*ptr == expected,此时也不会发生值的设置,也会返回false。(不会是设置成功了,但是返回的是false)。

Many targets only offer the strong variation and ignore the parameter. When in doubt, use the strong variation.

一般而言,当你拿不准,就使用strong的版本。

If desired is written into *ptr then true is returned and memory is affected according to the memory order specified by success_memorder. There are no restrictions on what memory order can be used here.

Otherwise, false is returned and memory is affected according to failure_memorder. This memory order cannot be ATOMIC_RELEASE nor ATOMIC_ACQ_REL. It also cannot be a stronger order than that specified by success_memorder.

如果需要写入* ptr则返回true,并根据success_memorder指定的内存顺序影响内存。 这里可以使用什么内存顺序没有限制。

否则,返回false并根据failure_memorder影响内存。 此内存顺序不能是ATOMIC_RELEASE和ATOMIC_ACQ_REL 。 它也不可能比success_memorder指定的顺序更强大。

两个内存序

这个函数系列有趣的是,后面还有两个有趣的memory_order序。那么这两个memory_order起什么作用呢?

结论就是,针对x86平台来说,取什么值都是一样的。可以拿一段简单的代码进行尝试,注意查看生成的汇编代码。

#include <pthread.h>
#include <stdbool.h>

extern int a_lock;

int a_lock = 0;

static bool lock(int *l)
{
    int tmp = 0;
    return __atomic_compare_exchange_n(l, &tmp, 1,
            true , __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
static void unlock(int *l)
{
    __atomic_store_n(l, 0, __ATOMIC_SEQ_CST);
}

int main(int argc, char *argv[])
{
    lock(&a_lock);
    unlock(&a_lock);
}

 

 

root@ubuntu:~/c++# g++ -S -O0  -std=c++11  weak.cpp -o weak -pthread
root@ubuntu:~/c++# cat weak
        .arch armv8-a
        .file   "weak.cpp"
        .global a_lock
        .bss
        .align  2
        .type   a_lock, %object
        .size   a_lock, 4
a_lock:
        .zero   4
        .text
        .align  2
        .type   _ZL4lockPi, %function
_ZL4lockPi:
.LFB14:
        .cfi_startproc
        stp     x29, x30, [sp, -48]!
        .cfi_def_cfa_offset 48
        .cfi_offset 29, -48
        .cfi_offset 30, -40
        add     x29, sp, 0
        .cfi_def_cfa_register 29
        str     x0, [x29, 24]
        adrp    x0, :got:__stack_chk_guard
        ldr     x0, [x0, #:got_lo12:__stack_chk_guard]
        ldr     x1, [x0]
        str     x1, [x29, 40]
        mov     x1,0
        str     wzr, [x29, 36]
        ldr     x1, [x29, 24]
        add     x0, x29, 36
        ldr     w3, [x0]
        mov     w4, 1
        ldaxr   w2, [x1]
        cmp     w2, w3
        bne     .L5
        stlxr   w5, w4, [x1]
        cmp     w5, 0
.L5:
        cset    w1, eq
        cmp     w1, 0
        bne     .L2
        str     w2, [x0]
.L2:
        mov     w0, w1
        adrp    x1, :got:__stack_chk_guard
        ldr     x1, [x1, #:got_lo12:__stack_chk_guard]
        ldr     x2, [x29, 40]
        ldr     x1, [x1]
        eor     x1, x2, x1
        cmp     x1, 0
        beq     .L4
        bl      __stack_chk_fail
.L4:
        ldp     x29, x30, [sp], 48
        .cfi_restore 30
        .cfi_restore 29
        .cfi_def_cfa 31, 0
        ret
        .cfi_endproc
.LFE14:
        .size   _ZL4lockPi, .-_ZL4lockPi
        .align  2
        .type   _ZL6unlockPi, %function
_ZL6unlockPi:
.LFB15:
        .cfi_startproc
        sub     sp, sp, #16
        .cfi_def_cfa_offset 16
        str     x0, [sp, 8]
        ldr     x0, [sp, 8]
        stlr    wzr, [x0]
        nop
        add     sp, sp, 16
        .cfi_def_cfa_offset 0
        ret
        .cfi_endproc
.LFE15:
        .size   _ZL6unlockPi, .-_ZL6unlockPi
        .align  2
        .global main
        .type   main, %function
main:
.LFB16:
        .cfi_startproc
        stp     x29, x30, [sp, -32]!
        .cfi_def_cfa_offset 32
        .cfi_offset 29, -32
        .cfi_offset 30, -24
        add     x29, sp, 0
        .cfi_def_cfa_register 29
        str     w0, [x29, 28]
        str     x1, [x29, 16]
        adrp    x0, a_lock
        add     x0, x0, :lo12:a_lock
        bl      _ZL4lockPi
        adrp    x0, a_lock
        add     x0, x0, :lo12:a_lock
        bl      _ZL6unlockPi
        mov     w0, 0
        ldp     x29, x30, [sp], 32
        .cfi_restore 30
        .cfi_restore 29
        .cfi_def_cfa 31, 0
        ret
        .cfi_endproc
.LFE16:
        .size   main, .-main
        .ident  "GCC: (Ubuntu/Linaro 5.5.0-12ubuntu1) 5.5.0 20171010"
        .section        .note.GNU-stack,"",@progbits
root@ubuntu:~/c++# 

 

 

root@ubuntu:~/c++# cat weak 
        .arch armv8-a
        .file   "weak.cpp"
        .section        .text.startup,"ax",@progbits
        .align  2
        .p2align 3,,7
        .global main
        .type   main, %function
main:
.LFB17:
        .cfi_startproc
        adrp    x0, :got:__stack_chk_guard
        stp     x29, x30, [sp, -32]!
        .cfi_def_cfa_offset 32
        .cfi_offset 29, -32
        .cfi_offset 30, -24
        adrp    x1, .LANCHOR0
        add     x29, sp, 0
        .cfi_def_cfa_register 29
        ldr     x4, [x0, #:got_lo12:__stack_chk_guard]
        add     x2, x1, :lo12:.LANCHOR0
        ldr     x5, [x4]
        str     x5, [x29, 24]
        mov     x5,0
        str     wzr, [x29, 20]
        ldaxr   w4, [x2]
        cmp     w4, 0
        bne     .L4
        mov     w3, 1
        stlxr   w5, w3, [x2]
        cmp     w5, 0
.L4:
        beq     .L2
        str     w4, [x29, 20]
.L2:
        add     x1, x1, :lo12:.LANCHOR0
        stlr    wzr, [x1]
        ldr     x1, [x0, #:got_lo12:__stack_chk_guard]
        mov     w0, 0
        ldr     x2, [x29, 24]
        ldr     x1, [x1]
        eor     x1, x2, x1
        cbnz    x1, .L7
        ldp     x29, x30, [sp], 32
        .cfi_remember_state
        .cfi_restore 30
        .cfi_restore 29
        .cfi_def_cfa 31, 0
        ret
.L7:
        .cfi_restore_state
        bl      __stack_chk_fail
        .cfi_endproc
.LFE17:
        .size   main, .-main
        .global a_lock
        .bss
        .align  2
.LANCHOR0 = . + 0
        .type   a_lock, %object
        .size   a_lock, 4
a_lock:
        .zero   4
        .ident  "GCC: (Ubuntu/Linaro 5.5.0-12ubuntu1) 5.5.0 20171010"
        .section        .note.GNU-stack,"",@progbits
root@ubuntu:~/c++# 

可以尝试去修改一下__atomic_compare_exchange_n后面两个内存序。可以发现最后生成的汇编代码没有什么区别。

小结

  • 为了跨平台性以及高性能,那么使用compare_exchange_weak好的办法。
  • 后面的两个内存序,使用默认的std::memory_order_seq_cst即可。这是因为,在x86平台上,微调这两个参数生成的代码都是一样的,没有什么收益。其他弱内存序的cpu上面,可能每种cpu得到的最合适的这两个参数都会有所不同。为了避免去适配各个cpu,那么一种偷懒的办法就是直接使用std::memory_order_seq_cst

一句话就是,直接使用var.compare_exchange_weak(a,b);这种形式最省事。比如:

void link_nodes_atomic(node * new_top_node, node * end_node)
    {
        tagged_node_handle old_tos = tos.load(detail::memory_order_relaxed);
        for (;;) {
            tagged_node_handle new_tos (pool.get_handle(new_top_node), old_tos.get_tag());
            end_node->next = pool.get_handle(old_tos);

            if (tos.compare_exchange_weak(old_tos, new_tos))
                break;
        }
    }

 

 

#include <atomic>

template<class T>
struct node {
   T data;
   node* next;
   node(const T& data) : data(data), next(nullptr) {}
};

template<class T>
class stack {
   std::atomic<node<T>*> head;
   public:
      void push(const T& data) {
         node<T>* new_node = new node<T>(data);
         new_node->next = head.load(std::memory_order_relaxed);
         while(!std::atomic_compare_exchange_weak_explicit(&head, &new_node->next,
            new_node, std::memory_order_release, std::memory_order_relaxed))
            ;
      }
};

int main() {
   stack<int> s;
   s.push(1);
   s.push(2);
   s.push(3);
}

 

上一篇:通过fork函数创建进程


下一篇:一、编译过程