非抢占式RCU实现(一)

关于RCU的实现,考虑如下情形:

1、非抢占式RCU

2、限于嵌入式系统4核、每核单线程

3、RCU_FANOUT = 32

此时,RCU_TREE退化为单节点,如下,针对rcu_sched_state的使用做相关分析。

本想从解析各个数据结构之间的关系入手,但是没有成功,因此首先读下内核代码:

以下记录中以数组形式表示rcu_data,但是注意这是per-CPU变量,实际并非数组。

系统完成初始化后(rcu_init)的情形如下:

struct rcu_state rcu_sched_state =
{
.node[] =
{
.gpnum = ,
.qsmaks = ,
.qsmaskinig = ( << ) | ( << ) | ( << ) | ( << ),
.grplo = ,
.grphi = ,
.completed = ,
.grpnum = ,
.grpmask = ,
.parent = NULL,
.level = ,
// INIT_LIST_HEAD(&(rsp->node[0].blocked_tasks[0..3]);
},
.level = {&rcu_sched_state.node[]},
.levelcnt = { , , , , },
.levelspread[] = NR_CPUS,
    .rda[0..3] = &(rcu_data[0..3]),
.signaled = RCU_GP_IDLE,
.gpnum = ^ - ,
.completed = ^ - ,
.onofflock = __RAW_SPIN_LOCK_UNLOCKDE(&rcu_sched_state.onofflock),
.orphan_cbs_list = NULL,
.orphan_cbs_tail = &rcu_sched_state.orphan_cbs_list,
.orphan_qlen = ,
.fqslock = __RAW_SPIN_LOCK_UNLOCKDE(&rcu_sched_state.fqslock),
.n_force_qs = ,
.n_force_qs_ngp = ,
}; rcu_data[i]:
    rcu_data[i].mynode = &(rsp->node[0]);
    rcu_data[i].grpmask = (1 << (i - 0));
    rcu_data[i].nxtlist = NULL;
    rcu_data[i].nxttail[0..3] = &rcu_data[i].nxtlist;
    rcu_data[i].qlen = 0;
    rcu_data[i].cpu = i;     rcu_data[i].passed_quies = 0;
    rcu_data[i].qs_pending = 1;
    rcu_data[i].beenonline = 1;
    rcu_data[i].preemptable = 0;
    rcu_data[i].qlen_last_fqs_check = 0;
    rcu_data[i].n_force_qs_snap = 0;
    rcu_data[i].blimit = 10;
    rcu_data[i].gpnum = rsp.node[0].completed;  // 0
    rcu_data[i].completed = rsp.node[0].completed;  //0
    rcu_data[i].passed_quiesc_completed = rsp.node[0].completed - 1; // 2^32 - 1

假设:

A、B、C、D四核,其中A核需要修改数据,而B核需要读取相关数据,C、D两核与相关数据无关。
为了确保对其它情形同样适用,grace period必须等到A、B、C、D四核都通过进程切换后才能唤醒A核上对数据进行修改的进程。

首先,我们要明白,如果A、B、C、D依次进行进程切换且在一个宽限期之内不会再次发生申请宽限期的情形:

发生进程调度时对RCU的处理

schedule()
|---->rcu_sched_qs(cpu)
|----struct rcu_data *rdp;
| rdp = &per_cpu(rcu_sched_data, cpu);
|----rdp->passed_quiesc_completed = rdp->gpnum - ;
| 非常重要, 最近一次处理的grace period的序号
|----rdp->passed_quiesc = ;

系统时钟中断中对RCU的处理

timer_tick()---->update_process_times(user)
|---->rcu_check_callbacks(cpu, user_tick);
|---->raise_softirq(RCU_SOFTIRQ);
| 在rcu_init中:
| open_softirq(RCU_SOFTIRQ,
| rcu_process_callbacks);

系统初始后,A核首先开始注册新节点的过程(感觉自己太弱了,还好,简单的情形已分析出来,以后的记录中再分析复杂的)

synchronize_sched()
|----struct rcu_synchronize rcu;
| init_completion(&rcu.completion);
|
|----call_rcu_sche(&rcu.head, wakeme_after_rcu);
|---->__call_rcu(head, func, &rcu_sched_state);
|----wait_for_completion(&rcu.completion); __call_rcu(head, func, &rcu_sched_state)
staic void __call_rcu(struct rcu_head *head, 
void (*func)(struct rcu_head *rcu),
struct rcu_state *rsp)
|----head->func = func, head->next = NULL;
|
|----rdp = rsp->rda[smp_processor_id()];
|---->rcu_process_gp_end(rsp, rdp);
|---->check_for_new_grace_period(rsp, rdp);
|
|----*rdp->nxttail[RCU_NEXT_TAIL] = head;
|----rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
|
|
|----if(!rcu_gp_in_progress(rsp))
|----{ rcu_start_gp (rsp, variable) } rcu_gp_in_progress(rsp)
rcu_gp_in_progress(rsp)
|---- return rsp->completed != rsp->gpnum;

rcu_start_gp(struct rcu_state *rsp, unsigned int flags)

rcu_start_gp(struct rcu_state *rsp, unsigned int flags)
|---->rsp->gpnum++; //grace period序号加1
|----rsp->signaled = RCU_GP_INIT;
|----rsp->jiffies_force_qs = jiffies + RCU_JIFFIES_TILL_FORCE_QS;
| 因为有些核可能睡眠,所以在过了一定时间后可以force quiescent state
|
|考虑单节点
|----rnp->qsmask = rnp->qsmaskinit;
| 标记需要检测哪些CPU经过了quiescent state
|
|----rnp->gpnum = rsp->gpnum; //对于一个节点,更新rnp的gpnum
|
|----rnp->completed = rsp->completed; //更新rnp的completed
|
|----rsp->signaled = RCU_SIGNAL_INIT;
|----rcu_start_gp_per_cpu(rsp, rnp, rdp);
rcu_start_gp_per_cpu(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)
|---->__rcu_process_gp_end(rsp, rnp, rdp)
|
|----if(rdp->completed != rnp->completed)
|----{
|---- rdp->nxttail[RCU_NODE_TAIL] =
|----     rpd->nxttail[RCU_WAIT_TAIL];
|----
|---- rdp->nxttail[RCU_WAIT_TAIL] =
|----      rdp->nxttail[RCU_NEXT_READY_TAIL];
|----
|---- rdp->nxttail[RCU_NEXT_READY_TAIL] =
|----      rdp->nxttail[RCU_NEXT_TAIL];
|---- rdp->completed = rnp->completed; //更新rdp的completed
|----}
|
|
|---->dp->nxttail[RCU_NEXT_READY_TAIL] =
|----        rdp->nxttail[RCU_NEXT_TAIL];
|----
|---->rdp->nxttail[RCU_WAIT_TAIL] =
|----        rdp->nxttail[RCU_NEXT_TAIL];
|
|---->__note_new_gpnum(rsp, rnp, rdp);
|
|----if(rdp->gpnum != rnp->gpnum)
|----{
|---- rdp->qs_pending = ;
|---- rdp->passed_quiesc = ;
|---- rdp->gpnum = rnp->gpnum; //更新rdp的gpnum
|----}

对于A核,很简单,因为会立即发生线程切换。对于其它的核会咋update_process_times或者shchedule中均会调用rcu_shce_qs(int cpu)以标记各自经过quiescent state。

唤醒软中断RCU_SOFTIRQ的处理过程

static void rcu_process_callbacks(struct softirq_action *unused)
|---->__rcu_process_callbacks(&rcu_sched_state,
|---- &__get_cpu_var(rcu_sched_data));
|----......

考虑A核上处理过程

__rcu_process_callbacks(struct rcu_state *rsp, struct rcu_data *rdp)
|
|---->rcu_process_gp_end(rsp, rdp); //A核上不会发生任何事情
|
|---->rcu_check_quiescent_state(rsp, rdp); //A核上不会发生任何事情
|
|----if(!rdp->pending) return;
| 这个CPU是否已经上报过它经历了quiescent state, 若已上报,则返回
|
|----if(!rdp->passed_quiesc)
| 这个CPU是否已经经历了 quiescent state,如果没有,则返回
|
|----rcu_report_qs_rdp(rdp->cpu, rsp, rdp,
| rdp->passed_quiesc_completed)
| 执行到此处,说明该CPU经历过了quiescent state,而且还没有上报
|---->rcu_do_batch(rsp, rdp);
    |实际上,除非恰好该核终结了grace period,否则如下的if条件将成立,从而返回
|---->if(!cpu_has_callbacks_ready_to_invoke(rdp))return;
|
| 此处不会返回,而接着运行的原因可在
| rcu_report_qs_rsp(rsp, flags)的rcu_start_gp中查找
|
|---->list = rdp->nxtlist;
|
|---->rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
|
|---->*rdp->nxttail[RCU_DONE_TAIL] = NULL;
|
|---->tail = rdp->nxttail[RCU_DONE_TAIL];
|
          |如果所有的callback都在DONE_TAIL前面,那就还原其它的指针
|----for (count = RCU_NEXT_SIZE - ; count >= ; count--)
| if (rdp->nxttail[count] ==
| rdp->nxttail[RCU_DONE_TAIL])
| rdp->nxttail[count] = &rdp->nxtlist;
|
|----count = ;
| while (list) {
| next = list->next;
| list->func(list);
| list = next;
| if (++count >= rdp->blimit)
| break;
| }
|
|----if(cpu_has_callbacks_ready_to_invoke(rdp))
| raise_softirq(RCU_SOFTIRQ); rcu_report_qs_rdp(int cpu, struct rcu_state *rsp,
struct rcu_data *rdp, long lastcomp)
rcu_report_qs_rdp(int cpu, struct rcu_state *rsp, struct rcu_data *rdp, long lastcomp)
|----if(lastcomp != rnp->completed)
|----{ rdp->passed_quiesc = ; return }
|
|对于A核,lastcomp != rnp->completed不成立
|
|----mask = rdp->grpmask;
| 该CPU在rcu_node中的淹码
|----rdp->qs_pending = ;
| rcu_node已经响应该上报过程
|----rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
|----rcu_report_qs_rnp(mask, rsp, rnp, flags);
|---->if(!(rnp->qsmask & mask)) return;
| rcu_node已经知道了在该次grace period中,这个CPU已经经历过了
|quiescent state,因此返回;
|
|---->rnp->qsmask &= ~mask;
| 清除需要检测的CPU,因为已经检测到了
|
|---->if(rnp->qmask != ) return;
| 因为rnp->qmask不为零,说明对于该次grace period
| 仍有一些核没有被检测到通过了quiescent state,因此不能再上报给
| rcu_state,因此返回
|
|---->rcu_report_qs_rsp(rsp, flags);
| 对于该次grace period,rcu_node检测到所有的
| 核都至少通过quiescent state一次,因此
| 上报给rcu_state
|
|---->rsp->completed = rsp->gpnum;
| 该次grace period结束,可以开启新的grace period
|---->rsp->signaled = RCU_GP_IDLE;
|
|---->rcu_start_gp(rsp, flags);
| 看下上文对rcu_start_gp的分析,知道器会迁移链表,尤其注意
| rdp->nxttail[RCU_NODE_TAIL] =
| rpd->nxttail[RCU_WAIT_TAIL];
| 之所以要注意这一点,是因为其和rcu_do_batch能够真正的处理链表相关

B、C、D其它三个核的处理过程

__rcu_process_callbacks(struct rcu_state *rsp, struct rcu_data *rdp)
|---->rcu_process_gp_end(rsp, rdp);
| |---->if(rdp->completed = rnp->completed) return;
| |
| | 因为该次grace period并非本核发起,因此需要处理自己的链表,
| | 故需要调用__rcu_process_gp_end函数完成链表处理;
| | 实际上开启本次grace period的核在开启该次grace_period时已经
| | 调用了__rcu_process_gp_end,可以参照rcu_start_gp_per_cpu
| |---->__rcu_process_gp_end(rsp, rnp, rdp);
| | 该过程不仅完成链表的迁移,而且设置:
| |   rdp->completed = rnp->completed
| |
|---->rcu_check_quiescent_state(rsp, rdp);
| |---->if(check_for_new_grace_period(rsp, rdp)) return;
| | 对于check_for_new_grace_period(rsp, rdp)
| |----int ret = ;
| |---->if(rdp->gpnum != rsp->gpnum)
| | { note_new_gpnum(rsp, rdp), ret = ;}
| |
| | 因为该次grace period并非本核发起,因此需要在本CPU上标记
| | 一个新的grace period;
| | 实际上开启本次grace period的核在开启该次grace period时
| | 已经调用了__note_new_gpnum,可以参照rcu_start_gp_per_cpu
| | |---->__note_new_gpnum(rsp, rnp, rdp);
| | |
| | |----if(rdp->gpnum != rnp->gpnum)
| | |----{
| | |---- rdp->qs_pending = ;
| | |---- rdp->passed_quiesc = ;
| | |---- rdp->gpnum = rnp->gpnum;
| | | 更新rdp的gpnum
| | |----}
| |
| |----return ret;
| |
|---->rcu_do_batch(rsp, rdp)
|---->if(!cpu_has_callbacks_ready_to_invoke(rdp))return;
|对于cpu_has_callbacks_read_to_invoke(rdp)
| |---->return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
| | 对于B、C、D核将返回0;导致rcu_do_batch不会正的开始处理链表
|
| 从这里可以看出,对于A核开启的grace period,B、C、D在首次处理
| RCU_SOFTIRQ相应的软中断时,将会各自根据rcu_node上的
| gpnum与completed
| 调用()__note_new_gpnum(rsp, rnp, rdp);
| 设置本核gpnum, qs_pending, passed_quiesc
| ()__rcu_process_gp_end(rsp, rnp, rdp);
| 设置本核completed,迁移链表
|
| 所以对于B、C、D核而言在第二次处理软中断时,将会与A核情形一样,
| 可以上报给
| rcu_node,由于所有的核都已经上报给了rcu_node,因此rcu_node上报给
| rcu_state,来指明该次grace period已经结束

此处对B、C、D的考虑确实存在问题,因为B、C、D在执行rcu_do_batch的过程中也会如A中一样,此处为了方便理解做了假设。

上一篇:BestCoder Round #61 1001 Numbers


下一篇:C# Redis使用之StackExchange