Golang笔记: channel

Golang笔记: channel

定义

不要通过共享内存来通信,而是通过通信来实现内存共享

多个 goroutine 借助 channel 来传输数据,实现了跨 goroutine 间的数据传输,多者独立运行,不需要强关联,更不影响对方的 goroutine 状态。不存在 goroutine1 对 goroutine2 进行直传的情况。
Golang笔记: channel

channel 基本特性

分类

channel 共有两种模式,分别是:双向和单向;三种表现方式,分别是:声明双向通道:chan T、声明只允许发送的通道:chan <- T、声明只允许接收的通道:<- chan T

channel 中还分为 “无缓冲 channel” 和 “缓冲 channel”。

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。
向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会挂在channel的等待队列中:

  • 因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
  • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;

数据结构

channel 图解

Golang笔记: channel

hchan 结构体

// src/runtime/chan.go
type hchan struct {
 qcount   uint      
 dataqsiz uint     
 buf      unsafe.Pointer 
 elemsize uint16
 closed   uint32
 elemtype *_type 
 sendx    uint  
 recvx    uint  
 recvq    waitq  
 sendq    waitq  

 lock mutex
}

/*
- qcount:队列中的元素总数量。
- dataqsiz:循环队列的长度。
- buf:指向长度为 dataqsiz 的底层数组,仅有当 channel 为缓冲型的才有意义。
- elemsize:能够接受和发送的元素大小。
- closed:是否关闭。
- elemtype:能够接受和发送的元素类型。
- sendx:已发送元素在循环队列中的索引位置。
- recvx:已接收元素在循环队列中的索引位置。
- recvq:接受者的 sudog 等待队列(缓冲区不足时阻塞等待的 goroutine)。
- sendq:发送者的 sudog 等待队列。
*/

recvqsendq,其表现为等待队列,其类型为 runtime.waitq 的双向链表结构

type waitq struct {
 first *sudog
 last  *sudog
}

// 无论是 first 属性又或是 last,其类型都为 runtime.sudog 结构体
type sudog struct {
 g *g

 next *sudog
 prev *sudog
 elem unsafe.Pointer
 ...
}

/*
g:指向当前的 goroutine。
next:指向下一个 g。
prev:指向上一个 g。
elem:数据元素,可能会指向堆栈。
*/

Chan使用

创建chan

ch := make(chan string)
ch := make(chan string, 1024)

创建 channel 的逻辑主要分为三大块:

  • 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用 mallocgc 方法分配一段连续的内存空间。
  • 当前 channel 存储的类型存在指针引用,就会连同 hchan 和底层数组同时分配一段连续的内存空间。
  • 通用情况,默认分配相匹配的连续内存空间。

需要注意到一块特殊点,那就是 channel 的创建都是调用的 mallocgc 方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close 方法来进行显示关闭了。

向chan中写入数据

向一个channel中写数据过程

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
     ...
     if c.closed != 0 { // 会对 channel 进行一次状态判断(是否关闭
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
     }
    
     if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
     }
    }
    
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
     ...
     if c.qcount < c.dataqsiz {
      qp := chanbuf(c, c.sendx)
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
       c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
     }
    
     if !block {
      unlock(&c.lock)
      return false
     }
    }
    /*
    会对缓冲区进行判定(`qcount` 和 `dataqsiz` 字段),以此识别缓冲区的剩余空间。紧接进行如下操作:
    
    - 调用 `chanbuf` 方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值。
    - 调用 `typedmemmove` 方法,将所需发送的数据拷贝到缓冲区中。
    - 数据拷贝后,对 sendx 索引自行自增 1。同时若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。
    - 自增完成后,队列总数同时自增 1。解锁互斥锁,返回结果。
    */
    

    Golang笔记: channel

  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
     ...
     gp := getg()
     mysg := acquireSudog()
     mysg.releasetime = 0
     if t0 != 0 {
      mysg.releasetime = -1
     }
    
     mysg.elem = ep
     mysg.waitlink = nil
     mysg.g = gp
     mysg.isSelect = false
     mysg.c = c
     gp.waiting = mysg
     gp.param = nil
     c.sendq.enqueue(mysg)
    
     atomic.Store8(&gp.parkingOnChan, 1)
     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
     KeepAlive(ep)
    }
    /*
    - 调用 getg 方法获取当前 goroutine 的指针,用于后续发送数据。
    - 调用 acquireSudog 方法获取 sudog 结构体,并设置当前 sudog 具体的待发送数据信息和状态。
    - 调用 c.sendq.enqueue 方法将刚刚所获取的 sudog 加入待发送的等待队列。
    - 调用 gopark 方法挂起当前 goroutine(会记录执行位置),状态为 waitReasonChanSend,阻塞等待 channel。
    - 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
    */
    

    Golang笔记: channel

流程图

Golang笔记: channel

从chan中读取数据

前置处理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 if c == nil {
  if !block {
   return
  }
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }
/*
一开始时 chanrecv 方法会判断其是否为 nil channel。

场景如下:

若 channel 是 nil channel,且为阻塞接收则调用 gopark 方法挂起当前 goroutine。
若 channel 是非阻塞模式,则直接返回。
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。
*/
    
 if !block && empty(c) {
  if atomic.Load(&c.closed) == 0 {
   return
  }

  if empty(c) {
   if ep != nil {
    typedmemclr(c.elemtype, ep)
   }
   return true, false
  }
 }
 ...
}
/*
其分以下几种情况:

无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。

但若是 channel 已经关闭且不存在缓存数据了,则会清理 ep 指针中的数据并返回。
*/

从channel读数据过程

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    
     lock(&c.lock)
    
     if sg := c.sendq.dequeue(); sg != nil {
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
     }
     ...
    }
    
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;

  3. 如果等待发送队列sendq为空、缓冲区中有数据,则从缓冲区取出数据,结束读取过程;

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    
     if c.qcount > 0 {
      qp := chanbuf(c, c.recvx)
      if ep != nil {
       typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
       c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
     }
    
     if !block {
      unlock(&c.lock)
      return false, false
     }
     ...
    }
    
  4. 如果等待发送队列sendq为空、缓冲区中无数据,将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    
     gp := getg()
     mysg := acquireSudog()
     mysg.releasetime = 0
     if t0 != 0 {
      mysg.releasetime = -1
     }
    
     mysg.elem = ep
     mysg.waitlink = nil
     gp.waiting = mysg
     mysg.g = gp
     mysg.isSelect = false
     mysg.c = c
     gp.param = nil
     c.recvq.enqueue(mysg)
    
     atomic.Store8(&gp.parkingOnChan, 1)
     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
     ...
    }
    

流程图

Golang笔记: channel

特性

  1. 读队列Recvq, 写入数据时G被唤醒; 写队列Sendq, 读取数据时G被唤醒
  2. sendq有G有数据; recvq有G无数据

Chan关闭

panic出现的常见场景:

  1. 关闭值为nil的channel(未make)
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

Golang笔记: channel

sendq、recvq G释放

释放接收方

在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:

func closechan(c *hchan) {

 var glist gList
 for {
  sg := c.recvq.dequeue()
  if sg == nil {
   break
  }
  if sg.elem != nil {
   typedmemclr(c.elemtype, sg.elem)
   sg.elem = nil
  }
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
 ...
}

所取出并加入的 goroutine 状态需要均为 _Gwaiting,以保证后续的新一轮调度。

释放发送方

同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:

func closechan(c *hchan) {

 // release all writers (they will panic)
 for {
  sg := c.sendq.dequeue()
  if sg == nil {
   break
  }
  sg.elem = nil
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
 unlock(&c.lock)
 ...
}

协程调度

将所有 glist 中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度:

func closechan(c *hchan) {

 // Ready all Gs now that we've dropped the channel lock.
 for !glist.empty() {
  gp := glist.pop()
  gp.schedlink = 0
  goready(gp, 3)
 }
}

后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获*,后续该干嘛就去干嘛了,再跑回其所属的应用流程。

channel send/recv 分析

send

send 方法承担向 channel 发送具体数据的功能:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if sg.elem != nil {
  sendDirect(c.elemtype, sg, ep)
  sg.elem = nil
 }
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
 goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
 dst := sg.elem
 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
 memmove(dst, src, t.size)
}
  • 调用 sendDirect 方法将待发送的数据直接拷贝到待接收变量的内存地址(执行栈)。

    • 例如:msg := <-ch 语句,也就是将数据从 ch 直接拷贝到了 msg 的内存地址。
  • 调用 sg.g 属性, 从 sudog 中获取等待接收数据的 goroutine,并传递后续唤醒所需的参数。

  • 调用 goready 方法唤醒需接收数据的 goroutine,期望从 _Gwaiting 状态调度为 _Grunnable

recv

recv 方法承担在 channel 中接收具体数据的功能:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if c.dataqsiz == 0 {
  if ep != nil {
   recvDirect(c.elemtype, sg, ep)
  }
 } else {
  qp := chanbuf(c, c.recvx)
  if ep != nil {
   typedmemmove(c.elemtype, ep, qp)
  }
  typedmemmove(c.elemtype, qp, sg.elem)
  c.recvx++
  if c.recvx == c.dataqsiz {
   c.recvx = 0
  }
  c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
 }
 sg.elem = nil
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
 goready(gp, skip+1)
}

该方法在接受上分为两种情况,分别是直接接收和缓冲接收:

直接接收(不存在缓冲区):

  • 调用 recvDirect 方法,其作用与 sendDirect 方法相对,会直接从发送方的 goroutine 调用栈中将数据拷贝过来到接收方的 goroutine。

缓冲接收(存在缓冲区):

  • 调用 chanbuf 方法,根据 recvx 索引的位置读取缓冲区元素,并将其拷贝到接收方的内存地址。
  • 拷贝完毕后,对 sendxrecvx 索引位置进行调整。

最后还是常规的 goroutine 调度动作,会调用 goready 方法来唤醒当前所处理的 sudog 的对应 goroutine。那么在下一轮调度时,既然已经接收了数据,自然发送方也就会被唤醒。

细节

  1. channel 都是创建在堆上的。因此 channel 是会被 GC 回收的;
  2. 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
  3. 在具体的数据传输上,都是围绕着 “边界上下限处理,上互斥锁,阻塞/非阻塞,缓冲/非缓冲,缓存出队列,拷贝数据,解互斥锁,协程调度” 在不断地流转处理。在基本逻辑上也是相对重合的,因为发送和接收,创建和关闭总是相对的。

参考博客

Go语言基础之并发
一文带你解密 Go 语言之通道 channel
Go专家编程(书籍购买)

上一篇:go中的关键字-defer


下一篇:arcgis之gp服务发布