Channel是Go中重要且独特的一种并发编程原语,借助其线程安全和阻塞的特性,可以实现信息传递、信号通知、互斥锁、任务编排等场景。
一.信号传递
有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写程序,让输出的编号总是按照 1、2、3、4、1、2、3、4……这个顺序打印出来。
为了实现顺序的数据传递,我们可以定义一个令牌的变量,谁得到令牌,谁就可以打印一次自己的编号,同时将令牌传递给下一个 goroutine。
type Token struct{}
func newWorker(id int, ch chan Token, nextCh chan Token) {
for {
token := <-ch // 取得令牌
fmt.Println((id + 1)) // id从1开始
time.Sleep(time.Second)
nextCh <- token
}
}
func main() {
chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}
// 创建4个worker
for i := 0; i < 4; i++ {
go newWorker(i, chs[i], chs[(i+1)%4])
}
//首先把令牌交给第一个worker
chs[0] <- struct{}{}
select {}
}
首先定义一个令牌类型(Token),接着定义一个创建 worker 的方法,这个方法会从它自己的 chan 中读取令牌。哪个 goroutine 取得了令牌,就可以打印出自己编号,因为需要每秒打印一次数据,所以,我们让它休眠 1 秒后,再把令牌交给它的下家。这类场景有一个特点,就是当前持有数据的 goroutine 都有一个信箱,信箱使用 chan 实现,goroutine 只需要关注自己的信箱中的数据,处理完毕后,就把结果发送到下一家的信箱中。
二.信号通知
chan 类型有这样一个特点:chan 如果为空,那么,receiver 接收数据的时候就会阻塞等待,直到 chan 被关闭或者有新的数据到来。利用这个机制,我们可以实现 wait/notify 的设计模式。传统的并发原语 Cond 也能实现这个功能。但是,Cond 使用起来比较复杂,容易出错,而使用 chan 实现 wait/notify 模式,就方便多了。
比如,使用 chan 实现程序的 graceful shutdown,在退出之前执行一些连接关闭、文件 close、缓存落盘等一些动作。
func main() {
go func() {
...... // 执行业务处理
}()
// 处理CTRL+C等中断信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
// 执行退出之前的清理动作
doCleanup()
fmt.Println("优雅退出")
}
有时候,doCleanup 可能是一个很耗时的操作,比如十几分钟才能完成,如果程序退出需要等待这么长时间,用户是不能接受的,所以,在实践中,我们需要设置一个最长的等待时间。只要超过了这个时间,程序就不再等待,可以直接退出。所以,退出的时候分为两个阶段:closing,代表程序退出,但是清理工作还没做;closed,代表清理工作已经做完。
func main() {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
// 模拟业务处理
for {
select {
case <-closing:
return
default:
// ....... 业务计算
time.Sleep(100 * time.Millisecond)
}
}
}()
// 处理CTRL+C等中断信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
close(closing)
// 执行退出之前的清理动作
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
fmt.Println("清理超时,不等了")
}
fmt.Println("优雅退出")
}
func doCleanup(closed chan struct{}) {
time.Sleep((time.Minute))
close(closed)
}
三.互斥锁
使用 chan 也可以实现互斥锁。在 chan 的内部实现中,就有一把互斥锁保护着它的所有字段。从外在表现上,chan 的发送和接收之间也存在着 happens-before 的关系,保证元素放进去之后,receiver 才能读取到。先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。
// 使用chan实现互斥锁
type Mutex struct {
ch chan struct{}
}
// 使用锁需要初始化
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 请求锁,直到获取到
func (m *Mutex) Lock() {
<-m.ch
}
// 解锁
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}
// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
fmt.Printf("locked v %v\n", ok)
ok = m.TryLock()
fmt.Printf("locked %v\n", ok)
}
用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。
利用 select+chan 的方式,很容易实现 TryLock、Timeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default 实现 TryLock,使用一个 Timer 来实现 Timeout 的功能。
// 后续待补充任务异步处理