从同步讲起
相比于Go语言宣扬的“用通讯的方式共享数据”,通过共享数据的方式来传递信息和协调线程运行的做法其实更加主流。本篇就是讨论一些与多线程、共享资源以及同步有关的知识。
sync包,就是一个与并发编程关系紧密的代码包。这里“sync”的中文意思就是“同步”。
重要的并发编程概念
这里会讲一些重要的并发编程概念:竞态条件、临界区、互斥量、死锁。死锁会在互斥锁里引出。
一旦数据被多个线程共享,那么就很可能会产生争用和冲突的情况。这种情况也被称为竞态条件(race condition),这往往会破幻共享数据的一致性。
概括来讲,同步的用途有两个:
- 避免多个线程在同一时刻操作同一个数据块
- 协调多个线程,避免它们在同一时刻执行同一个代码块
由于这样的数据块和代码块的背后都隐含着一种或多种资源,可以把他们看作是共享资源。
同步就是在控制多个线程对共享资源的访问。针对某个资源的访问,同一时刻只能有一个线程访问到该资源。那么可以说,多个并发进行的线程对这个共享资源的访问是完全串行的。只要一个代码片段需要实现对共享资源的串行化访问,就可以被视为一个临界区(critical section)。也就是说,要访问到资源就必须进入到这个区域。如果针对一个共享资源,这样的代码片段有多个,那么它们就可以被称为相关临界区。
应对竞态条件的问题,就需要施加一些保护的手段。方法之一就是使用实现了某种同步机制的工具,也称为同步工具。在Go语言中,可供我们选择的同步工具并不少。其中,最重要且最常用的同步工具当属互斥量(mutual exclusion,简称 mutex)。sync包中的Mutex就是与其对应的类型,该类型的值可以被称为互斥量或者互斥锁。
互斥锁
虽然Go语言是以“用通讯的方式共享数据”为亮点,但是依然提供了一些易用的同步工具。而互斥锁就是最常用到的一个。
一个互斥锁可以被用来保护一个临界区或者一组相关临界区。保证同一时刻只有一个goroutine处于改临界区之内。每当有goroutine想进入临界区是,需要对它进行锁定,并且在离开临界区时进行解锁。
代码示例
使用互斥锁时,锁定操作可以通过调用互斥锁的Lock方法实现,而解锁是调用Unlock方法。示例如下:
package main
import (
"fmt"
"flag"
"os"
"sync"
"bytes"
"io"
)
var lock bool
func init() {
flag.BoolVar(&lock, "lock", false, "是否加锁")
}
const (
max1 = 5 // 准备启用多个goroutine
max2 = 10 // 每个goroutine里写入这么多组数据
max3 = 10 // 每组数据就是重复写入多个数字
)
func main () {
// 解析命令行参数
flag.Parse()
// bytes.Buffer是一个缓冲byte类型的缓冲器,存放的都是byte类型
var buffer bytes.Buffer
var mu sync.Mutex // 互斥锁
done := make(chan struct{}) // 每当一个goroutine执行完毕了,就往这里发一个信号
for i := 0; i < max1; i++ {
go func(id int, writer io.Writer) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max2; j++ {
// 准备数据
header := fmt.Sprintf("\n[%d %d]", id, j)
data := fmt.Sprintf(" %d-%d", id, j)
// 加锁
if lock {
mu.Lock()
}
// 写入数据
_, err := writer.Write([]byte(header))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write header in %d: %s\n", id, err)
}
for k := 0; k < max3; k++ {
_, err := writer.Write([]byte(data))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write data in %d: %s\n", id, err)
}
}
// 解锁
if lock {
mu.Unlock()
}
}
}(i, &buffer)
}
// 等待goroutine退出
for i := 0; i < max1; i++ {
<- done
}
// 打印结果
fmt.Println(buffer.String())
}
这个示例提供了一个命令行参数-lock,可以选择加锁或者不加锁来运行这个程序。这样可以方便的比较在代码中加锁的作用。
注意事项和建议
使用互斥锁时的注意事项:
- 不要重复加锁
- 不要忘记解锁,最好是使用defer语句
- 不要对尚未加锁或者已经解锁的互斥锁解锁
- 不要在多个函数之间直接传递互斥锁
对一个已经被锁定的互斥锁进行锁定,是会立即阻塞当前goroutine的。会一直等到该互斥锁在别的goroutine里被解锁,并且这里的锁定操作完成为止。如果那边解锁后又被别的goroutine锁定了,那就继续等,一直到抢到锁完成锁定操作。
虽然没有任何的强制规定,你是可以用同一个互斥锁保护多个无关的临界区的。但是这样做,一定会使你的程序变的复杂,就是说不要这么做,需要的话,就多搞几把锁。如果真的把一个互斥锁同时用在了多个地方,必然会有更多的goroutine征用这把锁。这不但会使得程序变慢,还会打打增加死锁(deadlock)的可能性。
死锁
所谓死锁,就是当前程序中的主goroutine,以及启用的那个goroutine都已经被阻塞。这些goroutine可以被统称为用户级的goroutine。就是说整个程序都停滞不前了。
Go语言运行时,系统是不允许死锁的情况出现的。只要发现所有的用户级goroutine都处于等待状态,就会自行抛出panic。随便写个函数,连续上2次锁就死锁了:
func main() {
var mu sync.Mutex
mu.Lock()
mu.Lock()
mu.Unlock()
mu.Unlock()
}
抛出的信息如下,主要就看第一行fatal error: all goroutines are asleep - deadlock!
:
PS H:\Go\src\Go36\article26\example02> go run main.go
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_SemacquireMutex(0xc042046004, 0x0)
D:/Go/src/runtime/sema.go:71 +0x44
sync.(*Mutex).Lock(0xc042046000)
D:/Go/src/sync/mutex.go:134 +0xf5
main.main()
H:/Go/src/Go36/article26/example02/main.go:8 +0x55
exit status 2
PS H:\Go\src\Go36\article26\example02>
这种在Go运行时系统自行抛出的panic都属于致命错误,是无法被恢复的。调用recover函数也不起作用。就是说,一旦死锁,程序必然崩溃。
要避免这种情况,最有效的做法就是,让每一个互斥锁只保护一个临界区或一组相关的临界区。
用defer语句解锁
还要注意,对同一个goroutine而言,既不要重复锁定一个互斥锁,也不要忘记进行解锁。这里不要忘记解锁的一个很重要的原因就是为了避免重复锁定。在很多时候,一个函数执行的流程并不是单一的,流程中间可能会有分叉、也可能会被中断。最保险的做法就是使用defer语句来进行解锁,并且这样的defer语句应该紧跟在锁定操作的后面。
上面的那个示例,没有按这里说的来做,因为整个写操作是在for循环里的。解锁操作后还有其他语句要执行,这里是for循环里的其他迭代要处理。而defer语句是只有程序退出后才会执行的。不过这都不是借口,要按这里最保险的做法来做,只需要把for循环里的语句再写一个函数或匿名函数就可以用defer了:
for i := 0; i < max1; i++ {
go func(id int, writer io.Writer) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max2; j++ {
// 准备数据
header := fmt.Sprintf("\n[%d %d]", id, j)
data := fmt.Sprintf(" %d-%d", id, j)
func () {
// 加锁
if lock {
mu.Lock()
defer func() {
// 解锁
mu.Unlock()
}()
}
// 写入数据
_, err := writer.Write([]byte(header))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write header in %d: %s\n", id, err)
}
for k := 0; k < max3; k++ {
_, err := writer.Write([]byte(data))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write data in %d: %s\n", id, err)
}
}
}()
}
}(i, &buffer)
}
解锁未锁定的互斥锁也会立即引发panic。并且与死锁一样,也是无法被恢复的。从这一定看,也是需要保证对于没一个锁定操作,都必须且只能由一个对应的解锁操作。就是要让他们成对出现,这也算是互斥锁一个很重要的使用原则。而利用defer语句进行解锁就可以很容易的做到这一点。
互斥锁是结构体、值类型
Go语言中的互斥锁时开箱即用的,就是一旦声明了一个sync.Mutex类型的变量,就可以直接使用它。不过要注意,该类型是一个结构体,属于值类型:
type Mutex struct {
state int32
sema uint32
}
对于值类型,把它传递给一个函数、将他从函数中返回、把它赋值给其他变量、让它进入某个通道都会导致它的副本的产生。这里,原值和副本以及多个副本之间都是完全独立的,是不同的互斥锁。举例说明,如果你把一个互斥锁作为参数值传给了一个函数,那么在这个函数中对传入的锁的所有操作,都不会对存在于该函数之外的那个原锁产生任何影响。
这就是为什么“不要在多个函数之间直接传递互斥锁”。避免歧义,即使你希望的是在这个函数中使用另外一个互斥锁也不要这样做。
示例代码2
学习了上面的注意事项和建议,就来看看如何更好的使用互斥锁。下面是一个使用互斥锁的示例:
package main
import (
"bytes"
"fmt"
"os"
"io"
"sync"
"time"
)
// 创建互斥锁
var mu sync.Mutex
// singleHandler 代表单次处理函数的类型,读和写用的函数内容有些不同,但是签名都是这样的
type singleHandler func() (data string, n int, err error)
// 生成一个写入当前时间的函数
func genWriter(writer io.Writer) singleHandler {
return func() (data string, n int, err error) {
// 准备数据
data = fmt.Sprintf("%s\t", time.Now().Format(time.StampNano))
// 写入数据
mu.Lock()
defer mu.Unlock()
n, err = writer.Write([]byte(data))
return
}
}
// 生成一个读取数据的函数
func genReader(reader io.Reader) singleHandler {
return func() (data string, n int, err error) {
// 类型断言,把io.Reader接口转成*bytes.Buffer类型
// 下面要调用*bytes.Buffer类型的ReadString方法
// 因为函数的参数要求是一个接口类型,但是后面的读操作用的是*bytes.Buffer的ReadString方法
// 所以在调用方法前,必须要检查接口的实际类型(动态类型)
// 实际在主函数里调用genReader函数是,传入的就是*bytes.Buffer
// 类型断言x.(T),这里x必须为一个接口类型,但并非必须是空接口
// 这里reader是个io.Reader接口。如果要对非接口类型的变量做类型断言,就要先转成空接口
buffer, ok := reader.(*bytes.Buffer)
if !ok {
err = fmt.Errorf("unsupported reader")
return
}
// 读取数据
mu.Lock()
defer mu.Unlock()
data, err = buffer.ReadString('\t')
n = len(data)
return
}
}
// 处理流程配置的类型,这里把处理流程相关的信息全部写在下面的结构体类型里
type handlerConfig struct {
handler singleHandler // 处理函数
goNum int // 要启用的goroutine的数量
number int // 单个goroutine中处理的次数
interval time.Duration // 单个goroutine中,多次处理中间间隔的时间
counter int // 数据量计数器,字节数
counterMu sync.Mutex // 上面的数据量计数器专用的互斥锁
}
// 增加数据量计数器的方法
func (hc *handlerConfig) count(skip int) int {
hc.counterMu.Lock()
defer hc.counterMu.Unlock()
hc.counter += skip
return hc.counter
}
func main() {
// 创建缓冲区由于下面的读和写
var buffer bytes.Buffer
// 写入数据的配置,分6个goroutine分别写入4次,一个24次
writingConfig := handlerConfig{
handler: genWriter(&buffer),
goNum: 6,
number: 4,
interval: time.Millisecond * 100,
}
// 读取数据的配置,分8个goroutine分别读取3次,一个也是24次
readingConfig := handlerConfig{
handler: genReader(&buffer),
goNum: 8,
number: 3,
interval: time.Millisecond * 100,
}
done := make(chan struct{})
// 启用多个goroutine对缓冲区进行多次写入
for i := 0; i < writingConfig.goNum; i++ {
go func(i int) {
defer func() {
done <- struct{}{}
}()
for j :=0; j < writingConfig.number; j++ {
// 进入迭代前等待,逻辑稍微简单一点
// 如果写在最后,那么因为err而退出时这次迭代就不会等待了
time.Sleep(writingConfig.interval)
data, n, err := writingConfig.handler()
if err != nil {
fmt.Fprintf(os.Stderr, "writer [%d-%d] ERROR: %s\n", i, j, err)
continue
}
total := writingConfig.count(n)
fmt.Printf("writer [%d-%d] Report: %s (total %d)\n", i, j, data, total)
}
}(i)
}
// 启用多个goroutine对缓冲区进行多次读取
for i := 0; i < readingConfig.goNum; i++ {
go func (i int) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < readingConfig.number; j++ {
var (
data string
n int
err error
)
// 下面的无限for循环是一个代码块,里面的data、n、err如果用短变量赋值就是局部变量
// 所以上面在代码块外面,声明了i的for循环内部的变量
for {
// 如果读比写快,被读的是空的,一读就到末尾了,就会返回EOF错误,
time.Sleep(readingConfig.interval)
data, n, err = readingConfig.handler()
// 这个判断逻辑是读取的EOF错误,就无限循环等待。读到内容或其他错误就跳出循环
if err == nil || err != io.EOF {
break
}
}
if err != nil {
fmt.Fprintf(os.Stderr, "reader [%d-%d] ERROR: %s\n", i, j, err)
}
total := readingConfig.count(n)
fmt.Printf("reader [%d-%d] Report: %s (total %d)\n", i, j, data, total)
}
}(i)
}
// 等待所有goroutine结束
doneNum := writingConfig.goNum + readingConfig.goNum
for i := 0; i < doneNum; i++ {
<- done
}
}
这个示例中,分别有读和写的两个处理函数。而处理函数里做的事情就是:加锁、defer解锁,完成读或写操作然后返回。这里就做到了加锁和解锁操作成对出现,并且把锁和要保护的共享资源放在一起了。
示例中还有一个互斥锁在handlerConfig结构体中,要保护的共享资源也是handlerConfig结构体中的counter字段。并且写了一个方法count实现对counter字段的锁定和修改。
读写锁
读写锁是读/写互斥锁的简称。在Go语言中,读写锁有sync.RWMutex类型的值代表。与sync.Mutex一样,这个类型也是开箱即用的。开箱即用,应该就是指不用赋值,定义了之后直接就能用了。就是让它的零值也具有意义。
读写锁就是把共享资源的“读操作”和“写操作”区别对待了。为两种操作施加了不同程度的保护。相比于互斥锁,读写锁可以实现更加细腻的访问控制。
一个读写锁中实际包含了两个锁,读锁和写锁:
- 写锁,它的Lock方法和Unlock方法分别用于对写锁进行锁定和解锁
- 读锁,它的RLock方法和RUnlock方法分别用于对读锁进行锁定和解锁
读写锁规则
对于同一个读写锁,有如下的规则:
- 在写锁已被锁定的情况下,再视图锁定写锁,会阻塞当前goroutine
- 在写锁已被锁定的情况下,试图锁定读锁,也会阻塞当前goroutine
- 在读锁已被锁定的情况下,试图锁定写锁,同样会阻塞当前goroutine
- 在读写已被锁定的情况下,再视图锁定读锁,并不会阻塞当前的goroutine
总结一下,就是可以有多个读操作,读锁锁定的情况下,别的goroutine也可以读。其他的情况下要操作,只能等之前锁定的操作完成释放锁,并且抢到锁了。再换个角度说,就是多个读操作可以同时进行,多个写操作不能同时进行,读和写操作也不能同时进行。
读写锁对写操作之间的互斥,其实是通过它内含的一个互斥锁实现的。因此,读写锁是互斥锁的一种扩展。所以无论是互斥锁还是读写锁,都不要试图去解锁未锁定的锁,因为这样会引发不可恢复的panic。
示例代码
之前互斥锁的示例中,使用互斥锁保护了对缓冲区的读写操作,而这里又讲了读写锁,不要被这里读和写的说法锁迷惑。对缓冲区的读操作是会把读到的内容从缓冲区里去除的,所以是有类似写的操作在里面的,使用互斥锁时正确的做法,并且不能使用这里的读写锁。
而这个示例中的读操作,就仅仅只是去获取到值而已了,在读操作的时候加个读锁正合适:
package main
import(
"fmt"
"sync"
"time"
)
// 计数器
type counter struct {
num uint // 计数
mu sync.RWMutex //读写锁
}
// 获取num值的操作,加读锁
func (c *counter) number() uint {
c.mu.RLock()
defer c.mu.RUnlock()
return c.num
}
// 修改num值的操作,加写锁
func (c *counter) add (increment uint) uint {
c.mu.Lock()
defer c.mu.Unlock()
c.num += increment
return c.num
}
// 跑一下上面的两个方法看看效果
func main() {
c := counter{}
done := make(chan struct{})
// 增加计数器
go func() {
defer func() {
done <- struct{}{}
}()
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond * 500)
c.add(1)
}
}()
go func() {
defer func() {
done <- struct{}{}
}()
for j := 0; j < 20; j++ {
time.Sleep(time.Millisecond * 200)
fmt.Printf("[%d-%02d] 读数: %d\n", 1, j, c.number())
}
}()
go func() {
defer func() {
done <- struct{}{}
}()
for k := 0; k < 20; k++ {
time.Sleep(time.Millisecond * 300)
fmt.Printf("[%d-%02d] 读数: %d\n", 2, k, c.number())
}
}()
<- done
<- done
<- done
}