LiteOS:SpinLock自旋锁及LockDep死锁检测

摘要:除了多核的自旋锁机制,本文会介绍下LiteOS 5.0引入的LockDep死锁检测特性。

2020年12月发布的LiteOS 5.0推出了全新的内核,支持SMP多核调度功能。想学习SMP多核调度功能,需要了解下SpinLock自旋锁。除了多核的自旋锁机制,本文还会介绍下LiteOS 5.0引入的LockDep死锁检测特性。

本文中所涉及的LiteOS源码,均可以在LiteOS开源站点https://gitee.com/LiteOS/LiteOS 获取。

自旋锁SpinLock源代码、开发文档,LockDep死锁检测特性代码文档列表如下:

我们首先来看看自旋锁。

1、SpinLock 自旋锁

在多核环境中,由于使用相同的内存空间,存在对同一资源进行访问的情况,所以需要互斥访问机制来保证同一时刻只有一个核进行操作。自旋锁就是这样的一种机制。

自旋锁是指当一个线程在获取锁时,如果锁已经被其它线程获取,那么该线程将循环等待,并不断判断是否能够成功获取锁,直到获取到锁才会退出循环。因此建议保护耗时较短的操作,防止对系统整体性能有明显的影响。

自旋锁与互斥锁比较类似,它们都是为了解决对共享资源的互斥使用问题。无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个持有者。但是两者在调度机制上略有不同,对于互斥锁,如果锁已经被占用,锁申请者会被阻塞;但是自旋锁不会引起调用者阻塞,会一直循环检测自旋锁是否已经被释放。自旋锁用于多核不同CPU核对资源的互斥访问,互斥锁用于同一CPU核内不同任务对资源的互斥访问。

自旋锁SpinLock核心的代码都在kernel\include\los_spinlock.h头文件中,包含struct Spinlock结构体定义、一些inline内联函数LOS_SpinXXX,还有一些LockDep死锁检测相关的宏定义LOCKDEP_XXXX。

1.1 Spinlock 自旋锁结构体

自旋锁结构体Spinlock定义如下,主要的成员变量为size_t rawLock,这是自旋锁是否占用持有的成功的标记:为0时,锁没有被持有,为1时表示被成功持有。当开启LockDep死锁检测调测特性时,会使能另外3个成员变量,记录持有自旋锁的CPU核信息、任务信息。

struct Spinlock {
    size_t      rawLock;            /**< 原始自旋锁 */
#ifdef LOSCFG_KERNEL_SMP_LOCKDEP
    UINT32      cpuid;              /**< 死锁检测特性开启时,持有自旋锁的CPU核 */
    VOID        *owner;             /**< 死锁检测特性开启时,持有自旋锁的任务的TCB指针 */
    const CHAR  *name;              /**< 死锁检测特性开启时,持有自旋锁的任务的名称 */
#endif
};

1.2 Spinlock 自旋锁常用函数接口

LiteOS自旋锁模块为用户提供下面几种功能,包含自旋锁初始化,申请/释放,查询自旋锁状态等。自旋锁相关的函数、宏定义只支持SMP - Symmetric MultiProcessor模式,当单核UP - UniProcessor时,函数不生效。接口详细信息可以查看API参考。

1.2.1 自旋锁初始化

自旋锁初始化的内联函数如下,其中参数SPIN_LOCK_S *lock,即自旋锁结构体指针,其中SPIN_LOCK_S是Spinlock的typedef别名,在kernel\include\los_lockdep.h文件中定义的。

自旋锁初始时,会把自旋锁标记为0:lock->rawLock = 0,当开启死锁检测特性时,也会做相应的初始化。

LITE_OS_SEC_ALW_INLINE STATIC INLINE VOID LOS_SpinInit(SPIN_LOCK_S *lock)
{
    lock->rawLock     = 0;
#ifdef LOSCFG_KERNEL_SMP_LOCKDEP
    lock->cpuid       = (UINT32)-1;
    lock->owner       = SPINLOCK_OWNER_INIT;
    lock->name        = "spinlock";
#endif
}

LOS_SpinInit()是动态初始化的自旋锁,LiteOS还提供了静态初始化自旋锁的方法SPIN_LOCK_INIT(lock):

#define SPIN_LOCK_INIT(lock)  SPIN_LOCK_S lock = SPIN_LOCK_INITIALIZER(lock)

1.2.2 申请/释放自旋锁

初始化自旋锁后,可以以SPIN_LOCK_S *lock为参数申请、释放自旋锁。自旋锁的这些函数中,调用的LOCKDEP_开头函数是死锁检测的函数,后文会详细讲述。核心的3个函数由汇编语言编写,这些汇编函数存,根据不同的CPU架构,可以在文件arch\arm\cortex_a_r\src\spinlock.S或arch\arm64\src\spinlock.S中查看,此文不再详细讲述其汇编代码。

ArchSpinLock(&lock->rawLock);  // 汇编语言编写的 申请自旋锁的函数
ArchSpinUnlock(&lock->rawLock); // 汇编语言编写的 释放自旋锁的函数
ArchSpinTrylock(&lock->rawLock); // 汇编语言编写的 尝试申请自旋锁的函数
  • STATIC INLINE VOID LOS_SpinLock(SPIN_LOCK_S *lock) 申请自旋锁

该函数尝试申请自旋锁,如果自旋锁锁被其他核占用,则循环等待,直至其他核释放自旋锁。

我们看下代码首先执行⑴处代码,暂停任务调度,然后执行汇编函数ArchSpinLock(&lock->rawLock)申请自旋锁。

LITE_OS_SEC_ALW_INLINE STATIC INLINE VOID LOS_SpinLock(SPIN_LOCK_S *lock)
{
⑴  LOS_TaskLock();
    LOCKDEP_CHECK_IN(lock);
⑵  ArchSpinLock(&lock->rawLock);
    LOCKDEP_RECORD(lock);
}
  • STATIC INLINE VOID LOS_SpinUnlock(SPIN_LOCK_S *lock) 释放自旋锁

释放自旋锁LOS_SpinUnlock(SPIN_LOCK_S *lock)需要和申请自旋锁的函数LOS_SpinLock(SPIN_LOCK_S *lock)成对使用。执行⑴处汇编函数ArchSpinUnlock(&lock->rawLock)释放自旋锁,然后执行⑵恢复任务调度功能。

LITE_OS_SEC_ALW_INLINE STATIC INLINE VOID LOS_SpinUnlock(SPIN_LOCK_S *lock)
{
    LOCKDEP_CHECK_OUT(lock);
⑴  ArchSpinUnlock(&lock->rawLock);
⑵  LOS_TaskUnlock();
}
  • STATIC INLINE INT32 LOS_SpinTrylock(SPIN_LOCK_S *lock) 尝试申请自旋锁

尝试申请指定的自旋锁,如果无法获取锁,直接返回失败,而不会一直循环等待。用户根据返回值,判断是否成功申请到自旋锁,然后再做后续业务处理。和函数LOS_SpinLock(SPIN_LOCK_S *lock)执行的汇编函数不同,该函数调用的汇编函数为ArchSpinTrylock(&lock->rawLock),并有返回值。

LITE_OS_SEC_ALW_INLINE STATIC INLINE INT32 LOS_SpinTrylock(SPIN_LOCK_S *lock)
{
    LOS_TaskLock();
    LOCKDEP_CHECK_IN(lock);
⑴  INT32 ret = ArchSpinTrylock(&lock->rawLock);
    if (ret == LOS_OK) {
        LOCKDEP_RECORD(lock);
    }
    return ret;
}

1.2.3 申请/释放自旋锁(同时进行关中断保护)

LiteOS 还提供一对支持关中断保护的申请/释放指定自旋锁的函数,除了参数SPIN_LOCK_S *lock,还需要参数UINT32 *intSave用于关中断、恢复中断。LOS_SpinLockSave()和LOS_SpinUnlockRestore()必须成对使用。

  • STATIC INLINE VOID LOS_SpinLockSave(SPIN_LOCK_S *lock, UINT32 *intSave) 关中断后,再申请指定的自旋锁值

从代码中,可以看出首先执行LOS_IntLock()关中断,然后再调用LOS_SpinLock(lock)申请自旋锁。

LITE_OS_SEC_ALW_INLINE STATIC INLINE VOID LOS_SpinLockSave(SPIN_LOCK_S *lock, UINT32 *intSave)
{
    *intSave = LOS_IntLock();
    LOS_SpinLock(lock);
}
  • STATIC INLINE VOID LOS_SpinUnlockRestore(SPIN_LOCK_S *lock, UINT32 *intSave) 关中断后,再申请指定的自旋锁值。

从代码中,可以看出首先调用LOS_SpinUnlock(lock)释放自旋锁,然后再调用LOS_IntRestore(intSave)恢复中断。

LITE_OS_SEC_ALW_INLINE STATIC INLINE VOID LOS_SpinUnlockRestore(SPIN_LOCK_S *lock, UINT32 intSave)

{
    LOS_SpinUnlock(lock);
    LOS_IntRestore(intSave);
}

1.2.4 获取自旋锁持有状态

可以使用函数BOOL LOS_SpinHeld(const SPIN_LOCK_S *lock)查询自旋锁的持有状态,返回TRUE,自旋锁锁被持有,返回FALSE时表示没有被持有:

LITE_OS_SEC_ALW_INLINE STATIC INLINE BOOL LOS_SpinHeld(const SPIN_LOCK_S *lock)
{
    return (lock->rawLock != 0);
}

2、LockDep 死锁检测调测特性

LockDep是Lock Dependency Check的缩写,是内核的一种死锁检测机制。这个调测特性默认是关闭的,如果需要该调测特性,需要使能宏定义LOSCFG_KERNEL_SMP_LOCKDEP。当检测到死锁错误时,会打印发生死锁的自旋锁的相关信息,打印backtrace回溯栈信息。

2.1 LockDep 自旋锁的错误类型及结构体定义

在文件kernel\include\los_lockdep.h中定义了死锁的枚举类型LockDepErrType及HeldLocks结构体。

自旋锁的错误类型有double lock重复申请锁、dead lock死锁、unlock without lock释放未持有的锁、lockdep overflow死锁检测溢出,超出定义的MAX_LOCK_DEPTH。

结构体LockDep是任务LosTaskCB结构体的开启LOSCFG_KERNEL_SMP_LOCKDEP时的一个成员变量,记录该任务持有的自旋锁、需要申请的自旋锁的信息。结构体HeldLocks记录持有的自旋锁的详细信息,各个成员变量见如下注释:

typedef struct Spinlock SPIN_LOCK_S;

#define MAX_LOCK_DEPTH  16U

enum LockDepErrType {
    LOCKDEP_SUCEESS = 0,
    LOCKDEP_ERR_DOUBLE_LOCK, // double lock 重复申请锁
    LOCKDEP_ERR_DEAD_LOCK,  // dead lock 死锁
    LOCKDEP_ERR_UNLOCK_WITHOUT_LOCK, // unlock without lock 释放未持有的锁
    LOCKDEP_ERR_OVERFLOW, // lockdep overflow 死锁检测溢出
};

typedef struct {
    VOID *lockPtr; // Spinlock 自旋锁的内存地址
    VOID *lockAddr; // 请求锁的函数的返回地址
    UINT64 waitTime; // 抢占申请自旋锁的等待时间
    UINT64 holdTime;  // 持有自旋锁的时间
} HeldLocks;

typedef struct {
    VOID *waitLock; // 任务申请占用的自旋锁Spinlock
    INT32 lockDepth; // 自旋锁的深度
    HeldLocks heldLocks[MAX_LOCK_DEPTH]; // 持有的自旋锁详细信息数组
} LockDep;

2.2 LockDep 死锁检测的常用函数接口

LockDep 死锁检测特性提供了3个函数接口,在申请自旋锁前、成功申请到自旋锁后、释放自旋锁后打点调用。另外,提供了一些其他常用函数接口。

我们先看下,死锁检测函数如何记录等待时间waitTime、持有时间holdTime的。在申请自旋锁前调用OsLockDepCheckIn(),记录waitTime的起点;成功申请到自旋锁后,调用OsLockDepRecord()记录waitTime的结束点,同时记录记录holdTime的起点;释放自旋锁后调用OsLockDepCheckOut()记录holdTime的结束点。如图所示:

LiteOS:SpinLock自旋锁及LockDep死锁检测

2.2.1 OsLockDepCheckIn(const SPIN_LOCK_S *lock) 记录申请自旋锁

我们一起分析下代码,看看申请自旋锁前死锁检测特性做了哪些操作。⑴处代码获取请求自旋锁的函数返回地址。⑵获取当前任务的TCB,然后获取它的死锁检测成员LockDep *lockDep。⑶、⑽处两个函数配对使用,前者先关中断,然后等待、占用死锁检测特性、设置STATIC Atomic g_lockdepAvailable为0,后者释放锁检测特性,设置STATIC Atomic g_lockdepAvailable为1,然后恢复中断。

⑷处代码判断当前任务持有的自旋锁是否超过死锁检测特性设置的自旋锁数量的最大值MAX_LOCK_DEPTH,如果超过,则报溢出错误,跳转到OUT继续执行。⑸处代码,如果申请的自旋锁没有被任何CPU核持有,可以直接占有,无需等待,跳转到OUT继续执行。⑹处代码,如果申请的自旋锁被当前任务持有,则报重复申请自旋锁错误,跳转到OUT继续执行。⑺处判断是否发生死锁,稍后再分析函数OsLockDepCheckDependancy()。

⑻处代码,如果检测结果通过,可以持有自旋锁,则记录相关信息,包含要申请的自旋锁、申请锁的函数返回地址、申请自旋锁的开始时间。否则执行⑼处代码,输出死锁错误信息。

VOID OsLockDepCheckIn(const SPIN_LOCK_S *lock)
{
    UINT32 intSave;
    enum LockDepErrType checkResult = LOCKDEP_SUCEESS;
⑴  VOID *requestAddr = (VOID *)__builtin_return_address(0);
⑵  LosTaskCB *current = OsCurrTaskGet();
    LockDep *lockDep = &current->lockDep;
    LosTaskCB *lockOwner = NULL;
    if (lock == NULL) {
        return;
    }
⑶   OsLockDepRequire(&intSave);
⑷  if (lockDep->lockDepth >= (INT32)MAX_LOCK_DEPTH) {
        checkResult = LOCKDEP_ERR_OVERFLOW;
        goto OUT;
    }
    lockOwner = lock->owner;
⑸  if (lockOwner == SPINLOCK_OWNER_INIT) {
        goto OUT;
    }
⑹  if (current == lockOwner) {
        checkResult = LOCKDEP_ERR_DOUBLE_LOCK;
        goto OUT;
    }
⑺  if (OsLockDepCheckDependancy(current, lockOwner) != TRUE) {
        checkResult = LOCKDEP_ERR_DEAD_LOCK;
        goto OUT;
    }
OUT:
⑻  if (checkResult == LOCKDEP_SUCEESS) {
        lockDep->waitLock = (SPIN_LOCK_S *)lock;
        lockDep->heldLocks[lockDep->lockDepth].lockAddr = requestAddr;
        lockDep->heldLocks[lockDep->lockDepth].waitTime = OsLockDepGetCycles(); /* start time */
    } else {
⑼      OsLockDepDumpLock(current, lock, requestAddr, checkResult);
    }
⑽  OsLockDepRelease(intSave);
}

我们再分析下死锁检测的函数OsLockDepCheckDependancy(),循环判断嵌套申请的自旋锁是否会发生死锁,包含2个参数,第一个参数是申请自旋锁的任务LosTaskCB *current,第二个参数为持有自旋锁的任务LosTaskCB *lockOwner:

⑴处代码,如果申请自旋锁的任务和持有锁的任务同一个,则发生死锁。⑵处代码,如果持有自旋锁的任务,还在申请其他自旋锁,则把lockOwner指向其他自旋锁的任务TCB,否则退出循环。⑶如果自旋锁被占用则一直循环。

STATIC BOOL OsLockDepCheckDependancy(const LosTaskCB *current, const LosTaskCB *lockOwner)
{
    BOOL checkResult = TRUE;
    const SPIN_LOCK_S *lockTemp = NULL;
    do {
⑴      if (current == lockOwner) {
            checkResult = FALSE;
            return checkResult;
        }
⑵      if (lockOwner->lockDep.waitLock != NULL) {
            lockTemp = lockOwner->lockDep.waitLock;
            lockOwner = lockTemp->owner;
        } else {
            break;
        }
⑶  } while (lockOwner != SPINLOCK_OWNER_INIT);
    return checkResult;
}

死锁检测TCB、LockDep、Spinlock关系示意图:

LiteOS:SpinLock自旋锁及LockDep死锁检测

2.2.2 OsLockDepRecord(const SPIN_LOCK_S *lock) 记录申请到的自旋锁

我们继续分析,当申请自旋锁后,死锁检测特性做了哪些操作。⑴处代码获取系统运行以来的cycle数目,然后计算waitTime,即从开始申请自旋锁到申请到自旋锁之前的cycle数目,同时记录持有自旋锁的holdTime的开始时间。⑵处代码更新自旋锁的信息,锁被当前任务持有,CPU核设置为当前核。⑶处更新死锁检测lockDep的信息,持有锁的数目加1,等待锁置空。

VOID OsLockDepRecord(SPIN_LOCK_S *lock)
{
    UINT32 intSave;
    UINT64 cycles;
    LosTaskCB *current = OsCurrTaskGet();
    LockDep *lockDep = &current->lockDep;
    HeldLocks *heldlock = &lockDep->heldLocks[lockDep->lockDepth];
    if (lock == NULL) {
        return;
    }
    OsLockDepRequire(&intSave);
⑴  cycles = OsLockDepGetCycles();
    heldlock->waitTime = cycles - heldlock->waitTime;
    heldlock->holdTime = cycles;
⑵  lock->owner = current;
    lock->cpuid = ArchCurrCpuid();
⑶  heldlock->lockPtr = lock;
    lockDep->lockDepth++;
    lockDep->waitLock = NULL;
    OsLockDepRelease(intSave);
}

2.2.3 OsLockDepCheckOut(const SPIN_LOCK_S *lock) 记录释放自旋锁

我们再分析下,当释放自旋锁后,死锁检测特性做了哪些操作。⑴处代码表示,当释放一个没有占用的自旋锁,会调用函数OsLockDepDumpLock()打印死锁检测错误信息。⑵处代码先获取持有锁的任务TCB的死锁检测变量lockDep,然后获取其持有锁数组的起始地址,即指针变量heldlocks。⑶获取持有锁的数目,然后执行⑷,对持有的锁进行循环遍历,定位到自旋锁*lock的数组索引,再执行⑸处代码更新持有锁的总时间。

⑹处代码,判断如果释放的锁,不是任务持有锁数组的最后一个,则移动数组后面的元素,数组元素也需要减少1。最后,执行⑺更新自旋锁的没有被任何CPU核、任何任务占用。

VOID OsLockDepCheckOut(SPIN_LOCK_S *lock)
{
    UINT32 intSave;
    INT32 depth;
    VOID *requestAddr = (VOID *)__builtin_return_address(0);  
    LosTaskCB *current = OsCurrTaskGet();
    LosTaskCB *owner = NULL;
    LockDep *lockDep = NULL;
    HeldLocks *heldlocks = NULL;
    if (lock == NULL) {
        return;
    }
    OsLockDepRequire(&intSave);
    owner = lock->owner;
⑴  if (owner == SPINLOCK_OWNER_INIT) {
        OsLockDepDumpLock(current, lock, requestAddr, LOCKDEP_ERR_UNLOCK_WITHOUT_LOCK);
        goto OUT;
    }
    lockDep = &owner->lockDep;
⑵  heldlocks = &lockDep->heldLocks[0];
⑶  depth = lockDep->lockDepth;
    while (depth-- >= 0) {
⑷      if (heldlocks[depth].lockPtr == lock) {
            break;
        }
    }
    LOS_ASSERT(depth >= 0);
⑸  heldlocks[depth].holdTime = OsLockDepGetCycles() - heldlocks[depth].holdTime;
⑹  while (depth < lockDep->lockDepth - 1) {
        lockDep->heldLocks[depth] = lockDep->heldLocks[depth + 1];
        depth++;
    }
    lockDep->lockDepth--;
⑺  lock->cpuid = (UINT32)(-1);
    lock->owner = SPINLOCK_OWNER_INIT;

OUT:
    OsLockDepRelease(intSave);
}

2.2.4 OsLockdepClearSpinlocks(VOID) 释放持有的自旋锁

该函数OsLockdepClearSpinlocks()会全部释放当前任务持有的自旋锁。在arch\arm\cortex_a_r\src\fault.c文件中,异常处理函数OsExcHandleEntry()通过调用LOCKDEP_CLEAR_LOCKS()实现对该函数的调用。

⑴处代码获取当前任务死锁检测变量lockDep,然后⑵处循环变量持有的自旋锁,获取自旋锁并调用LOS_SpinUnlock()进行释放。

VOID OsLockdepClearSpinlocks(VOID)
{
    LosTaskCB *task = OsCurrTaskGet();
⑴  LockDep *lockDep = &task->lockDep;
    SPIN_LOCK_S *lock = NULL;
    while (lockDep->lockDepth) {
⑵      lock = lockDep->heldLocks[lockDep->lockDepth - 1].lockPtr;
        LOS_SpinUnlock(lock);
    }
}

小结

本文带领大家一起剖析了SpinLock自旋锁,LockDep死锁检测特性的源代码,结合讲解,参考官方示例程序代码,自己写写程序,实际编译运行一下,加深理解。

感谢阅读,如有任何问题、建议,都可以留言给我们: https://gitee.com/LiteOS/LiteOS/issues 。为了更容易找到LiteOS代码仓,建议访问 https://gitee.com/LiteOS/LiteOS ,关注Watch、点赞Star、并Fork到自己账户下,如下图,谢谢。

LiteOS:SpinLock自旋锁及LockDep死锁检测

本文分享自华为云社区《LiteOS内核源码分析系列二 SpinLock自旋锁及LockDep死锁检测》,原文作者:zhushy。

 

点击关注,第一时间了解华为云新鲜技术~

上一篇:鸿蒙内核源码分析(任务管理篇) | 谁在让CPU忙忙碌碌? | 百篇博客分析HarmonyOS源码 | v5.05


下一篇:聊聊LiteOS事件模块的结构体、初始化及常用操作