Golang笔记: channel
定义
不要通过共享内存来通信,而是通过通信来实现内存共享
多个 goroutine 借助 channel 来传输数据,实现了跨 goroutine 间的数据传输,多者独立运行,不需要强关联,更不影响对方的 goroutine 状态。不存在 goroutine1 对 goroutine2 进行直传的情况。
channel 基本特性
分类
channel 共有两种模式,分别是:双向和单向;三种表现方式,分别是:声明双向通道:chan T
、声明只允许发送的通道:chan <- T
、声明只允许接收的通道:<- chan T
。
channel 中还分为 “无缓冲 channel” 和 “缓冲 channel”。
等待队列
从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。
向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。
被阻塞的goroutine将会挂在channel的等待队列中:
- 因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
- 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
数据结构
channel 图解
hchan 结构体
// src/runtime/chan.go
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
/*
- qcount:队列中的元素总数量。
- dataqsiz:循环队列的长度。
- buf:指向长度为 dataqsiz 的底层数组,仅有当 channel 为缓冲型的才有意义。
- elemsize:能够接受和发送的元素大小。
- closed:是否关闭。
- elemtype:能够接受和发送的元素类型。
- sendx:已发送元素在循环队列中的索引位置。
- recvx:已接收元素在循环队列中的索引位置。
- recvq:接受者的 sudog 等待队列(缓冲区不足时阻塞等待的 goroutine)。
- sendq:发送者的 sudog 等待队列。
*/
recvq
和 sendq
,其表现为等待队列,其类型为 runtime.waitq
的双向链表结构
type waitq struct {
first *sudog
last *sudog
}
// 无论是 first 属性又或是 last,其类型都为 runtime.sudog 结构体
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
...
}
/*
g:指向当前的 goroutine。
next:指向下一个 g。
prev:指向上一个 g。
elem:数据元素,可能会指向堆栈。
*/
Chan使用
创建chan
ch := make(chan string)
ch := make(chan string, 1024)
创建 channel 的逻辑主要分为三大块:
- 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用
mallocgc
方法分配一段连续的内存空间。 - 当前 channel 存储的类型存在指针引用,就会连同
hchan
和底层数组同时分配一段连续的内存空间。 - 通用情况,默认分配相匹配的连续内存空间。
需要注意到一块特殊点,那就是 channel 的创建都是调用的 mallocgc
方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close
方法来进行显示关闭了。
向chan中写入数据
向一个channel中写数据过程
-
如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if c.closed != 0 { // 会对 channel 进行一次状态判断(是否关闭 unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } }
-
如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } } /* 会对缓冲区进行判定(`qcount` 和 `dataqsiz` 字段),以此识别缓冲区的剩余空间。紧接进行如下操作: - 调用 `chanbuf` 方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值。 - 调用 `typedmemmove` 方法,将所需发送的数据拷贝到缓冲区中。 - 数据拷贝后,对 sendx 索引自行自增 1。同时若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。 - 自增完成后,队列总数同时自增 1。解锁互斥锁,返回结果。 */
-
如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep) } /* - 调用 getg 方法获取当前 goroutine 的指针,用于后续发送数据。 - 调用 acquireSudog 方法获取 sudog 结构体,并设置当前 sudog 具体的待发送数据信息和状态。 - 调用 c.sendq.enqueue 方法将刚刚所获取的 sudog 加入待发送的等待队列。 - 调用 gopark 方法挂起当前 goroutine(会记录执行位置),状态为 waitReasonChanSend,阻塞等待 channel。 - 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。 */
流程图
从chan中读取数据
前置处理
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
/*
一开始时 chanrecv 方法会判断其是否为 nil channel。
场景如下:
若 channel 是 nil channel,且为阻塞接收则调用 gopark 方法挂起当前 goroutine。
若 channel 是非阻塞模式,则直接返回。
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。
*/
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
/*
其分以下几种情况:
无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。
但若是 channel 已经关闭且不存在缓存数据了,则会清理 ep 指针中的数据并返回。
*/
从channel读数据过程
-
如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... }
-
如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
-
如果等待发送队列sendq为空、缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } ... }
-
如果等待发送队列sendq为空、缓冲区中无数据,将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... }
流程图
特性
- 读队列Recvq, 写入数据时G被唤醒; 写队列Sendq, 读取数据时G被唤醒
- sendq有G有数据; recvq有G无数据
Chan关闭
panic出现的常见场景:
- 关闭值为nil的channel(未make)
- 关闭已经被关闭的channel
- 向已经关闭的channel写数据
sendq、recvq G释放
释放接收方
在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:
func closechan(c *hchan) {
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
...
}
所取出并加入的 goroutine 状态需要均为 _Gwaiting
,以保证后续的新一轮调度。
释放发送方
同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:
func closechan(c *hchan) {
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
...
}
协程调度
将所有 glist 中的 goroutine 状态从 _Gwaiting
设置为 _Grunnable
状态,等待调度器的调度:
func closechan(c *hchan) {
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获*,后续该干嘛就去干嘛了,再跑回其所属的应用流程。
channel send/recv 分析
send
send
方法承担向 channel 发送具体数据的功能:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
-
调用
sendDirect
方法将待发送的数据直接拷贝到待接收变量的内存地址(执行栈)。- 例如:
msg := <-ch
语句,也就是将数据从ch
直接拷贝到了msg
的内存地址。
- 例如:
-
调用
sg.g
属性, 从 sudog 中获取等待接收数据的 goroutine,并传递后续唤醒所需的参数。 -
调用
goready
方法唤醒需接收数据的 goroutine,期望从_Gwaiting
状态调度为_Grunnable
。
recv
recv
方法承担在 channel 中接收具体数据的功能:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
该方法在接受上分为两种情况,分别是直接接收和缓冲接收:
直接接收(不存在缓冲区):
- 调用
recvDirect
方法,其作用与sendDirect
方法相对,会直接从发送方的 goroutine 调用栈中将数据拷贝过来到接收方的 goroutine。
缓冲接收(存在缓冲区):
- 调用
chanbuf
方法,根据recvx
索引的位置读取缓冲区元素,并将其拷贝到接收方的内存地址。 - 拷贝完毕后,对
sendx
和recvx
索引位置进行调整。
最后还是常规的 goroutine 调度动作,会调用 goready
方法来唤醒当前所处理的 sudog 的对应 goroutine。那么在下一轮调度时,既然已经接收了数据,自然发送方也就会被唤醒。
细节
- channel 都是创建在堆上的。因此 channel 是会被 GC 回收的;
- 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
- 在具体的数据传输上,都是围绕着 “边界上下限处理,上互斥锁,阻塞/非阻塞,缓冲/非缓冲,缓存出队列,拷贝数据,解互斥锁,协程调度” 在不断地流转处理。在基本逻辑上也是相对重合的,因为发送和接收,创建和关闭总是相对的。
参考博客
Go语言基础之并发
一文带你解密 Go 语言之通道 channel
Go专家编程(书籍购买)