序言:近期读Linux 5.15的发布说明,该版本合并了实时锁机制,当开启配置宏CONFIG_PREEMPT_RT的时候,这些锁被基于实时互斥锁的变体替代:mutex、ww_mutex、rw_semaphore、spinlock和rwlock。第一次听说ww_mutex,在百度上查找的时候发现介绍文档很少,于是自己学习,写成笔记。
在某些场合必须同时持有多个锁,并且获取锁的顺序可能不同,为了避免死锁,应该使用伤害/等待互斥锁(Wound/Wait Mutexes)。获取一个锁集合称为一个事务(transaction),每个事务关联一张门票(ticket),门票也称为序列号,根据门票判断哪个事务年轻。有2种处理死锁的方法,如下。
(1) 等待-死亡(Wait-Die)算法:一个事务申请另一个事务已经获取的锁的时候,如果持有锁的事务年轻,那么申请锁的事务等待(wait);如果持有锁的事务年老,那么申请锁的事务退避并且死亡(die)。
(2) 4.19版本开始支持伤害-等待(Wound-Wait)算法:一个事务申请另一个事务已经获取的锁的时候,如果持有锁的事务年轻,那么申请锁的事务伤害(wound)持有锁的事务,请求它去死亡;如果持有锁的事务年老,那么申请锁的事务等待(wait)。
假设进程1和进程2分别在2个处理器上运行,进程1获取锁A,进程2获取锁B,然后进程1申请锁B,进程2申请锁A。假设进程1的门票编号比进程2的门票编号小,也就是进程1年老,进程2年轻。
两种算法都是公平的,因为其中一个事务最终会成功。和等待-死亡算法相比,伤害-等待算法生成的退避少,但是从一次退避恢复的时候要做更多的工作。伤害-等待算法是一种抢占性的算法(因为事务被其它事务伤害),需要一种可靠的方法来选择受伤状态和抢占正在运行的事务。在伤害-等待算法中,一个事务在受伤后死亡(返回“-EDEADLK”),就认为这个事务被抢占。
如果竞争锁的进程少,并且希望减少回滚的次数,那么应该选择伤害-等待算法。
和普通的互斥锁相比,伤害/等待互斥锁增加了下面2个概念。
(1) 获取上下文(acquire context):一个获取上下文表示一个事务,关联一张门票(ticket),门票也称为序列号,门票编号小表示年老,门票编号大表示年轻。获取上下文跟踪调试状态,捕获对伤害/等待互斥锁接口的错误使用。
(2) 伤害/等待类:初始化获取上下文的时候需要指定锁类,锁类会给获取上下文分配门票。锁类也指定算法:等待-死亡(Wait-Die)或伤害-等待(Wound-Wait)。当多个进程竞争同一个锁集合的时候,它们必须使用相同的锁类。
有3种获取伤害/等待互斥锁的函数,如下。
(1) 普通的获取锁函数ww_mutex_lock(),带有获取上下文。
(2) 进程在回滚(即释放所有已经获取的锁)以后,使用慢路径获取锁函数ww_mutex_lock_slow()获取正在竞争的锁。带有“_slow”后缀的函数不是必需的,因为可以调用函数ww_mutex_lock()获取正在竞争的锁。带有“_slow”后缀的函数的优点是接口安全,如下。
- 函数ww_mutex_lock()有一个整数返回值,而函数ww_mutex_lock_slow()没有返回值。
- 当开启调试的时候,函数ww_mutex_lock_slow()检查所有已经获取的锁已经被释放,并且确保进程阻塞在正在竞争的锁上面。
(3) 只获取一个伤害/等待互斥锁,和获取普通的互斥锁完全相同。调用函数ww_mutex_lock(),把获取上下文指定为空指针。
伤害/等待互斥锁的使用方法如下。
(1) 定义一个锁类,锁类在初始化获取上下文的时候需要,锁类也指定算法:等待-死亡(Wait-Die)或伤害-等待(Wound-Wait)。
/* 指定等待-死亡算法 */
static DEFINE_WD_CLASS(my_class);
/* 指定伤害-等待算法 */
static DEFINE_WW_CLASS(my_class);
(2) 初始化一个获取上下文,锁类会给获取上下文分配一张门票。
void ww_acquire_init(struct ww_acquire_ctx *ctx, struct ww_class *ww_class);
(3) 获取锁,返回0表示获取成功,返回“-EDEADLK”表示检测出死锁。
int ww_mutex_lock(struct ww_mutex *lock, struct ww_acquire_ctx *ctx);
(4) 获取需要的所有锁以后,标记获取阶段结束。目前这个函数没有执行任何操作,但是将来可能改变。
void ww_acquire_done(struct ww_acquire_ctx *ctx);
(5) 释放锁。
void ww_mutex_unlock(struct ww_mutex *lock);
(6) 释放所有锁以后,释放获取上下文。
void ww_acquire_fini(struct ww_acquire_ctx *ctx);
下面是一个例子,注意:调用函数ww_mutex_lock()申请锁失败以后,应该先释放已经获取的锁,然后调用慢路径函数ww_mutex_lock_slow()获取正在竞争的锁,最后获取其它锁。重新开始申请锁的时候必须改变申请顺序,因为如果按照原来的顺序申请锁,那么会把刚释放的锁抢回来。
/* 第1步:定义锁类,指定伤害-等待算法。*/
static DEFINE_WW_CLASS(ww_class);
struct obj {
struct ww_mutex lock;
/* obj data */
};
struct obj_entry {
struct list_head head;
struct obj *obj;
};
int lock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
struct obj *res_obj = NULL;
struct obj_entry *contended_entry = NULL;
struct obj_entry *entry;
int ret;
/* 第2步:初始化获取上下文。*/
ww_acquire_init(ctx, &ww_class);
/* 第3步:获取锁。*/
retry:
list_for_each_entry(entry, list, head) {
if (entry->obj == res_obj) {
res_obj = NULL;
continue;
}
ret = ww_mutex_lock(&entry->obj->lock, ctx);
if (ret < 0) {
contended_entry = entry;
goto err;
}
}
/* 第4步:标记获取阶段结束。*/
ww_acquire_done(ctx);
return 0;
err:
/* 回滚,释放已经获取的锁。*/
list_for_each_entry_continue_reverse(entry, list, head) {
ww_mutex_unlock(&entry->obj->lock);
}
if (res_obj) {
ww_mutex_unlock(&res_obj->lock);
}
if (ret == -EDEADLK) {
/* 使用慢路径获取锁函数获取正在竞争的锁。*/
ww_mutex_lock_slow(&contended_entry->obj->lock, ctx);
res_obj = contended_entry->obj;
/* 获取其它锁。*/
goto retry;
}
ww_acquire_fini(ctx);
return ret;
}
void unlock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
struct obj_entry *entry;
/* 第5步:释放锁。*/
list_for_each_entry (entry, list, head) {
ww_mutex_unlock(&entry->obj->lock);
}
/* 第6步:释放获取上下文。*/
ww_acquire_fini(ctx);
}