A Walkthrough of the WaitGroup Source Code

WaitGroup

Use case

Several child goroutines run tasks, and the main goroutine must wait for all of them to finish before it continues.
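As a minimal sketch of this scenario (the function name runWorkers and the squaring task are made up for illustration), the main goroutine starts a few workers and blocks until all of them have finished:

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines and returns only after all of them finish.
// Each worker writes to its own slot, so no extra locking is needed.
func runWorkers(n int) []int {
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // register the worker before starting it
		go func(id int) {
			defer wg.Done() // signal completion when the worker returns
			results[id] = id * id
		}(i)
	}
	wg.Wait() // block until the counter drops back to zero
	return results
}

func main() {
	fmt.Println(runWorkers(4)) // prints [0 1 4 9]
}
```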

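Reading the source

The listing is taken from an older Go release with the race-detector branches removed; newer releases store the state differently.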

type WaitGroup struct {
	noCopy noCopy    // helper field that lets go vet detect copies of a WaitGroup

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32     // 8 bytes hold state (high 32 bits: counter, low 32 bits: waiter count); the other 4 bytes hold the semaphore
}

type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// state returns pointers to the state and sema fields stored in wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {    // address is 64-bit aligned
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {     // address is only 32-bit aligned
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	state := atomic.AddUint64(statep, uint64(delta)<<32)     // atomically add delta to the counter
	v := int32(state >> 32)    // counter after the add
	w := uint32(state)    // waiter count
	if v < 0 {     // counter dropped below 0
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {      // Add called concurrently with Wait
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {  // counter is still positive or there are no waiters: nothing to wake, return
		return
	}

	// The counter is now 0 and there are waiters.
	if *statep != state {    // Add called concurrently with Wait
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset the waiter count to 0.
	*statep = 0
	for ; w != 0; w-- {
		// Wake one waiter.
		runtime_Semrelease(semap, false, 0)
	}
}

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep)
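		v := int32(state >> 32)    // counter
		if v == 0 {     // counter is already 0, no need to wait
			return
		}
		if atomic.CompareAndSwapUint64(statep, state, state+1) {     // increment the waiter count
			runtime_Semacquire(semap)    // block until Add wakes this waiter
			if *statep != 0 {     // Add resets state to 0 before waking waiters, so a non-zero value means wg was reused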
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

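Add(delta int): adds delta to the counter. Because Done is implemented as Add(-1), Add is also where waiters are woken: when the counter reaches 0 and the waiter count is non-zero, it resets the state and releases every waiter.
Misuse checks in Add:

  • If the counter is negative after the addition, it panics.
  • If it detects that Add was called concurrently with Wait, it panics.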
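The bit arithmetic behind Add can be worked through without any atomics. The sketch below uses hypothetical helpers (pack, unpack, addDelta) that are not part of package sync; they only mirror the expressions in the listing to show why adding uint64(delta)<<32 changes the counter and leaves the waiter count untouched, even for a negative delta:

```go
package main

import "fmt"

// pack builds the 64-bit state word: counter in the high 32 bits,
// waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits the state word the same way Add and Wait do.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

// addDelta mirrors the arithmetic in Add, minus the atomic operation.
// A negative delta wraps around in uint64, which subtracts from the high half.
func addDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

func main() {
	state := pack(2, 1) // two pending tasks, one waiter
	state = addDelta(state, -1)
	c, w := unpack(state)
	fmt.Println(c, w) // prints 1 1

	state = addDelta(state, -1)
	c, w = unpack(state)
	fmt.Println(c, w) // prints 0 1: this is the point where Add wakes the waiter
}
```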

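Wait: returns immediately if the counter is 0; otherwise it increments the waiter count by 1 and blocks on the semaphore.
Misuse checks in Wait:

  • If wg is reused before Wait has returned, it panics.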
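Since the state tracks a waiter count, several goroutines may call Wait on the same WaitGroup and all of them are released once the counter reaches 0. A small sketch, where releaseWaiters and the second WaitGroup used for bookkeeping are illustrative choices:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// releaseWaiters blocks n goroutines on one WaitGroup, then finishes the
// single pending task and reports how many of them returned from Wait.
func releaseWaiters(n int) int32 {
	var work, waiters sync.WaitGroup
	var released int32

	work.Add(1)    // one pending task
	waiters.Add(n) // bookkeeping so we can wait for the waiters themselves
	for i := 0; i < n; i++ {
		go func() {
			defer waiters.Done()
			work.Wait() // blocks, or returns at once if the counter is already 0
			atomic.AddInt32(&released, 1)
		}()
	}

	work.Done()    // counter reaches 0, every blocked waiter is woken
	waiters.Wait() // all n goroutines have returned from work.Wait
	return atomic.LoadInt32(&released)
}

func main() {
	fmt.Println(releaseWaiters(3)) // prints 3
}
```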

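The noCopy type has Lock and Unlock methods, so *noCopy satisfies the sync.Locker interface. The copylocks check in go vet statically inspects types that contain such a lock and reports places where a value is copied.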
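The same trick is commonly applied to user-defined types. The sketch below assumes a local copy of noCopy and a hypothetical Counter type; neither comes from package sync. The program itself runs normally, and the commented-out assignment marks the kind of copy that the copylocks check is expected to report:

```go
package main

import "fmt"

// noCopy is a local stand-in for the unexported type in package sync.
type noCopy struct{}

// Lock and Unlock are no-ops; they exist only so that go vet treats
// any struct containing noCopy as something that must not be copied.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Counter is a hypothetical type that should only be shared by pointer.
type Counter struct {
	_ noCopy
	n int
}

func (c *Counter) Inc() { c.n++ }

func main() {
	var c Counter
	c.Inc()

	p := &c // sharing a pointer is fine
	p.Inc()
	fmt.Println(c.n) // prints 2

	// d := c // copying the value is what go vet is expected to flag
}
```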

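Common pitfalls

Setting the counter to a negative value

This happens in two ways:

  1. Calling Add with a negative value that drives the counter below 0
  2. Calling Done too many times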
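The second case is easy to reproduce. In this sketch the helper tooManyDone (a name chosen for illustration) calls Done once too often and uses recover to capture the panic value instead of crashing:

```go
package main

import (
	"fmt"
	"sync"
)

// tooManyDone calls Done one more time than the counter allows and
// returns the recovered panic value as a string.
func tooManyDone() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // the counter would become -1, so Add(-1) panics
	return "no panic"
}

func main() {
	fmt.Println(tooManyDone())
}
```

Recovering here is only for demonstration; in real code the fix is to make the Add and Done calls match.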

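Correct usage: right after creating the WaitGroup, call Add with the number of goroutines you expect to start, then call Done exactly that many times; or call Add(1) before starting each goroutine and Done when each goroutine finishes.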
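The first pattern, setting the counter once up front, might look like the following sketch. The function sumParallel and its input are invented for the example:

```go
package main

import (
	"fmt"
	"sync"
)

// sumParallel sums each part in its own goroutine and adds up the results.
func sumParallel(parts [][]int) int {
	partial := make([]int, len(parts))

	var wg sync.WaitGroup
	wg.Add(len(parts)) // set the counter once, before any goroutine starts
	for i, p := range parts {
		go func(i int, p []int) {
			defer wg.Done() // exactly one Done per goroutine
			for _, x := range p {
				partial[i] += x
			}
		}(i, p)
	}
	wg.Wait()

	total := 0
	for _, s := range partial {
		total += s
	}
	return total
}

func main() {
	fmt.Println(sumParallel([][]int{{1, 2}, {3, 4}, {5}})) // prints 15
}
```

Using defer for Done keeps the Add and Done counts matched even when a goroutine returns early.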

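Calling Add at the wrong time

Make sure every Add call happens before Wait; the usage patterns above guarantee this.

Reusing a WaitGroup before the previous Wait has returned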

package main

import "sync"

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		println("doing things")
		wg.Done()
		wg.Add(1) // tries to reuse wg, but can panic if it runs concurrently with wg.Wait() in main
	}()
	wg.Wait()
}

A WaitGroup can be reused, but only after the previous Wait has returned; otherwise it may panic.
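A safe way to reuse one WaitGroup is to keep the rounds strictly sequential, so that each round's Add happens only after the previous Wait has returned. A sketch with a hypothetical runRounds helper:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runRounds reuses a single WaitGroup for several rounds of workers and
// returns the total number of workers that ran.
func runRounds(rounds, workers int) int {
	var wg sync.WaitGroup
	var done int32

	for r := 0; r < rounds; r++ {
		wg.Add(workers) // safe: the previous round's Wait has already returned
		for i := 0; i < workers; i++ {
			go func() {
				defer wg.Done()
				atomic.AddInt32(&done, 1)
			}()
		}
		wg.Wait() // finish this round before starting the next one
	}
	return int(atomic.LoadInt32(&done))
}

func main() {
	fmt.Println(runRounds(3, 4)) // prints 12
}
```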
