Go 语言有提供了多个包来将多个 goroutine 的生命周期编组管理。最简单的是标准库的 sync .WaitGroup,应用比较普遍的是 google 的 errgroup,Prometheus 用的是 oklog 的 run。下面学习后两个包的用法。
errgroup
errgroup 为作为一个任务中的多个子任务的多组 goroutine 提供了同步、错误传播和取消上下文。
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
Group 结构体是一组 goroutine 的集合。零值 Group 是有效的,在 error 的时候不会 cancel。使用方法如下:
1. 提供错误处理的 sync.WaitGroup
WaitGroup 只管理 goroutine 同步,如果发生错误要自己处理,errgroup 提供了错误处理的能力,不过仅能记录多个 goroutine 中第一个出错的。
// 初始化 Group 实例作为组
g := new(errgroup.Group)
// 启动组中的多个 goroutine,第一次返回非空的 error 会 cancel 整个组并返回这个 error。
g.Go(func() error{})
g.Go(func() error{})
// 阻塞等待组完成,如果有错误,获取第一个 error
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
2. 传入 context.Context
WithContext(ctx context.Context)
返回的 Group
在传入 Go
中的多个 goroutine 中第一次出错或第一次 Wait
返回时 cancel。
run
跟 errgroup 类似,run.Group
创建以后通过 Add()
方法添加 goroutine。其有两个函数参数,第一个是执行函数,第二个是中断函数,调用中断函数的时候会使执行函数返回。调用 Run(0)
函数的时候并发执行所有 goroutine,并阻塞等待知道第一个 goroutine 退出,调用中断函数,待全部 goroutine 返回后返回到调用者。run 与 errgroup 相似,除了它不需要 goroutine 理解 context 语义。
示例1
package main
import (
"errors"
"fmt"
"time"
"github.com/oklog/run"
)
func main() {
var g run.Group
{
cancel := make(chan struct{})
g.Add(func() error {
for {
select {
case <-time.After(time.Second):
fmt.Println("the first actor had its time elapsed 1 second")
// return nil
case <-cancel:
fmt.Println("The first actor was canceled")
return nil
}
}
}, func(err error) {
fmt.Printf("the first actor was interrupted with: %v\n", err)
close(cancel)
})
}
{
g.Add(func() error {
time.Sleep(2 * time.Second)
fmt.Println("the second actor is returning after 2 second")
return errors.New("second actor teardown")
}, func(err error) {
// 在对应的执行函数返回后,这个取消函数也会被执行
fmt.Printf("the second actor was interrupted with %v\n", err)
})
}
g.Run()
}
$ go run main.go
the first actor had its time elapsed 1 second
the second actor is returning after 2 second
the first actor was interrupted with: second actor teardown
the second actor was interrupted with second actor teardown
The first actor was canceled
run.Group 中每个 Add 的成员是由执行函数和取消函数组成的一对函数,某一个执行函数返回错误以后,将执行它对应的取消函数,同时其他的取消函数。
示例2 Context
package main
import (
"context"
"fmt"
"github.com/oklog/run"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
var g run.Group
{
ctx1, cancel := context.WithCancel(ctx)
g.Add(func() error {
return runUntilCanceled(ctx1)
}, func(error) {
cancel()
})
}
go cancel()
fmt.Printf("The group was terminated with: %v\n", g.Run())
}
func runUntilCanceled(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}
The group was terminated with: context canceled