Golang1.17源码分析之mutex-007
Golang1.17 学习笔记007
源代码:sync/mutex.go
数据结构:
const (
// 锁标识位(state的最后一位)
// Mutex.state & mutexLocked==1表示已经上锁;Mutex.state & mutexLocked==0表示已经未锁
mutexLocked = 1 << iota // mutex is locked
// 是否存在运行中的协程(state的倒数第二位)
// Mutex.state & mutexWoken==1表示存在运行中的协程;Mutex.state & mutexWoken==0表示不存在运行中的协程
mutexWoken
// 饥饿状态位(state的倒数第三位)
// Mutex.state & mutexStarving==1表示饥饿;Mutex.state & mutexStarving==0表示不饥饿
mutexStarving
// 等待者数量偏移量(值为3)
// 根据 mutex.state >> mutexWaiterShift 得到当前阻塞的goroutine数目,最多可以阻塞2^29个goroutine
mutexWaiterShift = iota
//值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdNs也就是1毫秒,mutex进入饥饿模式
starvationThresholdNs = 1e6
)
type Mutex struct {
//锁有两种模式:正常模式和饥饿模式
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。
state int32 //表示互斥锁的状态,比如是否被锁定等。
sema uint32 //表示信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程
}
加锁:
func (m *Mutex) Lock() {
// 如果 m.state == 0 那么 m.state = 1 && return true
// 如果当前没有被锁,那么锁上并返回true
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 已经被其他协程持有,进入自旋阶段
m.lockSlow()
}
func (m *Mutex) lockSlow() {
// 用来存当前goroutine等待的时间
var waitStartTime int64
// 用来存当前goroutine是否饥饿
starving := false
// 用来存当前goroutine是否已唤醒
awoke := false
// 用来存当前goroutine的循环次数(想一想一个goroutine如果循环了2147483648次咋办……)
iter := 0
// 复制一下当前锁的状态
old := m.state
// 自旋
for {
// 如果是饥饿情况之下,就不要自旋了,因为锁会直接交给队列头部的goroutine
// 如果锁是被获取状态,并且满足自旋条件(canSpin见后文分析),那么就自旋等锁
// 伪代码:if isLocked() and isNotStarving() and canSpin()
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识,并标记自己为被唤醒
// 这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 进行自旋(分析见后文)
runtime_doSpin()
iter++
// 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变)
old = m.state
continue
}
// 当走到这一步的时候,可能会有以下的情况:
// 1. 锁被获取+饥饿
// 2. 锁被获取+正常
// 3. 锁空闲+饥饿
// 4. 锁空闲+正常
// goroutine的状态可能是唤醒以及非唤醒
// 复制一份当前的状态,目的是根据当前状态设置出期望的状态,存在new里面,
// 并且通过CAS来比较以及更新锁的状态
// old用来存锁的当前状态
new := old
// 如果说锁不是饥饿状态,就把期望状态设置为被获取(获取锁)
// 也就是说,如果是饥饿状态,就不要把期望状态设置为被获取
// 新到的goroutine乖乖排队去
// 伪代码:if isNotStarving()
if old&mutexStarving == 0 {
// 伪代码:newState = locked
new |= mutexLocked
}
// 如果锁是被获取状态,或者饥饿状态
// 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8)
// (会不会可能有三亿个goroutine等待拿锁……)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
// 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果说当前goroutine是被唤醒状态,我们需要reset这个状态
// 因为goroutine要么是拿到锁了,要么是进入sleep了
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 这句就是把new设置为非唤醒状态
// &^的意思是and not
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果说old状态不是饥饿状态也不是被获取状态
// 那么代表当前goroutine已经通过CAS成功获取了锁
// (能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取)
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
// 如果说之前没有等待过,就初始化设置现在的等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 既然获取锁失败了,就使用sleep原语来阻塞当前goroutine
// 通过信号量来排队获取锁
// 如果是新来的goroutine,就放到队列尾部
// 如果是被唤醒的等待锁的goroutine,就放到队列头部
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 这里sleep完了,被唤醒
// 如果当前goroutine已经是饥饿状态了
// 或者当前goroutine已经等待了1ms(在上面定义常量)以上
// 就把当前goroutine的状态设置为饥饿
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果说锁现在是饥饿状态,就代表现在锁是被释放的状态,当前goroutine是被信号量所唤醒的
// 也就是说,锁被直接交给了当前goroutine
if old&mutexStarving != 0 {
// 如果说当前锁的状态是被唤醒状态或者被获取状态,或者说等待的队列为空
// 那么是不可能的,肯定是出问题了,因为当前状态肯定应该有等待的队列,锁也一定是被释放状态且未唤醒
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前goroutine获取锁,waiter数量-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine
// 那么就退出饥饿模式,把状态设置为正常
if !starving || old>>mutexWaiterShift == 1 {
// 在这里这么做至关重要,还要考虑等待时间。
// 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁
delta -= mutexStarving
}
// 原子性地加上改动的状态
atomic.AddInt32(&m.state, delta)
break
}
// 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒
// 并且重置iter(重置spin)
awoke = true
iter = 0
} else {
// 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放
// 那么就更新状态,重新开始循环尝试拿锁
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
判断是否可以自旋:runtime/proc.go: sync_runtime_canSpin
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// 这里的active_spin是个常量,值为4
// 简单来说,sync.Mutex是有可能被多个goroutine竞争的,所以不应该大量自旋(消耗CPU)
// 自旋的条件如下:
// 1. 自旋次数小于active_spin(这里是4)次;
// 2. 在多核机器上;
// 3. GOMAXPROCS > 1并且至少有一个其它的处于运行状态的P;
// 4. 当前P没有其它等待运行的G;
// 满足以上四个条件才可以进行自旋。
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
解锁:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 这里获取到锁的状态,然后将状态减去被获取的状态(也就是解锁),称为new(期望)状态
// 注意以上两个操作是原子的,所以不用担心多个goroutine并发的问题
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 如果说,期望状态加上被获取的状态,不是被获取的话
// 那么就panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果说new状态(也就是锁的状态)不是饥饿状态
if new&mutexStarving == 0 {
old := new
for {
// 如果说锁没有等待拿锁的goroutine
// 或者锁被获取了(在循环的过程中被其它goroutine获取了)
// 或者锁是被唤醒状态(表示有goroutine被唤醒,不需要再去尝试唤醒其它goroutine)
// 或者锁是饥饿模式(会直接转交给队列头的goroutine)
// 那么就直接返回,啥都不用做了
// 也就是没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
/
// 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待拿锁
// 将等待的goroutine数减一,并设置woken标识
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
// 新来的goroutine不会把锁抢过去.
runtime_Semrelease(&m.sema, true, 1)
}
}
mutex 模式
-
normal:协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁
-
starvation:自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,
还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,
不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,
同时也会把等待计数减1
Woken状态
Woken状态用于加锁和解锁过程的通信,举个例子,同一时刻,两个协程一个在加锁,一个在解锁,在加锁的协程可能在自旋过程中,
此时把Woken标记为1,用于通知解锁协程不必释放信号量了,好比在说:你只管解锁好了,不必释放信号量,我马上就拿到锁了
参考文献:
《Go专家编程》之mutex
https://www.purewhite.io/2019/03/28/golang-mutex-source/
https://www.cnblogs.com/ricklz/p/14535653.html