go中的数据结构通道-channel

1. channel的使用

  很多文章介绍channel的时候都和并发揉在一起,这里我想把它当做一种数据结构来单独介绍它的实现原理。

  channel,通道。golang中用于数据传递的一种数据结构。是golang中一种传递数据的方式,也可用作事件通知。

1.1 声明、传值、关闭

  使用chan关键字声明一个通道,在使用前必须先创建,操作符 <- 用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。

 //声明和创建
var ch chan int // 声明一个传递int类型的channel
ch := make(chan int) // 使用内置函数make()定义一个channel
ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式 type Equip struct{ /* 一些字段 */ }
ch2 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip //传值
ch <- value // 将一个数据value写入至channel,这会导致阻塞,直到有其他goroutine从这个channel中读取数据
value := <-ch // 从channel中读取数据,如果channel之前没有写入数据,也会导致阻塞,直到channel中被写入数据为止 ch := make(chan interface{}) // 创建一个空接口通道
ch <- // 将0放入通道中
ch <- "hello" // 将hello字符串放入通道中 //关闭
close(ch) // 关闭channel

  把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并报错:

fatal error: all goroutines are asleep - deadlock!
//运行时发现所有的 goroutine(包括main)都处于等待 goroutine。

1.2 四种重要的通道使用方式

无缓冲通道

  通道默认是无缓冲的,无缓冲通道上的发送操作将会被阻塞,直到有其他goroutine从对应的通道上执行接收操作,数据传送完成,通道继续工作。

package main

import (
"fmt"
"time"
)
var done chan bool
func HelloWorld() {
fmt.Println("Hello world goroutine")
time.Sleep(*time.Second)
done <- true
}
func main() {
done = make(chan bool) // 创建一个channel
go HelloWorld()
<-done
}
 //输出
//Hello world goroutine

  由于main不会等goroutine执行结束才返回,前文专门加了sleep输出为了可以看到goroutine的输出内容,那么在这里由于是阻塞的,所以无需sleep。

  将代码中”done <- true”和”<-done”,去掉再执行,没有上面的输出内容。

管道

  通道可以用来连接goroutine,一边的输入是另一边输出。这就叫做管道:

go中的数据结构通道-channel

 package main

 import (
"fmt"
"time"
)
var echo chan string
var receive chan string // 定义goroutine 1
func Echo() {
time.Sleep(*time.Second)
echo <- "这是一次测试"
} // 定义goroutine 2
func Receive() {
temp := <- echo // 阻塞等待echo的通道的返回
receive <- temp
} func main() {
echo = make(chan string)
receive = make(chan string) go Echo()
go Receive() getStr := <-receive // 接收goroutine 2的返回 fmt.Println(getStr)
}

  输出字符串:"这是一次测试"。

  在这里不一定要去关闭channel,因为底层的垃圾回收机制会根据它是否可以访问来决定是否自动回收它。(这里不是根据channel是否关闭来决定的)

单向通道类型
 package main

 import (
"fmt"
"time"
) // 定义goroutine 1
func Echo(out chan<- string) { // 定义输出通道类型
time.Sleep(*time.Second)
out <- "这又是一次测试"
close(out)
} // 定义goroutine 2
func Receive(out chan<- string, in <-chan string) { // 定义输出通道类型和输入类型
temp := <-in // 阻塞等待echo的通道的返回
out <- temp
close(out)
} func main() {
echo := make(chan string)
receive := make(chan string) go Echo(echo)
go Receive(receive, echo) getStr := <-receive // 接收goroutine 2的返回 fmt.Println(getStr)
}

  输出:这又是一次测试。

缓冲管道

  goroutine的通道默认是是阻塞的,那么有什么办法可以缓解阻塞? 答案是:加一个缓冲区。

  创建一个缓冲通道:

 ch := make(chan string, ) // 创建了缓冲区为3的通道

 //==
len(ch) // 长度计算
cap(ch) // 容量计算

  缓冲通道传递数据示意图:

go中的数据结构通道-channel

2. 内部结构

  Go语言channel是first-class的,意味着它可以被存储到变量中,可以作为参数传递给函数,也可以作为函数的返回值返回。作为Go语言的核心特征之一,虽然channel看上去很高端,但是其实它仅仅就是一个数据结构而已,具体定义在 $GOROOT/src/runtime/chan.go里。如下:

 type hchan struct {
qcount uint // 队列中的总数据
dataqsiz uint // 循环队列的大小
buf unsafe.Pointer // 指向dataqsiz元素数组 指向环形队列
elemsize uint16 //
closed uint32
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 接待员名单, 因recv而阻塞的等待队列。
sendq waitq // 发送服务员列表, 因send而阻塞的等待队列。
//锁定保护hchan中的所有字段,以及几个在此通道上阻止的sudogs中的字段。
  //按住此锁定时不要更改另一个G的状态(尤其是不要准备G),因为这可能会导致死锁堆栈缩小。
lock mutex
}

  其核心是存放channel数据的环形队列,由qcount和elemsize分别指定了队列的容量和当前使用量。dataqsize是队列的大小。elemalg是元素操作的一个Alg结构体,记录下元素的操作,如copy函数,equal函数,hash函数等。

  如果是带缓冲区的chan,则缓冲区数据实际上是紧接着Hchan结构体中分配的。不带缓冲的 channel ,环形队列 size 则为 0。

 c = (Hchan*)runtime.mal(n + hint*elem->size);

  另一重要部分是recvq和sendq两个双向链表,前者是等待读通道(<-channel)的goroutine队列,后者是等待写通道(channel <- xxx)的goroutine队列。若一个goroutine阻塞于channel了,那么它就被挂在recvq或sendq队列中。WaitQ是链表的定义,包含一个头结点和一个尾结点:

 struct    WaitQ
{
SudoG* first;
SudoG* last;
};

  队列中的每个成员是一个SudoG结构体变量:

 struct    SudoG
{
G* g; // g和selgen构成
uint32 selgen; // 指向g的弱指针
SudoG* link;
int64 releasetime;
byte* elem; // 数据元素
};

  SudoG里主要结构是一个g和一个elem。elem用于存储goroutine的数据。读通道时,数据会从Hchan的buf队列中拷贝到SudoG的elem域。写通道时,数据则是由SudoG的elem域拷贝到Hchan的队列中。

go中的数据结构通道-channel

  • buf是有缓冲的channel所特有的结构,用来存储缓存数据。是个循环链表
  • sendxrecvx用于记录buf这个循环链表中的发送或者接收的index
  • lock是个互斥锁。

从最基本的开始-创建channel

创建一个缓冲channel

 ch := make(chan int, )  //

底层操作就是从Heap中分配一块内存,在内存中实例化一个hchan的结构体,并返回一个ch指针,使用 channel时,在函数之间的传递就是这个指针,这就是为什么函数传递中无需使用channel的指针,而直接用channel就行了,因为channel本身就是一个指针。  

基本的写channel操作,在底层运行时库中对应的是一个runtime.chansend函数。

 chan <- value
  在运行时库中会执行:
 void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)

  其中c就是channel,ep是取变量v的地址。这里的传值约定是调用者负责分配好ep的空间,仅需要简单的取变量地址就够了。pres参数是在select中的通道操作使用的。

  这个函数首先判断是同步或异步。同步chan不带缓冲区,可能写阻塞,而异步chan带缓冲区,只有缓冲区满才阻塞。在同步的情况下,首先查看Hchan结构体中的recvq链表时否为空,即是否有因为读该管道而阻塞的goroutine。如果有则可以正常写channel,否则操作会阻塞。

  recvq不为空时,将一个SudoG结构体出队列,将传给通道的数据(函数参数ep)拷贝到SudoG结构体中的elem域,并将SudoG中的g放到就绪队列中,状态置为ready,然后函数返回。如果recvq为空,将当前goroutine阻塞。此时将一个SudoG结构体挂到通道的sendq链表中,这个SudoG中的elem域是参数eq,SudoG中的g是当前的goroutine。当前goroutine会被设置为waiting状态并挂到等待队列中。

  异步时,如果缓冲区满,要将当前goroutine和数据一起作为SudoG结构体挂在sendq队列中。在channel缓冲区不满的情况,直接将数据放到channel的缓冲区中,调用者返回。

实现细节:

  • 当使用send (ch <- xx)或者recv ( <-ch)的时候,首先要锁住hchan这个结构体。(lock字段);
  • 向缓冲区写数据,按链表顺序存放在buf中,直到缓冲区满;
  • 取数据的时候按链表顺序读取,符合FIFO的原则。

读写操作的细节都可以细化为:

  • 第一,加锁
  • 第二,把数据从goroutine中copy到“队列”中(或者从队列中copy到goroutine中)。
  • 第三,释放锁

  读channel操作也是类似的,对应的函数是runtime.chansend。基本过程类似。

  当协程尝试从未关闭的 channel 中读取数据时,内部的操作如下:

  • 当 buf 非空时,此时 recvq 必为空,buf 弹出一个元素给读协程,读协程获得数据后继续执行,此时若 sendq 非空,则从 sendq 中弹出一个写协程转入 running 状态,待写数据入队列 buf ,此时读取操作 <- ch 未阻塞;
  • 当 buf 为空但 sendq 非空时(不带缓冲的 channel),则从 sendq 中弹出一个写协程转入 running 状态,待写数据直接传递给读协程,读协程继续执行,此时读取操作 <- ch 未阻塞;
  • 当 buf 为空并且 sendq 也为空时,读协程入队列 recvq 并转入 blocking 状态,当后续有其他协程往 channel 写数据时,读协程才会重新转入 running 状态,此时读取操作 <- ch 阻塞。

  类似的,当协程尝试往未关闭的 channel 中写入数据时,内部的操作如下:

  • 当队列 recvq 非空时,此时队列 buf 必为空,从 recvq 弹出一个读协程接收待写数据,此读协程此时结束阻塞并转入 running 状态,写协程继续执行,此时写入操作 ch <- 未阻塞;
  • 当队列 recvq 为空但 buf 未满时,此时 sendq 必为空,写协程的待写数据入 buf 然后继续执行,此时写入操作 ch <- 未阻塞;
  • 当队列 recvq 为空并且 buf 为满时,此时写协程入队列 sendq 并转入 blokcing 状态,当后续有其他协程从 channel 中读数据时,写协程才会重新转入 running 状态,此时写入操作 ch <- 阻塞。

  当关闭 non-nil channel 时,内部的操作如下:

  • 当队列 recvq 非空时,此时 buf 必为空,recvq 中的所有协程都将收到对应类型的零值然后结束阻塞状态;
  • 当队列 sendq 非空时,此时 buf 必为满,sendq 中的所有协程都会产生 panic ,在 buf 中数据仍然会保留直到被其他协程读取。

  空通道是指将一个channel赋值为nil,或者定义后不调用make进行初始化。按照Go语言的语言规范,读写空通道是永远阻塞的。其实在函数runtime.chansend和runtime.chanrecv开头就有判断这类情况,如果发现参数c是空的,则直接将当前的goroutine放到等待队列,状态设置为waiting。

  读一个关闭的通道,永远不会阻塞,会返回一个通道数据类型的零值。这个实现也很简单,将零值复制到调用函数的参数ep中。写一个关闭的通道,则会panic。关闭一个空通道,也会导致panic。

3. channel的高级用法

3.1 条件变量(condition variable)

  类型于 POSIX 接口中线程通知其他线程某个事件发生的条件变量,channel 的特性也可以用来当成协程之间同步的条件变量。因为 channel 只是用来通知,所以 channel 中具体的数据类型和值并不重要,这种场景一般用 struct {} 作为 channel 的类型。

一对一通知

  类似 pthread_cond_signal() 的功能,用来在一个协程中通知另个某一个协程事件发生:

 package main

 import (
"fmt"
"time"
) func main() {
ch := make(chan struct{})
nums := make([]int, ) go func() {
time.Sleep(time.Second)
for i := ; i < len(nums); i++ {
nums[i] = i
}
// send a finish signal
ch <- struct{}{}
}() // wait for finish signal
<-ch
fmt.Println(nums)
}
广播通知

  类似 pthread_cond_broadcast() 的功能。利用从已关闭的 channel 读取数据时总是非阻塞的特性,可以实现在一个协程中向其他多个协程广播某个事件发生的通知:

 package main

 import (
"fmt"
"time"
) func main() {
N :=
exit := make(chan struct{})
done := make(chan struct{}, N) // start N worker goroutines
for i := ; i < N; i++ {
go func(n int) {
for {
select {
// wait for exit signal
case <-exit:
fmt.Printf("worker goroutine #%d exit\n", n)
done <- struct{}{}
return
case <-time.After(time.Second):
fmt.Printf("worker goroutine #%d is working...\n", n)
}
}
}(i)
} time.Sleep( * time.Second)
// broadcast exit signal
close(exit)
// wait for all worker goroutines exit
for i := ; i < N; i++ {
<-done
}
fmt.Println("main goroutine exit")
}

3.2 信号量

  channel 的读/写相当于信号量的 P / V 操作,下面的示例程序中 channel 相当于信号量:

 package main

 import (
"log"
"math/rand"
"time"
) type Seat int
type Bar chan Seat func (bar Bar) ServeConsumer(customerId int) {
log.Print("-> consumer#", customerId, " enters the bar")
seat := <-bar // need a seat to drink
log.Print("consumer#", customerId, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(+rand.Intn()))
log.Print("<- consumer#", customerId, " frees seat#", seat)
bar <- seat // free the seat and leave the bar
} func main() {
rand.Seed(time.Now().UnixNano()) bar24x7 := make(Bar, ) // the bar has 10 seats
// Place seats in an bar.
for seatId := ; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // none of the sends will block
} // a new consumer try to enter the bar for each second
for customerId := ; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeConsumer(customerId)
}
}

3.3 互斥量

  互斥量相当于二元信号里,所以 cap 为 1 的 channel 可以当成互斥量使用:

 package main

 import "fmt"

 func main() {
mutex := make(chan struct{}, ) // the capacity must be one counter :=
increase := func() {
mutex <- struct{}{} // lock
counter++
<-mutex // unlock
} increase1000 := func(done chan<- struct{}) {
for i := ; i < ; i++ {
increase()
}
done <- struct{}{}
} done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
fmt.Println(counter) //
}

4. 关闭 channel

  关闭不再需要使用的 channel 并不是必须的。跟其他资源比如打开的文件、socket 连接不一样,这类资源使用完后不关闭后会造成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然可以用其他不太优雅的方式自己实现一个:

 func isClosed(ch chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}

  不过实现一个这样的接口也没什么必要。因为就算通过 isClosed() 得到当前 channel 当前还未关闭,如果试图往 channel 里写数据,仍然可能会发生 panic ,因为在调用 isClosed() 后,其他协程可能已经把 channel 关闭了。
关闭 channel 时应该注意以下准则:

  • 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
  • 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
  • 如果只有一个写入端,可以在这个写入端放心关闭 channel 。

  关闭 channel 粗暴一点的做法是随意关闭,如果产生了 panic 就用 recover 避免进程挂掉。稍好一点的方案是使用标准库的 sync 包来做关闭 channel 时的协程同步,不过使用起来也稍微复杂些。下面介绍一种优雅些的做法。

4.1 一写多读

  这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当 channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:

 package main

 import (
"fmt"
"sync"
) func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, ) send := func() {
for i := ; i < ; i++ {
ch <- i
}
// signal sending finish
close(ch)
} recv := func(id int) {
defer wg.Done()
for i := range ch {
fmt.Printf("receiver #%d get %d\n", id, i)
}
fmt.Printf("receiver #%d exit\n", id)
} wg.Add()
go recv()
go recv()
go recv()
send() wg.Wait()
}

4.2 多写一读

  这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:

 package main

 import (
"fmt"
"sync"
) func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, )
done := make(chan struct{}) send := func(id int) {
defer wg.Done()
for i := ; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id* + i:
}
}
} recv := func() {
count :=
for i := range ch {
fmt.Printf("receiver get %d\n", i)
count++
if count >= {
// signal recving finish
close(done)
return
}
}
} wg.Add()
go send()
go send()
go send()
recv() wg.Wait()
}

4.2 多写多读

  这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:

 package main

 import (
"fmt"
"sync"
"time"
) func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, )
done := make(chan struct{}) send := func(id int) {
defer wg.Done()
for i := ; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id* + i:
}
}
} recv := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("receiver #%d exit\n", id)
return
case i := <-ch:
fmt.Printf("receiver #%d get %d\n", id, i)
time.Sleep(time.Millisecond)
}
}
} wg.Add()
go send()
go send()
go send()
go recv()
go recv()
go recv() time.Sleep(time.Second)
// signal finish
close(done)
// wait all sender and receiver exit
wg.Wait()
}

  channle 作为 golang 最重要的特性,用起来还是比较方便的。传统的 C 里要实现类似的功能的话,一般需要用到 socket 或者 FIFO 来实现,另外还要考虑数据包的完整性与并发冲突的问题,channel 则屏蔽了这些底层细节,使用者只需要考虑读写就可以了。 channel 是引用类型,了解一下 channel 底层的机制对更好的使用 channel 还是很用必要的。虽然操作原语简单,但涉及到阻塞的问题,使用不当可能会造成死锁或者无限制的协程创建最终导致进程挂掉。

上一篇:Netty版本升级血泪史之线程篇


下一篇:详解 通道 (Channel 接口)