Go 语言并发编程介绍
作者:闪电豹猫 码字不易,转载需注明出处
1. 并发与并行
现代 CPU 1 秒内能执行数十亿条指令,而人类的反应速度最快也才是毫秒级,所以计算机才看起来是在同时执行多项任务。一句话概括就是 “宏观并行,微观串行”。
串行 (Serial) 是指一个一个来处理任务。
并发 (Concurrency) 是指把任务在不同的时间点交给单个处理器进行处理。从处理器的角度来看,处理器没有 “分身” 去同时执行多项任务,任务不会在微观层面同时执行。
并行 (Parallelism) 是把每一个任务分给每一个处理器独立完成,多个任务之间一定是同时执行的。
举个许多人去打水的栗子:
-
串行:只有一个水龙头,所有人排队打水,一个人打满后下一个人才可以开始打。排在队伍末尾的人可能需要很长的等待时间才能打上水。
-
并发:仍然只有一个水龙头,不过每个人一次最多打 5 秒钟的水,用完水才能再过来接着排队打。这样的好处是相比于串行,每个人都能在需要水的时候很快打上一点点水,这点水用完了,还可以排队过来打,大大缩短了每个人的平均等待时间。
-
并行:多少人过来打水就有多少个水龙头,这样能让所有人同时打够自己需要的水,没有排队时间。这样的情况并不多见,一般还是并发见得多。并行可以看成 “多个串行”。
2. 进程与线程
程序都是存在磁盘上的。当我们双击执行这个程序时,这个程序就会被复制到内存里,成为活跃的程序。每个活跃的程序都会占用系统资源,成为一个进程,为了管理它们对系统资源的使用情况,操作系统会为这个进程分配一个 PID (Process ID),哪怕是多次双击执行同一个程序,系统也会把它们视为不同的进程,分配不同的 PID 。进程是操作系统分配资源的基本单位。
线程又称轻量级进程,是包含于进程里的实体概念,一个进程通常包含若干个线程。线程可以调用进程所拥有的系统资源,所以线程是独立运行和独立调度的基本单位。比如一个音乐 APP ,在听歌的同时可以刷评论区,对应的就是音乐 APP 进程里有两个线程:播放线程和网页浏览线程。
3. Goroutine
3.1 协程 Coroutine
协程 (Coroutine),又称微线程,是一种比线程还轻量级的存在。一个进程可拥有多个线程,一个线程可以拥有多个协程。协程是编译器级的,进程和线程是操作系统级的。因此,协程是有程序自己管理,不由操作系统内核直接管理;进程和协程则直接接受操作系统内核管理。因此协程的切换没有线程的切换那样的开销。协程的轻量性可以允许程序创建上万个协程而不会导致系统资源枯竭。
3.2 Coroutine 与 Goroutine
Go 语言的协程叫 Goroutine
,由 Go runtime 调度管理。Go 程序会智能的调度各 Goroutine 给 每个 CPU,创建 Goroutine 的堆栈开销很小。
Coroutine 只能顺序执行,而 Goroutine 可以并行执行。前者只能发生在单线程中,而后者可以发生在多个线程中。Goroutine 是协程的深度抽象。
Coroutine 程序只有主动交出控制权后,系统才能把控制权转给其他 Coroutine;在 CPU 闲置时,若 Coroutine 拒绝交出控制权,操作系统也无能为力,只能干等着,也就是失去响应和死机。Goroutine 属于抢占式处理,也就不存在这样的问题了。
3.3 创建 Goroutine
Goroutine 语法规则:
go 函数名( 参数列表 )
比如:
go NewVector( x, y, z )
同一程序 (进程) 的所有 Goroutine 都共享同一个地址空间。
需要注意的是,所有 Goroutine 都会在 main() 函数终止时停止,不论这些 Goroutine 本身是否执行完毕。Goroutine 内还可以开启 “子 Goroutine”。main() 自己就是一个协程,叫主协程。
比如如下代码:
package main
import (
"fmt"
"time"
)
func main() {
for i := 0; i <= 5; i++ {
go f(i)
}
time.Sleep(2 * time.Second) // 等上两秒钟,让所有 Goroutine 执行完再退出
}
func f(a int) {
go fmt.Println(a + 1000)
fmt.Println(a)
}
它的一种可能输出为
5
1
1001
3
1003
0
2
1000
1002
1005
4
1004
如果没有那行 “等两秒钟” 的代码,在我这里试了十次,都是没有输出就退出了。这是因为 main() 比各 Goroutine 都要先终止的缘故。但是,使用time.Sleep()
的方法以阻塞主协程来让其他协程执行完毕的方式并不可靠。后面会将到什么样的阻塞方式更可靠。
下面这个示例允许程序在打印数字的同时,一直等待用户的输入,给了输入后输出所输入的并退出程序。
package main
import (
"fmt"
"time"
)
func ticking() {
var sec int
for {
fmt.Printf("%d\n", sec)
sec++
time.Sleep(1 * time.Second)
}
}
func main() {
go ticking() // 将打印数字交给 Goroutine
var str string
for {
fmt.Scanf("%s", &str)
fmt.Println(str)
return
}
}
可能的输出:
0
1
2
waht // 这行是键盘输入
waht
3.4 匿名函数创建 Goroutine
匿名函数和闭包也是可以用来作为 Goroutine 的对象的。比如如下代码:
package main
import (
"fmt"
"time"
)
func main() {
go func() {
var sec int
for {
fmt.Printf("%d\n", sec)
sec++
time.Sleep(1 * time.Second)
}
}()
var str string
for {
fmt.Scanf("%s", &str)
fmt.Println(str)
return
}
}
这个就是 3.3 节最后一个示例代码的匿名函数写法,故不重复了。
3.5 启动多个 Goroutine
多个 Goroutine 是抢占式执行,随机调度。比如:
package main
import (
"fmt"
"time"
)
func Num() {
for i := 0; i <= 9; i++ {
fmt.Printf("%d", i)
time.Sleep(250 * time.Millisecond) // 250 ms
}
}
func Letter() {
for i := 'a'; i <= 'k'; i++ {
fmt.Printf("%c", i)
time.Sleep(250 * time.Millisecond) // 250 ms
}
}
func main() {
go Num()
go Letter()
time.Sleep(3 * time.Second)
}
一种可能的输出:
a01bc2d3e4f5g6h7i8j9k
3.6 调整并发运行性能
在多个 Goroutine 的情况下,可以通过runtime.Gosched()
交出控制权。不过一般不需要这个函数,实践中一般也用不着。
在传统并发编程中,开发者需要维护线程池中的线程数量与 CPU 核心数的对应关系。Go 并发是由 Go runtime 实现智能调度的,不过开发者也可以通过runtime.GOMAXPROCS( 逻辑处理器数量 )
调整。
Go 1.5 版本 (2015 年中期的更新) 开始,默认让代码并发执行,最大效率的利用 CPU。
我们可以通过for
循环来创建多个 Goroutine,下面的例子就是一个同时创建多个 Goroutine 的程序代码:
package main
import (
"fmt"
"runtime"
)
func main() {
runtime.GOMAXPROCS(16)
var counter = 0
func() {
for i := 0; i < 1000; i++ {
go func() {
counter++ // 这句不是一个原子操作
}()
}
}()
fmt.Println("Please ENTER to continue...")
var s string
fmt.Scanln(&s) // 阻塞程序,比 time.Sleep() 的阻塞方式更可靠
fmt.Println(counter)
}
一种可能的输出是:
Please ENTER to continue...
885
一般来讲,我们预期的结果应该是1000
,但是实际上,每次运行这个程序,输出的结果都是不确定的,结果范围在闭区间 [0, 1000] 内,这就是并发程序的不确定性,同时这也反应出并发程序经常会遇到的数据同步问题。为什么?
在上面的代码中,counter++
并不是一个原子操作。所谓的原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执行其中的一部分。语句counter++
不是原子操作,它由多个步骤组成:
- 将内存中 counter 的值加载到 CPU 中
- 在 CPU 中对 counter 加 1
- 将 CPU 中的 counter 值存储到内存中
因此,当在多核 CPU 的计算机上时,若没有显式地调用语句runtime.GOPAXPROCS( 1 )
将并发程序只允许运行在一个 CPU 核心上的话,该程序将可能按照默认设置,运行在多个 CPU 上。由于 CPU 执行的速度非常快,可能会出现数据不同步的情况,为了更清晰的说明,我举个栗子,可能会发生这样的现象:
- 假设现在内存中 counter = 123
- CPU 3 将内存中 counter 的值 [123] 加载进来
- CPU 3 对 123 + 1 进行运算,得到 [124]
- CPU 1 将内存中 counter 的值 [123] 加载进来
- CPU 3 将运算结果 [124] 存储在 counter 所占的内存空间中
- CPU 6 将内存中 counter 的值 [124] 加载进来
- CPU 6 对 124 + 1 进行运算,得到 [125]
- CPU 1 对 123 + 1 进行运算,得到 [124]
- CPU 6 将运算结果 [125] 存储在 counter 所占的内存空间中
- CPU 1 将运算结果 [124] 存储在 counter 所占的内存空间中
- 最终内存中 counter = 124
我们发现,三个 CPU 对变量 counter 独立且同时执行了counter++
的指令,但是最后结果却不是我们期望的 126,这就是因为变量 counter 的内存空间对各个 CPU 是共享的,如果我们在程序中限制GOMAXPROCS
为 1 或者设置互斥锁,在一个 CPU 对这块内存区域访问时,禁止其它 CPU 对该块的访问,让它们等待上一个 CPU 的对该内存区域的操作完成就可以解决问题了。具体后面会介绍。
3.7 Goroutine 异常处理
在我们的并发程序运行的过程中,可能会遇到某个 Goroutine 遇到错误并导致整个程序崩溃的情况,这种一个子协程异常导致整个程序崩溃的情况是在是因小失大,因此有了下面的方法来解决:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func div(num int) {
defer func() { // defer 关键词后面的这个匿名函数是在 div() 运行完毕返回(或者崩溃)时才开始运行的
err := recover()
if err != nil {
fmt.Println(err)
}
wg.Done() // 给协程计数器减1,在defer里面,保证能被执行
}()
fmt.Printf("10/%d = %d\n", num, 10/num)
}
func main() {
for i := 0; i <= 5; i++ {
wg.Add(1) // 给协程计数器加1
go div(i)
}
wg.Wait() // 确保所有协程运行完毕再退出程序
}
可能的输出结果:
10/5 = 2
10/1 = 10
10/3 = 3
10/2 = 5
runtime error: integer divide by zero
10/4 = 2
这样,一个协程崩了,就不再会导致整个程序崩掉。
4. Go 调度器
Go 语言的并发调度 (Schedule) 模型叫做 G-M-P 模型:
-
G: 是一个 Goroutine 协程对象,每次调用一个 Goroutine 时,都会创建一个 G 对象。它是对并发执行的任务的封装,G 对象是一个非常轻量级的对象,属于用户级资源,对操作系统透明,上下文切换消耗低。
-
M: 是 Machine 的简称,代表一个线程对象。每创建一个 M 时,都会有一个底层线程被创建。M 的作用就是允许 G 中包装的并发任务,Go 调度器的目标是把 G 高效的绑定到 M 上去执行。M 属于系统资源,创建数量受系统的限制,一般来说,G 的数量要远高于 M 的数量。M 最高的数量是 10000,这个数可由
runtime/debug.SetMaxThreads()
来更改。 -
P: 是 Processor 的简称,是一个逻辑处理器对象。每个 M 都得绑定到一个 P 上去执行,就像一个线程得绑定到一个 CPU 核心上去一样。每个 P 都有个全局 G 队列,主要用于管理 G 对象,并为 G 在 M 上的运行提供本地化条件。P 对象的最大数量就是
GOMAXPROCS
(也就是 256),启动程序后是固定的,一般不修改 P 的数量;M 和 P 的数量一般不相等,因为可能会有休眠的 M 。 -
本地队列 (Local Queue): 每个 P 维护一个本地队列,与 P 绑定的 M 中如果有新的 G 需要运行,一般会放到 P 的本地队列里存储;除非本地队列已满,才会截取本地队列中特定数量的 G 放入全局队列里,然后把刚才因满而没放入本地队列的 G 放入本地队列里。
-
全局队列 (Global Queue): 可存储本地队列存储不下的 G 对象。为了保证调度的公平性,调度器在调度过程中会有特定的概率 (如 1/61) 优先从全局队列里获取 G。
-
窃取 (Stealing): 为了提高资源的利用率,使得空闲的 M 有活干,调度器会随机地从其他 P 的本地队列里窃取特定数量的 G 到空闲的 M 上去执行。
在程序开始运行的时候,Go 调度器会首先初始化线程 m0、栈、垃圾回收,以及创建和初始化由runtime.GOMAXPROCS( N )
设置的 N 个逻辑处理器的 P 对象。具体地,首先启动线程 m0,将 m0 绑定到某一个逻辑处理器 P 上,并从 P 的本地队列里获取需要运行的 G。最先开始获取到的是 main 协程。G 是拥有栈的,M 便会根据 G 中的栈信息和调度信息设置运行环境。
线程 M 允许 G,当运行的 G 退出后,M 会被再次获取可运行的 G,这样一直重复,知道 main 协程退出。
G 只能运行在 M 上,一个 M 必须持有一个 P,P:非休眠的M = 1:1。M 会从 P 的本地队列里弹出一个处于可运行状态的 G。如果 P 的本地队列为空,就会执行窃取。
每次使用关键词go
时,都会:
- 创建一个 G 对象,加入到本地队列或全局队列。
- 如果还有空闲的 P,那么创建一个 M
- M 会启动一个底层线程,循环执行能找到的 G 任务。
- G 任务的执行顺序是:先从本地队列找,没有的话再从全局队列找
当 M 执行某一个 G 时,如果发生了syscall
或者其他的阻塞操作,M 就会阻塞。如果当前 M 有一些 G 在执行,调度器会把 这个线程 M 和 P 解除绑定关系,然后创建一个新的线程 M 服务于 P。
5. 协程间通信 之 共享变量
共享变量是 Go 程序的各个 Goroutine 之间传递数据的两大方法之一。为了保证数据同步,会配合锁机制,利用读写锁或互斥锁对共享变量加锁,从而保证多个协程的数据共享安全。
在 3.6 节的示例程序中,没有实现不同协程对共享变量counter
的安全访问。下面的代码是为 3.6 节的代码引入锁机制,实现数据安全访问。
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
runtime.GOMAXPROCS(16)
var counter = 0
var mlock sync.Mutex // 定义一个互斥锁
func() {
for i := 0; i < 1000; i++ {
go func() {
mlock.Lock() // 给该句后面遇到的变量加锁
counter++ // 这句不是一个原子操作
mlock.Unlock() // 结束加锁
}()
}
}()
fmt.Println("Please ENTER to continue...")
var s string
fmt.Scanln(&s) // 阻塞程序,比 time.Sleep() 的阻塞方式更可靠
fmt.Println(counter)
}
按理来说,控制台输出应该是:
Please ENTER to continue...
1000
一旦对counter
进行加锁操作,其他 Goroutine 就无法在它释放前从内存中加载counter
的值进行处理,而必须等待,从而保证数据的安全访问。
6. 协程间通信 之 信道 (channel)
信道 (channel) 又称通道,是协程间通信的两大方法的另一种方法。信道本质是一个 FIFO (先进先出,First In Fisrt Out) 的队列,这种协程间通信方式比共享变量的方式更简单高效。Go 语言提倡使用 channel 替代共享内存。
6.1 创建 channel 类型
channel 也是一种数据类型,声明一个 channel 类型的语法格式如下:
var 信道名 chan 信道具体类型
创建一个 channel 格式如下:
信道名 := make( 信道具体类型 )
比如:
var a chan int // 只是个声明
var b chan *float64 // 只是个声明
var c chan nil // 只是个声明
d := make( chan string ) // 实例化
e := make( chan interface{} ) // 实例化
其中的信道e
是一个空接口型信道,可以传入任意类型的数据。
channel 不是值类型,而是引用类型,所以需要make()
创建了 (实例化了) 才能使用。
6.2 向 channel 写入数据
让 channel 接收数据需要一个特殊的操作符<-
,比如
a := make( chan int16 )
a <- 5
表示创建了一个类型为 int16 的信道,并且向其发送了数据 5。
默认情况下,channel 都是无缓冲的。也就是说,在上面的例子中,如果我们发送了 5,却没有后续的语句从管道 a 进行接收,那么所有协程 (包括 main 协程) 将会持续阻塞。
因此,如下代码会使程序出现死锁 (deadlock),程序无响应,Go runtime 会识别出来并报错退出:fatal error: all goroutines are asleep - deadlock!
:
func main() {
a := make(chan int)
a <- 5
}
这样修改也会死锁,程序报错退出:
func main() {
a := make(chan int)
a <-5
fmt.Println( <-a )
}
这样修改才能保证并不是所有的协程处于阻塞状态:
func main() {
a := make(chan int)
go func() {
a <- 5
}()
fmt.Println(<-a)
}
具体为什么第二种修改方式是可行的,后面会介绍到。
无缓冲信道本身并不存储数据,只是负责数据的流通。
-
从无缓冲信道取数据,必须要有数据流进来才可以,否则当前 Goroutine 阻塞,直到有协程传入数据
-
数据流入无缓冲信道, 如果没有其他 Goroutine 来拿走这个数据,那么当前 Goroutine 阻塞,直到有协程过来拿走这个数据
因此,使用无缓冲 channel,必定涉及两个协程。上面两个死锁报错的程序全程序使用了无缓冲 channel 却只有一个协程,这就一定会导致死锁。可以通过改成有缓冲 channel 或者将读写信道的操作扔到新协程里去来改进。
6.3 从 channel 读取数据
无缓冲 channel 的收发操作应该发生在两个协程中。
假定我们有个叫做ch
的 int16 信道,读数据的语法有下面几种:
- 简易阻塞接收数据
a := <-ch
执行该语句时,该句所在 Goroutine 将阻塞,直到有数据传进来为止
- 完整的阻塞接收数据
aa, ok := <-ch // ok 是个 bool 型变量,用于判断当前 channel 是否关闭
第二关ok
变量是用来判断信道ch
是否关闭的变量。为 true 时,信道开启;为 false 时,信道关闭。
- 忽略接收的数据
<-ch
这种读取信道数据却又忽略所读的数据的方法,其意图不在于数据的传递,而是用来阻塞该语句所在的协程。
- for ... range 循环接收数据
通过for ... range
语句,可以将一个有缓冲信道的数据一股脑按顺序全读出来,这种for ... range
会在所有数据读取完毕毕后自动停止循环,比如:
for aaa := range ch {
fmt.Printf("从通道读取数据:%v\n", aaa)
}
- for 循环接收数据
如果只是用了for
循环而没有配合上range
,那么需要手动使用break
退出循环:
for {
aaaa, ojbk := <-ch
if !ojbk {
fmt.Println("数据读取完毕")
break
}
fmt.Printf("读取到数据:%v\n", aaaa)
}
下面给出这 5 方法的示例程序:
package main
import (
"fmt"
)
func SendData(ch chan int16) {
defer close(ch) // bie wang le guan diao tong dao, fou ze zhe ge goroutine hui yi zhi kaiz he
ch <- 1
ch <- 3
ch <- 5
}
func reader1(ch chan int16) {
defer fmt.Println("One is done.")
for aa := range ch {
fmt.Printf("Method One: %v\n", aa)
}
}
func reader2(ch chan int16) {
defer fmt.Println("Two is done.")
for {
aaa, ojbk := <-ch
if !ojbk {
break
}
fmt.Printf("Method Two: %v\n", aaa)
}
}
func reader3(ch chan int16) {
defer fmt.Println("Three is done.")
for {
aaa := <-ch
if aaa == 0 {
break
}
fmt.Printf("Method Three: %v\n", aaa)
}
}
func reader4(ch chan int16) {
defer fmt.Println("Four is done.")
fmt.Printf("Method Four: %v %v %v\n", <-ch, <-ch, <-ch)
}
func main() {
// 1: for ... range 方法
ch := make(chan int16)
go SendData(ch)
reader1(ch)
// 2: for break
ch2 := make(chan int16)
go SendData(ch2)
reader2(ch2)
// 3: for break 不推荐,因为有可能传入的就是类型默认值,比如 0 或者 "" 或者 false
ch3 := make(chan int16)
go SendData(ch3)
reader3(ch3)
// 4: 一个一个读取
ch4 := make(chan int16)
go SendData(ch4)
reader4(ch4)
}
for ... range 循环遍历信道后,必须关闭信道。如果未关闭,则会引发死锁。
6.4 无缓冲信道
在各类书籍文献中,一旦提到 Go 的信道 (channel),那么没有特殊说明,一定说的是无缓冲信道。本文也是如此,提到信道或者 channel 一定指的是无缓冲信道。
我们都知道如何创建一个信道:
ch := make(chan 类型)
信道可分成 3 种类型:
-
只读信道:只允许读取,不允许写入的信道。创建一个只读信道的语法如下:
ch := make(<-chan 类型)
-
只写信道:只允许写入,不允许读取的信道。创建一个只写信道的语法如下:
ch := make(chan<- 类型)
-
双向信道:就是一个普通的信道。创建一个双向信道的语法如下:
ch := make(chan 类型)
一个信道默认就是个双向信道,而对于<-
,可以理解为数据的流动方向。<-chan
就是只读,chan<-
就是只写。
有的人可能会问,一个只读或只写的信道有个啥子存在的意义呢?实际上,它们还是很有用的,比如用于传递函数参数时,这样子开发可以提高程序的安全性和可靠性。一个函数专门用于向信道写数据,另一个函数专门用于读数据,这样可以避免在函数内误操作带来的错误。
在 6.2 和 6.3 节中,我们已经初步介绍了信道的读写方法。接下来,我们还要进一步地介绍无缓冲信道的读写。
func main() {
ch := make(chan int16)
go func() {
ch <- 7 // 这里只发生了一个数据
}()
fmt.Println(<-ch) // 从管道取一个数据,没问题
//fmt.Println(<-ch) // 再取一个出来会遭遇死锁
time.Sleep(2 * time.Second) // 让所有子协程在 main 协程退出前完成
}
控制台输出:
7
该示例中,发送数据与读取数据不在同一个协程中,因为在只有一个主协程的程序里对无缓冲信道读写数据会遭遇死锁。其次,如果把第二次读取数据那行取消注释,则也会遭遇死锁,这是因为程序运行到这一行之后,由于没有其他协程向信道写入数据,所以该句所在的协程将会持续阻塞。由于程序此时只剩一个协程在运行,所以会死锁崩掉。
信道可以通过close(信道名)
关闭。如果在向信道写入数据后手动关闭了这个信道,那么没有任何其它协程往信道里写数据,协程也可以正常从这个关闭了的信道读取数据,这个数据是类型零值 (如 0 、"" 、false) 。但是记住,往一个关闭了的信道里写数据会导致报错崩掉。
不能关闭一个只读的信道,可以关闭一个只写和双向信道。
一般不推荐使用 “for 和判断零值” 来循环读信道,因为如果一个信道已被关闭,那么再循环读取时,从信道里读出来的将总会是零值,也就永远无法退出循环了:
for {
aaa := <-ch
if aaa == 0 {
break // 如果信道 ch 已关闭,那么这将是个死循环
}
}
用for ... range
就不会遇到这个零值问题,不过要记住,发送方一定要在发送完所有数据后来一句close(通道名)
,否则读取方协程这边会一直等着发送方新数据的发送,也就是无法结束读取。这就是说,若使用 for ... range 来接收数据,那么发送方发送的数据必须是有限个,发送方不能使用无限 for{} 循环。
如果发送方使用无限 for{} 循环,那么接收方绝不能使用 for ... range 循环,只能使用无限 for{} 循环来接收数据。当然,接收方使用无限 for{} 接收数据时,最好设置个退出条件,要不然发送方的无限发送协程都结束了,接收方这边是无法知道的,接收方这边可能又是一个退不出的死循环。至于这个退出条件怎么写比较好,下面会介绍。
6.5 select 语句块
select 语言块是专门为信道而设计的,它和 switch 的语法规则非常相近,相同点是可以有多个 case 和一个 default,不同点是在 select 中,所有的 case 语句要么执行信道的写入,要么执行信道的读取。select 与 {
之间不能有任何表达式。select 里的 case 是随机执行的,不是顺序执行的。fallthrough
不能用于 select 语句块里。
select 语句块本身不带有循环监听机制,需要通过外层 for 循环来启用循环监听机制。在监听 case 分支时,如果没有任何分支满足监听条件,则进入阻塞状态。如果有多个分支符合条件,则随机选择一个执行。default
分支是用来处理所有分支都不符合条件的情况。
下面给出示例代码:
package main
import (
"fmt"
"time"
)
func send1(ch chan<- int) {
time.Sleep(1500 * time.Millisecond) // 设置该协程先休眠 1.5 秒再写入数据
ch <- 7
}
func send2(ch chan<- string) {
time.Sleep(400 * time.Millisecond) // 设置该协程先休眠 0.4 秒再向信道写入数据
ch <- "Upside down T"
}
func main() {
ch1 := make(chan int)
ch2 := make(chan string)
go send1(ch1)
go send2(ch2)
for {
select {
case aaa := <-ch1:
fmt.Printf("Got it from ch1: %v\n", aaa)
case aaa := <-ch2:
fmt.Printf("Got it from ch2: %v\n", aaa)
case <-time.After(1 * time.Second): // 这里设置的超时时间为 1 秒
fmt.Println("Time out.")
goto byebye
}
}
byebye:
fmt.Println("See you.")
}
控制台输出:
Got it from ch2: Upside down T
Time out.
See you.
在该示例代码中,都是先休眠一段时间再发送数据。我们用了无限 for 循环来配合 select 开启循环监听 case 分支,其中第三个 case 分支case <-time.After(1 * time.Second)
中的time.After(1 * time.Second)
函数返回一个类型为<-chan Time
的只读信道,我们选择对这个信道丢弃数据接收的方式接受信道数据,这实际是个超时处理。如果超时,就需要跳出 for 循环,不需要再继续监听了,此处goto byebye
来实现。如果用 break,只能跳出 select 中的一个 case 选项,而不能跳出 for 循环。
该例中,由于 send2() 可以看成花费了 0.4 秒向信道发送数据,在 1 秒的超时时间内,所以信道 ch2 正常的接收了;而 send1() 里我们设置的休眠 1.5 秒大于了超时时间,所以信道 ch1 没有接收到数据。
如果我们把超时分支去掉,换成一个 default 分支,分支内用 goto 跳出 for 循环,其他不变:
package main
import (
"fmt"
"time"
)
func send1(ch chan<- int) {
ch <- 7
}
func send2(ch chan<- string) {
ch <- "Upside down T"
}
func main() {
ch1 := make(chan int)
ch2 := make(chan string)
go send1(ch1)
go send2(ch2)
time.Sleep(1 * time.Second) // 这里设置成 1 s 后才开始接收数据,避免数据还没发送,接收方啥也没收到而直接 default 退出
for {
select {
case aaa := <-ch1:
fmt.Printf("Got it from ch1: %v\n", aaa)
case aaa := <-ch2:
fmt.Printf("Got it from ch2: %v\n", aaa)
default:
fmt.Println("Literally nothing more here.")
goto byebye
}
}
byebye:
fmt.Println("Cya.")
}
控制台输出:
Got it from ch2: Upside down T
Got it from ch1: 7
Literally nothing more here.
Cya.
这份修改过的代码中,如果直接去掉 default 分支的话,switch 所在协程会阻塞,而我们的程序一共就一个协程,这会导致死锁:
Got it from ch1: 7
Got it from ch2: Upside down T
fatal error: all goroutines are asleep - deadlock!
6.6 有缓冲信道
有缓冲信道 (Buffered channel),是在信道自身带有一定缓存空间用于接收数据的信道。如果说无缓冲信道的数据收发双方必须同步操作的话,那么有缓冲信道数据收发操作是可以异步完成的。有缓冲信道可以看成一个有容量上限的队列用于暂时存储数据,如果缓冲满了,就会发生信道阻塞,这时除非接收方从信道读取数据,否则信道将就这么一直阻塞,无法写入新的数据。
举个有缓冲信道的栗子吧。菜就是数据,后厨们不同炒菜可看成不同数据的发送,上菜就是数据的接收,饭店的上菜窗口可看成缓冲区,假设这个窗口最多放得下 3 个碟子,这样一个饭店就可以看成一个有缓冲信道了。咱们管后厨叫 Goroutine 1,服务生为 Goroutine 2,服务生会一直盯着出菜窗口,一有菜出来就立刻端走一碟去把菜上给顾客的桌上去,然后回来看是否还有待上的碟子;而后厨只管做菜就好。这两个角色是独立进行的,就好比两个 Goroutine 一个只发送一个只接受,也是独立运行的。
在 Go 语言中,创建一个缓冲 channel 的语法如下:
ch := make(chan 类型, 容量)
任何时候,我们都可以通过len(信道名)
函数获得信道的当前有多少个元素、通过cap(信道名)
获得信道的容量。
简单来几个示例代码吧:
示例 1
func main() {
ch := make(chan int, 1000)
ch <- 6
ch <- 9
fmt.Println(<-ch, <-ch)
}
控制台输出6 9
没问题。
示例 2
func main() {
ch := make(chan int, 1)
ch <- 6
ch <- 9
fmt.Println(<-ch, <-ch)
}
示例 2 会发生 deadlock 崩溃。
示例 3 :
func main() {
ch := make(chan int, 3)
ch <- 6
fmt.Println(<-ch, <-ch)
}
同样的,示例 3 会死锁崩溃。
下表给出了信道不同状态的操作规则:
操作 | 信道状态 | 结果 |
---|---|---|
读取操作 | nil | 阻塞 |
读取操作 | 活跃且非空 | 成功获取到值 |
读取操作 | 活跃且空 | 阻塞 |
读取操作 | 已关闭 | 类型默认值 |
读取操作 | 只写信道 | 错误 |
操作 | 信道状态 | 结果 |
---|---|---|
写入操作 | nil | 阻塞 |
写入操作 | 活跃且已满 | 阻塞 |
写入操作 | 活跃且不满 | 成功写入 |
写入操作 | 已关闭 | panic 异常 |
写入操作 | 只读信道 | 错误 |
操作 | 信道之前状态 | 结果 |
---|---|---|
关闭操作 | nil | panic |
关闭操作 | 活跃且非空 | 成功关闭。若信道内还有值时,可继续成功读取,直到无值后,可读取类型默认值 |
关闭操作 | 活跃且空 | 成功关闭。可继续成功读取类型默认值 |
关闭操作 | 已关闭 | panic |
关闭操作 | 只读信道 | 错误 |
7. sync 包介绍
7.1 同步等待组 sync.WaitGroup
WaitGroup 的定义方式如下:
type WaitGroup struct {
noCopy noCopy
state1 [12]byte
sema uint32
}
sync.WaitGroup 中有三个方法Add()
Done()
Wait()
。
-
Add()
Add方法的定义是func (wg *WaitGroup) Add(delta int)
其中 delta 可正可负,一般为正。通过该方法来增加应等待 Goroutine 的数量。 -
Done()
func (wg *WaitGroup) Done()
用来减小 WaitGroup 计数器的值,应在 Goroutine 的最后执行。 -
Wait()
func (wg *WaitGroup) Wait()
用于阻塞 Goroutine 知道 WaitGroup 的计数器的值为 0。
7.2 互斥锁 sync.Mutex
互斥锁的定义如下:
type Mutex struct {
state int32
sema uint32
}
Mutex 可以作为其它结构体的字段,一般用于保护那些包含了 Mutex 属性的自定义结构体。像 int、float64、string 等内置类型都不包含 Mutex 属性,用互斥锁来保护其实是不合理的。
type mytype struct {
m sync.Mutex
var int
}
x := new(mytype)
像上述代码一样,将任意类型与一个互斥锁封装成一个新结构体类型,这样当 m 上锁后,var 也是一并“上锁”的。这样的用法是推荐的,也是用来保护任意类型的好办法。
Mutex 中就俩方法:
-
Lock()
原型是func (m *Mutex) Lock()
通过 Lock() 来锁住 m,如果 m 已加锁,则阻塞直到 m 解锁。 -
Unlock()
func (m *Mutex) Unlock()
Unlock() 用于解锁 m,如果 m 未加锁则会导致运行时错误。
互斥锁不与 Goroutine 绑定,也就是说,Lock() 和 Unlock() 可以发生在不同的 Goroutine 中。
实践中,Mutex 多用于写多读少的情况。
7.3 读写互斥锁 sync.RWMutex
读写互斥锁的定义:
type RWMutex struct {
w Mutex // held if there are pending writers
writeSem unit32 // 写锁需要等待读锁释放的信号量
readerSem uint32 // 读锁需要等待写锁释放的信号量
readerCount int32 // 读锁计数器
readerWait int32 // 获取写锁时需要等待的读锁释放数量
}
读写互斥锁简称读写锁 (RWMutex),分为读锁和读写锁。读写锁在使用过程中,读与读不互斥,读与写互斥,写与写互斥。因为 RWMutex 内部由 Mutex 实现,所以读写锁也是与 Goroutine 无关,而且 RWMutex 也可以作为其他结构体的字段。
当读锁上锁时,所有 Goroutine 都可以进行读取操作而不能写入。当读写上锁时,同一时刻只能存在一个 Goroutine 进行读写操作,其他协程没有读和写的权限。
RWMutex 中有 5 个方法:
-
Lock()
func (rw *RWMutex) Lock()
,将 rw 锁定为写入状态,禁止其他协程读取和写入。 -
Unlock()
func (rw *RWMutex) Unlock()
,解除 rw 的写入锁,如果 rw 未加写入锁则会遇到运行时错误。 -
RLock()
func (rw *RWMutex) RLock()
,将 rw 锁定为读取状态,禁止其他协程写入,但不禁止读取 -
RUnlock()
func (rw *RWMutex) RUnlock()
,接触 rw 的读取锁,如果 rw 未加读取锁则会遇到运行时错误。 -
RLocker()
func (rw *RWMutex) RLocker()
,返回一个读写锁,通过调用 rw.RLock() 和 rw.RUnlock() 实现了 Locker 接口。
下面给出一个使用到了 WaitGroup 和 RWMutex 的一个示例代码,该程序模拟售票大厅的多窗口并行查询余票和抢票功能。
package main
import (
"fmt"
"sync"
"time"
)
type ticketS struct {
m sync.RWMutex
num int
}
func BuyTicket(wg *sync.WaitGroup, lck *ticketS, window int) {
defer (*wg).Done()
//(*wg).Add(1)
for {
(*lck).m.Lock() // other goroutines cant read or write
if (*lck).num > 0 {
(*lck).num--
fmt.Printf("[%d]: just got one ticket\n", window)
} else {
fmt.Printf("[%d]: sold out\n", window)
(*lck).m.Unlock()
break
}
(*lck).m.Unlock()
time.Sleep(70 * time.Millisecond) // 设置该窗口 0.07 秒后才接受下一个查余票请求
}
}
func QueryTicket(wg *sync.WaitGroup, lck *ticketS, window int) {
defer (*wg).Done()
for {
(*lck).m.RLock()
fmt.Printf("[%d]: There's %d tickets left\n", window, (*lck).num)
(*lck).m.RUnlock()
if (*lck).num <= 0 {
break
} else {
time.Sleep(60 * time.Millisecond) // 设置该窗口 0.06 秒后才接受下一个查余票请求
}
}
}
func main() {
defer fmt.Println("main done")
var wg sync.WaitGroup
var ticket ticketS
ticket.num = 55 // 一共 55 张票
wg.Add(6) // 3 个协程用于查余票,3 个协程用于抢票
go QueryTicket(&wg, &ticket, 1)
go QueryTicket(&wg, &ticket, 2)
go QueryTicket(&wg, &ticket, 3)
go BuyTicket(&wg, &ticket, 1)
go BuyTicket(&wg, &ticket, 2)
go BuyTicket(&wg, &ticket, 3)
wg.Wait()
}
控制台可能的一种输出为:
[3]: just got one ticket
[1]: There's 54 tickets left
[3]: There's 54 tickets left
此处省略很多行...
[2]: There's 1 tickets left
[1]: just got one ticket
[3]: sold out
[1]: There's 0 tickets left
[3]: There's 0 tickets left
[2]: sold out
[2]: There's 0 tickets left
[1]: sold out
main done
RWMutex 适用于读取量大的情况,因为 RWMutex 多个读可以并存。
8. 并发编程三大天坑
8.1 死锁
当一个 Go 程序所有协程全部阻塞时,该程序会进入死锁 (deadlock) 状态,报错退出。
在实践中,排查死锁错误是比较容易的,因为 Go Runtime 报错信息以足够排查出问题所在了。
8.2 数据竞争
多个协程同时对一个数据进行读写操作,既没有用互斥锁,也没有用信道,那一定会导致数据竞争 (Data race)。比如 1000 个 1 并发相加,结果偶尔得不到 1000。
通过go run -race abc.go
可以让 Go 在运行你的 Go 源代码的同时监测数据竞争的情况。
比如 7.3 节给出的模拟售票大厅的示例中,即便我们对余票数量使用了读写锁,但是仍然在主函数里对函数 BuytTicket() 和 QueryTicket() 调用过程中出现了数据竞争。当工程量庞大时,数据竞争将越来越隐蔽且难以跟踪。
8.3 协程泄漏
如果你启动了一个 Goroutine,但并没有符合预期的退出,直到程序结束,这种情况就是协程泄露。当协程泄露发生时,该 Goroutine 的栈 (一般 2k 内存空间起) 一直被占用不能释放,Goroutine 里的函数在堆上申请的空间也不能被垃圾回收器回收。这样,在程序运行期间,内存占用持续升高,可用内存越来也少,最终将导致系统崩溃。
一般来说,面向客户端的程序不容易发现协程泄漏的问题,因为一旦出现协程泄漏,用户只会觉得卡,退出重进甚至重启电脑都能解决问题。而对于位于服务器端的 Web 应用来说,由于是 7 x 24 小时无休工作,一旦协程泄漏将,就会导致最后服务器宕机。所以,协程泄漏应该受到重视。
具体而言,协程内信道持续阻塞、select 句块没有命中任何分支且没有 default 处理、Goroutine 内部 for 死循环无法退出等都会使得这些协程无法终止并被 gc 垃圾回收,最终导致内存泄漏。
码字不易,转载需注明出处:https://www.cnblogs.com/hhzm/p
本文在撰写时难免会有错误和不足之处,欢迎在评论区里留言讨论,一起进步。