WaitGroup
针对场景
需要多个子Goroutine执行任务,主Goroutine需要等待子Goroutine执行完后才能继续执行
源码解读
type WaitGroup struct {
noCopy noCopy //辅助字段,辅助vet工具检测是否有复制使用
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32 //8字节为state(高32位是计数值、低32位是waiter数),4字节为sema信号量
}
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
// 返回state、sema的指针
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { //地址是64位对齐
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else { //地址是32位对齐
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32) //原子操作增加计数值
v := int32(state >> 32) //add后的计数值
w := uint32(state) //waiter数
if v < 0 { //计数值小于0
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) { //Add、Wait并发调用
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 { //计数大于0或者waiter数为0,不需要唤醒waiter,直接返回
return
}
//Add后计数为0并且有waiter
if *statep != state { //Add、Wait并发调用
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 重置waiter数为0
*statep = 0
for ; w != 0; w-- {
//唤醒waiter
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) //计数值
w := uint32(state) //waiter数
if atomic.CompareAndSwapUint64(statep, state, state+1) { //waiter数新增1
runtime_Semacquire(semap) //阻塞等待
if *statep != 0 { //这里waiter被Add方法唤醒前,会重置state为0,这里不为0说明wg被重用了,panic
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Add(delta int)方法:给计数值增加delta,因为Done也是调用Add,所以增加后的计数值等于0并且waiter数不为0时需要唤醒waiter
Add中的异常情况检查:
- 计数值新增后小于0,那么panic
- Add、Wait并发调用会panic
Wait方法:waiter数增加1,然后阻塞等待
Wait中的异常情况检查:
- 如果wg在Wait方法返回前被重用那么panic
noCopy类型实现了Locker接口,vet工具就是对实现了Locker接口的类型进行静态检查,看是否存在复制值使用的情况
常见踩坑
计数器设置为负值
两种情况:
- 调用Add传入负值
- Done方法调用次数过多
正确使用姿势:创建完WaitGroup后,直接调用Add方法传入预期的waiter数,然后调用相同次数的Done;或者在每个goroutine创建前调用Add(1),每个goroutine执行完后调用Done
不期望的Add时机
要保证所有的Add都在Wait前调用,使用上面的姿势就没问题
上一轮的Wait方法还没返回就重用WaitGroup
package main
import "sync"
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
println("doing things")
wg.Done()
wg.Add(1) //这里想重用wg,但是如果和第13行并发那么会panic
}()
wg.Wait()
}
WaitGroup虽然可以重用,但是要等上一轮的Wait方法返回后才能重用,否则就可能出现panic