007-Golang1.17源码分析之mutex

Golang1.17源码分析之mutex-007

Golang1.17 学习笔记007

源代码:sync/mutex.go

数据结构:

const (

    // 锁标识位(state的最后一位) 
    // Mutex.state & mutexLocked==1表示已经上锁;Mutex.state & mutexLocked==0表示已经未锁
	mutexLocked = 1 << iota // mutex is locked
	
	// 是否存在运行中的协程(state的倒数第二位)
    // Mutex.state & mutexWoken==1表示存在运行中的协程;Mutex.state & mutexWoken==0表示不存在运行中的协程
	mutexWoken
	
	// 饥饿状态位(state的倒数第三位)
    // Mutex.state & mutexStarving==1表示饥饿;Mutex.state & mutexStarving==0表示不饥饿
	mutexStarving
	
	// 等待者数量偏移量(值为3)
    // 根据 mutex.state >> mutexWaiterShift 得到当前阻塞的goroutine数目,最多可以阻塞2^29个goroutine
	mutexWaiterShift = iota
	
	//值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdNs也就是1毫秒,mutex进入饥饿模式
	starvationThresholdNs = 1e6
)
type Mutex struct {
    //锁有两种模式:正常模式和饥饿模式
    // 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
	// 1. 它是队列中最后一个goroutine;
	// 2. 它拿到锁所花的时间小于1ms;
	// 以上只要有一个成立,它就会把锁转变回正常模式。
	state int32     //表示互斥锁的状态,比如是否被锁定等。
	sema  uint32    //表示信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程
}

加锁:

func (m *Mutex) Lock() {
	// 如果 m.state == 0 那么 m.state = 1 && return true
	// 如果当前没有被锁,那么锁上并返回true
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	
	// 已经被其他协程持有,进入自旋阶段
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	// 用来存当前goroutine等待的时间
	var waitStartTime int64
	// 用来存当前goroutine是否饥饿
	starving := false
	// 用来存当前goroutine是否已唤醒
	awoke := false
	// 用来存当前goroutine的循环次数(想一想一个goroutine如果循环了2147483648次咋办……)
	iter := 0
	// 复制一下当前锁的状态
	old := m.state
	
	// 自旋
	for {
		// 如果是饥饿情况之下,就不要自旋了,因为锁会直接交给队列头部的goroutine
		// 如果锁是被获取状态,并且满足自旋条件(canSpin见后文分析),那么就自旋等锁
		// 伪代码:if isLocked() and isNotStarving() and canSpin()
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识,并标记自己为被唤醒
			// 这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			// 进行自旋(分析见后文)
			runtime_doSpin()
			iter++
			// 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变)
			old = m.state
			continue
		}
		
		// 当走到这一步的时候,可能会有以下的情况:
		// 1. 锁被获取+饥饿
		// 2. 锁被获取+正常
		// 3. 锁空闲+饥饿
		// 4. 锁空闲+正常
		
		// goroutine的状态可能是唤醒以及非唤醒
		
		// 复制一份当前的状态,目的是根据当前状态设置出期望的状态,存在new里面,
		// 并且通过CAS来比较以及更新锁的状态
		// old用来存锁的当前状态
		new := old
		
		
		// 如果说锁不是饥饿状态,就把期望状态设置为被获取(获取锁)
		// 也就是说,如果是饥饿状态,就不要把期望状态设置为被获取
		// 新到的goroutine乖乖排队去
		// 伪代码:if isNotStarving()
		if old&mutexStarving == 0 {
		    // 伪代码:newState = locked
			new |= mutexLocked
		}
		
		// 如果锁是被获取状态,或者饥饿状态
		// 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8)
		// (会不会可能有三亿个goroutine等待拿锁……)
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		
		// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
		// 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		
		// 如果说当前goroutine是被唤醒状态,我们需要reset这个状态
		// 因为goroutine要么是拿到锁了,要么是进入sleep了
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			
			// 这句就是把new设置为非唤醒状态
			// &^的意思是and not
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
		    // 如果说old状态不是饥饿状态也不是被获取状态
			// 那么代表当前goroutine已经通过CAS成功获取了锁
			// (能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取)
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			
			// 如果说之前没有等待过,就初始化设置现在的等待时间
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			
			// 既然获取锁失败了,就使用sleep原语来阻塞当前goroutine
			// 通过信号量来排队获取锁
			// 如果是新来的goroutine,就放到队列尾部
			// 如果是被唤醒的等待锁的goroutine,就放到队列头部
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			
			// 这里sleep完了,被唤醒
			
			// 如果当前goroutine已经是饥饿状态了
			// 或者当前goroutine已经等待了1ms(在上面定义常量)以上
			// 就把当前goroutine的状态设置为饥饿
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			
			// 如果说锁现在是饥饿状态,就代表现在锁是被释放的状态,当前goroutine是被信号量所唤醒的
			// 也就是说,锁被直接交给了当前goroutine
			if old&mutexStarving != 0 {
				
				// 如果说当前锁的状态是被唤醒状态或者被获取状态,或者说等待的队列为空
				// 那么是不可能的,肯定是出问题了,因为当前状态肯定应该有等待的队列,锁也一定是被释放状态且未唤醒
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				
				// 当前goroutine获取锁,waiter数量-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				
				// 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine
				// 那么就退出饥饿模式,把状态设置为正常
				if !starving || old>>mutexWaiterShift == 1 {
					// 在这里这么做至关重要,还要考虑等待时间。
					// 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁
					delta -= mutexStarving
				}
				// 原子性地加上改动的状态
				atomic.AddInt32(&m.state, delta)
				break
			}
			// 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒
			// 并且重置iter(重置spin)
			awoke = true
			iter = 0
		} else {
	    	// 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放
			// 那么就更新状态,重新开始循环尝试拿锁
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

判断是否可以自旋:runtime/proc.go: sync_runtime_canSpin

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// 这里的active_spin是个常量,值为4
	// 简单来说,sync.Mutex是有可能被多个goroutine竞争的,所以不应该大量自旋(消耗CPU)
	// 自旋的条件如下:
	// 1. 自旋次数小于active_spin(这里是4)次;
	// 2. 在多核机器上;
	// 3. GOMAXPROCS > 1并且至少有一个其它的处于运行状态的P;
	// 4. 当前P没有其它等待运行的G;
	// 满足以上四个条件才可以进行自旋。
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

解锁:

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// 这里获取到锁的状态,然后将状态减去被获取的状态(也就是解锁),称为new(期望)状态
	// 注意以上两个操作是原子的,所以不用担心多个goroutine并发的问题
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
    // 如果说,期望状态加上被获取的状态,不是被获取的话
	// 那么就panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	
	// 如果说new状态(也就是锁的状态)不是饥饿状态
	if new&mutexStarving == 0 {
		old := new
		for {
		    // 如果说锁没有等待拿锁的goroutine
			// 或者锁被获取了(在循环的过程中被其它goroutine获取了)
			// 或者锁是被唤醒状态(表示有goroutine被唤醒,不需要再去尝试唤醒其它goroutine)
			// 或者锁是饥饿模式(会直接转交给队列头的goroutine)
			// 那么就直接返回,啥都不用做了
			// 也就是没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			/
			// 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待拿锁
			// 将等待的goroutine数减一,并设置woken标识
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			
			// 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
		// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
		// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
		// 新来的goroutine不会把锁抢过去.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

mutex 模式

  1. normal:协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁

  2. starvation:自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,
    还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,
    不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
    处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,
    同时也会把等待计数减1

Woken状态

Woken状态用于加锁和解锁过程的通信,举个例子,同一时刻,两个协程一个在加锁,一个在解锁,在加锁的协程可能在自旋过程中,
此时把Woken标记为1,用于通知解锁协程不必释放信号量了,好比在说:你只管解锁好了,不必释放信号量,我马上就拿到锁了

参考文献:
《Go专家编程》之mutex
https://www.purewhite.io/2019/03/28/golang-mutex-source/
https://www.cnblogs.com/ricklz/p/14535653.html

上一篇:vue 前端实现上传录音功能


下一篇:ES基本命令使用