ChanBase
管道base, 判断管道是否关闭, 优雅关闭管道
fmt.Println(<-c)会消耗里面的内容吗
# 1 _, ok := <-c fmt.Println(ok) # 2 select { case _,ok := <-c: fmt.Println("------------", ok) } 上面两个也会消耗管道数据
// 我的问题
// 1. 多次关闭管道发生啥
// fmt(<-c)此时管道取出来数字了吗
package main
import "fmt"
func main() {
c := make(chan interface{}, 6)
for i := 0; i < 4; i++ {
c<-i
}
fmt.Println(<-c)
fmt.Println("管道还剩下")
for {
v, ok := <-c
if !ok {
break
} else {
fmt.Println(v)
}
}
defer close(c)
}
# 输出
0
管道还剩下
1
2
3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
D:/gohere/go-newbase/context/withdead-line/main.go:19 +0x20e
exit status 2
报错原因
-
超出管道缓存,一直写
package main import "fmt" func main() { c := make(chan interface{}, 6) for i := 0; i < 14; i++ { c<-i } fmt.Println(<-c) defer close(c) } # 输出 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() D:/gohere/go-newbase/context/withdead-line/main.go:12 +0xa6 exit status 2
-
管道啥也没有了,用的
v, ok := <-c
判断管道是否关闭
思路: 管道没关闭还好使的时候,这时候是没数据是阻塞的,所以使用select,因为阻塞的时候进入了default分支了。
管道已经经关闭就好判断了
func ischanClose(c chan interface{}) bool {
select {
case _, rece := <-c:
return !rece
default:
}
return false
}
# 输出
1. 管道好使: 输出fasle
2. bad: 输出true
chan正常思想
在使用管道时候,我们select一个context。
select {
case <-ctx.Done():
// ... exit
return
case v, ok := <-c:
// do something....
default:
// do default ....
}
优雅关闭管道
为啥讨论这个问题,因为多次关闭会panic()
封装为struct只关闭一次
package main
import (
"sync"
)
type Chan struct {
C chan interface{}
once sync.Once
}
func NewChan(len int) *Chan {
return &Chan{
C: make(chan interface{}, len),
}
}
func (c *Chan) safeCloseChan() {
c.once.Do(func() {
close(c.C)
})
}
func main() {
c := NewChan(6)
for i := 0; i < 4; i++ {
c.safeCloseChan()
}
}
# 没报错输出
context控制
保证一点:close(ch) 操作的只能有一个人,我们单独抽出来一个 goroutine 来做这个事情,并且使用 context 来做事件同步
package main
import (
"context"
"sync"
"time"
)
func main() {
// channel 初始化
c := make(chan int, 10)
// 用来 recevivers 同步事件的
wg := sync.WaitGroup{}
// 上下文
ctx, cancel := context.WithCancel(context.TODO())
// 专门关闭的协程
go func() {
time.Sleep(2 * time.Second)
cancel()
// ... 某种条件下,关闭 channel
close(c)
}()
// senders(写端)
for i := 0; i < 10; i++ {
go func(ctx context.Context, id int) {
select {
case <-ctx.Done():
return
case c <- id: // 入队
// ...
}
}(ctx, i)
}
// receivers(读端)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// ... 处理 channel 里的数据
for v := range c {
_ = v
}
}()
}
// 等待所有的 receivers 完成;
wg.Wait()
}
事件发生顺序
- 10 个写端协程(sender)运行,投递元素;
- 10 个读端协程(receiver)运行,读取元素;
- 2 分钟超时之后,单独协程执行
close(channel)
操作; - 主协程返回;