user rcu 用户态RCU

  目前又有需求做性能优化,都已经将mutex_lock 修改为cas atomic MPMC、Thread_local 等lock_free/原子/局部变量等相关操作,目前就缺用户态RCU了!so看下 怎么使用以及 性能怎样

 目前已经开源了一种user-rcu:urcu  

liburcu提供了以下几种RCU实现:

  1. rcu-qsbr:读性能最好的RCU实现,可以做到reader 0 zerohead,但是需要改动代码,侵入式。
  2. rcu-signal:性能仅次于qsbr的实现,不需要改动代码,代价是需要牺牲一个signal给urcu实现。
  3. rcu-generic:性能最差的rcu实现(但也比mutex强多了),不需要改动代码,可以作为初始的第一选择。

  qsbr 称为显示静默期 (Quiescent) 声明模式,这种模式下,rcu_read_lock() 和 rcu_read_unlock() 为空操作,对于 reader 来说负担为零 (这一点是另外的实现 不具备的),但代价是,每个 reader 线程也必须周期性的调用 rcu_quiescent_state() 来声明自己进入了静默期,需要注意的是,并非每次读操作完成后都需要做此声明,考虑到读操作的性能和应用读写操作的不平衡性,通常的做法是读操作每进行一定次数 (如 1024) 之后声明一次静默期。还有一点,每个会进入 RCU-read-side critical sections 的线程都需要事先通过 rcu_register_thread() 接口进行注册,相应的在线程推出时调用 rcu_unregister_thread() 接口进行去注册;

  rcu 是用于多核系统中,保持各线程所看到的全局数据一致性的一种机制,所以需要一种手段可以判断当 writer 线程进行数据更新时,reader 线程看到的数据是否也更新了,为此 urcu 维护了一个全局的计数器 global counter,每次 writer 的同步操作 (synchronize),都会使 rcu_gp.ctr 加1,表示数据已经更新了,reader 需要更新与之对应的,每个 reader 线程也持有一个线程内部的计数器 ctx,如果这个 ctx 与 rcu_gp.ctr 一致,就表明本线程的数据已经最新 (ACTIVE_CURRENT),反之则不是最新 (ACTIVE_OLD),

struct rcu_gp {
    unsigned long ctr;
    int32_t futex;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
extern struct rcu_gp rcu_gp;

struct rcu_reader {
    /* Data used by both reader and synchronize_rcu() */
    unsigned long ctr;
    struct cds_list_head node 
        __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
    int waiting;
    pthread_t tid;
    unsigned int registered:1;
};
extern DECLARE_URCU_TLS(struct rcu_reader, rcu_reader);

  qsbr rcu 的实现中,reader 线程必须进行显示的注册,它的目的是将自己挂接在全局链表 registry 上,通俗的说就是将自己置于全局管理之下,这样当 writer 在进行同步 (synchronize) 时,才能知道哪些线程需要同步。线程的上线状态分为在线 (online) 和离线 (offline),后面会提到,处于 offline 的线程虽然在 registry 链表上,但在 synchronized 时,writer 会忽略这些线程。线程注册会默认置于 online 状态。

DEFINE_URCU_TLS(struct rcu_reader, rcu_reader);
static CDS_LIST_HEAD(registry);
static pthread_mutex_t rcu_registry_lock = PTHREAD_MUTEX_INITIALIZER;

void rcu_register_thread(void) {
    URCU_TLS(rcu_reader).tid = pthread_self();
    assert(URCU_TLS(rcu_reader).ctr == 0);

    mutex_lock(&rcu_registry_lock);
    assert(!URCU_TLS(rcu_reader).registered);
    URCU_TLS(rcu_reader).registered = 1;
    cds_list_add(&URCU_TLS(rcu_reader).node, &registry);
    mutex_unlock(&rcu_registry_lock);
    _rcu_thread_online();
}

void rcu_unregister_thread(void) {
    _rcu_thread_offline();
    assert(URCU_TLS(rcu_reader).registered);
    URCU_TLS(rcu_reader).registered = 0;
    mutex_lock(&rcu_registry_lock);
    cds_list_del(&URCU_TLS(rcu_reader).node);
    mutex_unlock(&rcu_registry_lock);
}

  每个读线程都有一个rcu_reader;为TLS(线程局部存储)变量;定义一个双向链表registry,用来保存所有读线程的rcu_reader。这会在写线程的synchronize_rcu()用到。还有一个mutex来保护这个链表。

 

读临界区

 读线程在对公共数据做操作的时候,需要调用rcu_read_lockrcu_read_unlock来标记临界区:这两个函数里面实际上什么都没有做,只是assert,说明在O2优化下这就是个空的函数。这也就是为什么urcu-qsbr是zero overhead的原因,因为他的读临界区完全啥事没干!

nline void rcu_read_lock(void) {
    urcu_assert(URCU_TLS(rcu_reader).ctr);
}

inline void rcu_read_unlock(void) {
    urcu_assert(URCU_TLS(rcu_reader).ctr);
}

 

Quiescent State

在qsbr里面对于读线程最核心的函数实际上是rcu_quiescent_state(),用来告诉写线程,该读线程已经结束了一批读临界区:

void rcu_quiescent_state(void) {
    unsigned long gp_ctr;

    urcu_assert(URCU_TLS(rcu_reader).registered);
    if ((gp_ctr = CMM_LOAD_SHARED(rcu_gp.ctr)) == URCU_TLS(rcu_reader).ctr)
        return;
    _rcu_quiescent_state_update_and_wakeup(gp_ctr);
}

 

  这个函数首先看看当前线程的gp号是否已经是最新的,如果是,直接返回;否则调用_rcu_quiescent_state_update_and_wakeup

void _rcu_quiescent_state_update_and_wakeup(unsigned long gp_ctr) {
    cmm_smp_mb();
    _CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, gp_ctr);
    cmm_smp_mb();  /* write URCU_TLS(rcu_reader).ctr before read futex */
    wake_up_gp();  /* similar to pthread_cond_broadcast */
    cmm_smp_mb();
}

 

  wakeup函数实际上就是把刚刚读出来的最新的gp号存到当前线程的gp缓存里,接着唤醒可能在等待的写线程。这里的三个cmm_smp_mb调用就是memory barrier,防止这个函数之前和之后的操作可能产生的乱序,以及函数中的两步操作之间可能的乱序。

可以从上面看出,核心函数的操作都不复杂,基本都是一些变量的load和store,overhead非常小

写线程函数——synchronize_rcu

写线程,最核心的函数就是synchronize_rcu,等待Grace Period的结束:

void synchronize_rcu(void)
{
    CDS_LIST_HEAD(qsreaders);

    cmm_smp_mb();//确保synchronize_rcu之前和之后的读写操作都不会乱序

    mutex_lock(&rcu_gp_lock);
    mutex_lock(&rcu_registry_lock);

    if (cds_list_empty(&registry))
        goto out;//如果空则表示没有读线程,可以直接返回。

    CMM_STORE_SHARED(rcu_gp.ctr, rcu_gp.ctr + RCU_GP_CTR);
//如果不为空,则把rcu_gp增一。增一的作用就是表示一个新的Grace Period已经开始了。 接着调用wait_for_readers,等待Grace Period的结束。
    cmm_barrier();
    cmm_smp_mb();

    wait_for_readers(&registry, NULL, &qsreaders);
    cds_list_splice(&qsreaders, &registry);
out:
    mutex_unlock(&rcu_registry_lock);
    mutex_unlock(&rcu_gp_lock);
    cmm_smp_mb();
}
wait_for_readers:函数的目的就是等待所有的读线程都更新自己的gp号到最新的gp号。

  在synchronize_rcu返回之后,我们可以知道没有任何一个读线程可以获取到旧的共享数据,所以我们可以删除旧数据。

  目前synchronize_rcu 为阻塞性,--->数据更新后释放旧资源,是要在 synchronize_rcu() 接触阻塞后才能进行;urcu 提供了 call_rcu() 接口来完成异步回调释放

 

struct rcu_head {
 struct cds_wfcq_node next;
 void (*func)(struct rcu_head *head);
};
void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *head);

 通常将要延迟释放的数据结构内嵌一个 rcu_head 结构,在需要延迟释放时调用

struct global_foo {
   struct rcu_head rcu_head;
  ......
};
struct global_foo g_foo;
//在 writer 需要释放时,调用 call_rcu(),之后当所有 reader 都更新完成后,设置的回调函数 free_func 被调用
call_rcu(&g_foo.rcu_head, free_func);

call_rcu的实现是通过内部创建一个线程实现的,此线程称之为 call_rcu_thread,这个线程专门用于 writer call_rcu()

上一篇:JAVA五子棋


下一篇:Java之Cleaner