2.管道
简介
Golang的原子并发特性使得它很容易构造流数据管道,这使得Golang可有效的使用I/O和多CPU特性。本文提出一些关于管道的示
例,在这个过程中突出了操作失败的微妙之处和介绍处理失败的具体技术。
什么是管道
在Golang对于管道没有明确的定义;它只是许多种并发程序中的一种。管道是通道连接的一系列阶段, 每个阶段是一组
goroutine运行相同的功能。在每个阶段,goroutine运行步骤为:
从上游经过入境通道接受值
对数据执行一些功能操作,通常会产生新的值
从下游经过出境通道发送值
除了开始和最后阶段只有一个入境通道或者一个出境通道外,其他每个阶段有任意数量的入境通道和出境通道,。开始阶段有时
又称为源或者生产者;最后一个阶段又称为sink或者消费者。
我们将开始一个简单的示例来解释管道的思想和技术。稍后,我们将展示更多相关的例子。
平方数
一个通道有三个阶段。
第一阶段:gen,以从列表读出整数的方式转换整数列表到一个通道。gen函数开始goroutine后, 在通道上发送整数并且在在所
有的值被发送完后将通道关闭:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二阶段:sq,从通道接受整数,然后将接受到的每个整数值的平方后返回到一个通道 。在入境通道关闭和发送所有下行值的阶
段结束后,关闭出口通道:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main函数建立了通道并运行最后一个阶段:它接受来自第二阶段的值并打印出每个值,直到通道关闭:
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
由于sq有相同类型的入境和出境通道,我们可以写任意次。我们也可以重写main函数,像其他阶段一样做一系列循环 :
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
扇出,扇入
扇出(fan-out):多个函数能从相同的通道中读数据,直到通道关闭;这提供了一种在一组“人员”中分发任务的方式,使得
CPU和I/O的并行处理.
扇入(fan-in):一个函数能从多个输入中读取并处理数据,而这多个输入通道映射到一个单通道,该单通道随着所有输入的结
束而关闭。
我们可以改变通道去运行两个sq实例,每个实例从相同的输入通道读取数据。我们引入了一个新函数merge去扇入结果:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge函数通过为每一个入境通道开启一个goroutine去复制数值到唯一的出境通道,从而实现了转换通道列表到一个单通道 。一
旦所有的output goroutine启动,所有在通道上的发送完成后merge函数开启一个以上的goroutine用于关闭出境通道。
在一个关闭的通道上发送没有意义,所以在关闭之前确保所有的发送完成是重要的。sync.WaitGroup类型提供了一个简单的方法
去组织同步:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
短暂停止
管道函数模型:
当所有的发送操作结束后, 阶段关闭他们的出境通道。
阶段持续接收来自入境通道的值,直到那些通道关闭。
这个模型允许每一个接收阶段通过range循环的写数据,确保一旦所有向下游发送的值发送成功,所有的goroutine退出。
但在一个真实的管道上,阶段并不总是接收所有的入境值。有时设计是这样的:接收者可能只需要一个子集值就能取得进展。更
多时候是一个阶段早早的退出,因为一个入境值代表一个早期阶段的错误。 在这两种情况下接收者不应该等待剩余的值到达,我
们想要早期阶段停止产生后续阶段不需要的值。
在我们的示例中,如果一个阶段不能处理所有的入境值,那么试图发送这些值得goroutine将无限期的阻塞:
// Consume the first value from output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}
这是一个资源锁:goroutine消耗内存和运行资源,并且在goroutine栈中的堆引用防止数据被回收。Goroutine不能垃圾回收;它
们必须自己退出。
当下游阶段在接收所有的入境值失败后,我们需要安排管道的上游阶段退出。一种实现方法是将出境通道改为一个缓冲区。该缓
冲区能保存固定数量的值;如果缓冲区有空闲就立即发送操作完成信号:
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
当在通道创建就预先知道待发送的数值个数时,通过使用缓冲区可以简化代码。例如,我们可以重写 gen 来将整数列表复制到
带有缓冲区的通道中,也可以避免创建一个新的 goroutine:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
回到管道中处于阻塞状态的 goroutine,我们可以考虑为 merge 返回的出境通道增加一个缓冲区:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
尽管它修正了程序中 goroutine 的阻塞问题,但却不能称为好代码。在这里,缓冲区大小选取为 1,取决于预知 merge 将会接
收的数值个数及下游各阶段将会消费的数值个数。这很脆弱:如果我们给 gen 多传了一个数值,或者下游阶段少读了一些数值,
goroute 的阻塞问题会再次出现。
作为代替,我们需要为下游各阶段提供一种手段,来向发送方表明指明它们将停止接收数据的输入。
显式取消
当main没有接受完out所有的值就决定退出时,它必须告知上游状态(upstream stage)的goroutines,让它丢弃正在发送中的数据
。通过在一个叫做done的channel上发送数据,即可实现。例子里有两个受阻的发送方,所以发送的值有两组:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}
使用select语句,让发送中的goroutines取代了发送操作。这条语句既可以处理在发送out的情形,也可以处理从done中接受一个
值的情况。done的值类型是空结构,因为它的数值并不重要:它是一个接受事件,表明out的发送应该被丢弃。output goroutines
继续在channel c内循环运行,而不会阻塞上游状态(upstream stage):
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
但是这种方法有个问题:下游的接收者需要知道潜在会被阻塞的上游发送者的数量。追踪这些数量不仅枯燥,还容易出错。
我们要有一个方法告知一个未知的、无限数量的go程序向下游发送它们的值。在GO里面我们通过关闭一个通道来实现,因为一个
在已关闭通道上的接收操作总能立即执行,并返回该元素类型的零值。
这意味着main函数只需关闭“done”通道就能开启所有发送者。close实际上是传给发送者的一个广播信号。我们扩展每一个管道
函数接收“done”参数并通过一个“defer”语句触发“close”,这样所有来自main的返回路径都会以信号通知管道退出。
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
管道里的每个状态现在都可以随意的提早退出了:sq可以在它的循环中退出,因为我们知道如果done已经被关闭了,也会关闭上
游的gen状态。sq通过defer语句,保证不管从哪个返回路径,它的outchannel 都会被关闭。
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
下面列出了构建管道的指南:
状态会在所有发送操作做完后,关闭它们的流出 channel
状态会持续接收从流入 channel 输入的数值,直到 channel 关闭或者其发送者被释放。
管道要么保证足够能存下所有发送数据的缓冲区,要么接收来自接收者明确的要放弃 channel 的信号,来保证释放发送者。
让我们考虑一个更真实的管道情况。
MD5摘要生成算法在校验文件时非常有用。命令行工具md5sum会生成文件的摘要的列表。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的样例程序就像md5sum差不多,不一样的是它会以文件夹作为参数,并打印每个文件夹内文件的索引值,按路径名排序。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
程序的主函数调用辅助函数MD5ALL,它会返回一个map,将路径名对应到索引值,之后排序并打印结果。
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5ALL函数是我们要讨论的。在serial.go中,它的实现没有使用并发,只是简单地遍历文件树,读取文件并生成摘要。
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
I在parallel.go里,我们把MD5All分解为两个状态的管道。第一个状态,sumFiles,遍历目录,在一个新的 Goroutine 里对每个
文件做摘要,并把结果发送到类型为result的 channel:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles返回两个 channel:一个用来传递result,另一个用来返回filepath.Walk的错误。遍历函数启动一个新的 Goroutine
来处理每个常规文件,之后检查done。如果done已经被关闭了,遍历就立刻停止:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}
MD5All从c接收所有的摘要值。MD5All返回早先的错误,通过defer关闭done:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
受限的并发
在parallel.go里实现的MD5All对每个文件启动一个新的 Goroutine。如果目录里含有很多大文件,这可能会导致申请大量内存,
超出机器上的可用内存。
我们可以通过控制并行读取的文件数量来限制内存的申请。在bounded.go,我们创建固定数量的用于读取文件的 Goroutine,来
限制内存使用。现在整个管道有三个状态:遍历树,读取并对文件做摘要,收集摘要值。
第一个状态,walkFiles,发送树里的每个常规文件的路径:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
中间的状态启动固定数量的digesterGoroutine,从paths接收文件名,并将结果result发送到 channelc:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
不像之前的例子,digester并不关闭输出 channel,因为多个 Goroutine 会发送到共享的 channel。另一边,MD5All中的代码会
在所有digester完成后关闭 channel:
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
我们也可以让每个digester创建并返回自己的输出 channel,但是这就需要一个单独的 Goroutine 来扇入所有结果。
最终从c收集到所有结果result,并检查从errc传入的错误。这个错误的检查不能提早,因为在这个时间点之前,walkFiles可能
会因为正在发送消息给下游而阻塞:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
这篇文章展示了使用Go构建流数据管道的技术。要慎重处理这种管道产生的错误,因为管道里的每个状态都可能因为向下游发送
数值而阻塞,而下游的状态却不再关心输入的数据。我们展示了如何将关闭channel作为“完成”信号广播给所有由管道启动的
Goroutine,并且定义了正确构建管道的指南。
进一步阅读:
Go并发模式(视频)展示了Go的并发特性的基础知识,并演示了应用这些知识的方法。
高级Go并发模式(视频)覆盖了关于Go特性更复杂的使用场景,尤其是select。
Douglas McIlroy的论文《一窥级数数列》展示了Go使用的这类并发技术是如何优雅地支持复杂计算。
相关文章
- 04-20【golang学习笔记】切片
- 04-20Golang之new和make
- 04-20golang_并发安全: slice和map并发不安全及解决方法
- 04-20Golang 切片
- 04-20Golang内置类型和函数
- 04-20Golang数据类型之切片
- 04-20golang快速入门(八)数据类型特别之处(下)
- 04-20golang学习五·五:指针&切片&字典&结构体
- 04-20golang nil切片与空切片的区别
- 04-20golang的make