目录
select可以同时监听一个或多个channel,直到其中一个channel ready
Channel
goroutine
Channel
单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel类型
channel是一种类型,一种引用类型。声明通道类型的格式如下:
var 变量 chan 元素类型
举几个例子:
var ch1 chan int // 声明一个传递整型的通道 var ch2 chan bool // 声明一个传递布尔型的通道 var ch3 chan []int // 声明一个传递int切片的通道
创建channel
通道是引用类型,通道类型的空值是nil。
var ch chan int fmt.Println(ch) // <nil>
声明的通道后需要使用make函数初始化之后才能使用。
创建channel的格式如下:
make(chan 元素类型, [缓冲大小])
- channel的缓冲大小是可选的。
举几个例子:
ch4 := make(chan int) ch5 := make(chan bool) ch6 := make(chan []int)
channel操作
通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。
现在我们先使用以下语句定义一个通道:
ch := make(chan int)
发送
将一个值发送到通道中。
ch <- 10 // 把10发送到ch中
接收
从一个通道中接收值。
x := <- ch // 从ch中接收值并赋值给变量x <-ch // 从ch中接收值,忽略结果
关闭
我们通过调用内置的close函数来关闭通道。
close(ch)
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。
通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
1.对一个关闭的通道,再发送值就会导致panic。
2.对一个关闭的通道,进行接收会一直获取值直到通道为空。
3.对一个关闭的并且没有值的通道,执行接收操作会得到对应类型的零值。
4.关闭一个已经关闭的通道会导致panic。
无缓冲的通道
无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:
func main() { ch := make(chan int) ch <- 10 fmt.Println("发送成功") }
上面这段代码能够通过编译,但是执行的时候会出现以下错误:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() .../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
为什么会出现deadlock错误呢?
因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必须要把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。
上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?
一种方法是启用一个goroutine去接收值,例如:
func recv(c chan int) { ret := <-c fmt.Println("接收成功", ret) } func main() { ch := make(chan int) go recv(ch) // 启用goroutine从通道接收值 ch <- 10 fmt.Println("发送成功") }
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
有缓冲的通道
解决上面问题的方法还有一种就是使用有缓冲区的通道。
我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:
func main() { ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道 ch <- 10 fmt.Println("发送成功") }
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。
我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。
close()
可以通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)
package main import "fmt" func main() { c := make(chan int) go func() { for i := 0; i < 5; i++ { c <- i } close(c) }() for { if data, ok := <-c; ok { fmt.Println(data) } else { break } } fmt.Println("main结束") } //0 //1 //2 //3 //4 //main结束
如何优雅的从通道循环取值
当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?
我们来看下面这个例子:
package main import "fmt" // channel 练习 func main() { ch1 := make(chan int) ch2 := make(chan int) // 开启goroutine将0~100的数发送到ch1中 go func() { for i := 0; i < 100; i++ { ch1 <- i } close(ch1) }() // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中 go func() { for { i, ok := <-ch1 // 通道关闭后再取值ok=false if !ok { break } ch2 <- i * i } close(ch2) }() // 在主goroutine中从ch2中接收值打印 for i := range ch2 { // 通道关闭后会退出for range循环 fmt.Println(i) } }
从上面的例子中我们看到有两种方式在接收值的时候判断通道是否被关闭,我们通常使用的是for range的方式。
单向通道
有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。
Go语言中提供了单向通道来处理这种情况。例如,我们把上面的例子改造如下:
package main import "fmt" func counter(out chan<- int) { for i := 0; i < 100; i++ { out <- i } close(out) } func squarer(out chan<- int, in <-chan int) { for i := range in { out <- i * i } close(out) } func printer(in <-chan int) { for i := range in { fmt.Println(i) } } func main() { ch1 := make(chan int) ch2 := make(chan int) go counter(ch1) go squarer(ch2, ch1) printer(ch2) }
其中,
1.chan<- int是一个只能发送的通道,可以发送但是不能接收;
2.<-chan int是一个只能接收的通道,可以接收但是不能发送。
在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。
通道总结
channel常见的异常总结,如下图:
注意:关闭已经关闭的channel也会引发panic。
channel示例
模拟--仅需任意任务完成
package util_all_done import ( "fmt" "runtime" "testing" "time" ) func runTask(id int) string { time.Sleep(10 * time.Millisecond) return fmt.Sprintf("The result is from %d", id) } // 仅需任意任务完成 func FirstResponse() string { numOfRunner := 10 ch := make(chan string, numOfRunner) for i := 0; i < numOfRunner; i++ { go func(i int) { ret := runTask(i) ch <- ret }(i) } return <-ch } func TestFirstResponse(t *testing.T) { t.Log("Before:", runtime.NumGoroutine()) t.Log(FirstResponse()) time.Sleep(time.Second * 1) t.Log("After:", runtime.NumGoroutine()) } //=== RUN TestFirstResponse //--- PASS: TestFirstResponse (1.01s) // until_all_done_test.go:46: Before: 2 // until_all_done_test.go:47: The result is from 0 // until_all_done_test.go:49: After: 2 //PASS
模拟--必须所有任务完成
package util_all_done import ( "fmt" "runtime" "testing" "time" ) func runTask(id int) string { time.Sleep(10 * time.Millisecond) return fmt.Sprintf("The result is from %d", id) } // 必需所有任务完成 func AllResponse() string { numOfRunner := 10 ch := make(chan string, numOfRunner) for i := 0; i < numOfRunner; i++ { go func(i int) { ret := runTask(i) ch <- ret }(i) } finalRet := "" for j := 0; j < numOfRunner; j++ { finalRet += <-ch + "\n" } return finalRet } func TestAllResponse(t *testing.T) { t.Log("Before:", runtime.NumGoroutine()) t.Log(AllResponse()) time.Sleep(time.Second * 1) t.Log("After:", runtime.NumGoroutine()) } //=== RUN TestAllResponse //--- PASS: TestAllResponse (1.01s) //until_all_done_test.go:46: Before: 2 //until_all_done_test.go:47: The result is from 0 //The result is from 2 //The result is from 5 //The result is from 6 //The result is from 1 //The result is from 9 //The result is from 4 //The result is from 3 //The result is from 8 //The result is from 7 // //until_all_done_test.go:49: After: 2 //PASS
模拟--线程池
package object_pool import ( "errors" "time" ) type ReusableObj struct { } type ObjPool struct { bufChan chan *ReusableObj //用于缓冲可重用对象 } func NewObjPool(numOfObj int) *ObjPool { objPool := ObjPool{} objPool.bufChan = make(chan *ReusableObj, numOfObj) for i := 0; i < numOfObj; i++ { objPool.bufChan <- &ReusableObj{} } return &objPool } func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) { select { case ret := <-p.bufChan: return ret, nil case <-time.After(timeout): //超时控制 return nil, errors.New("time out") } } func (p *ObjPool) ReleaseObj(obj *ReusableObj) error { select { case p.bufChan <- obj: return nil default: return errors.New("overflow") } }
测试
package object_pool import ( "fmt" "testing" "time" ) func TestObjPool(t *testing.T) { pool := NewObjPool(10) // if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象 // t.Error(err) // } for i := 0; i < 11; i++ { if v, err := pool.GetObj(time.Second * 1); err != nil { t.Error(err) } else { fmt.Printf("%T\n", v) if err := pool.ReleaseObj(v); err != nil { t.Error(err) } } } fmt.Println("Done") } //=== RUN TestObjPool //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //*object_pool.ReusableObj //Done //--- PASS: TestObjPool (0.00s) //PASS
Goroutine池
- 本质上是生产者消费者模型
- 可以有效控制goroutine数量,防止暴涨
- 需求:
- 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6
- 随机生成数字进行计算
package main import ( "fmt" "math/rand" ) type Job struct { // id Id int // 需要计算的随机数 RandNum int } type Result struct { // 这里必须传对象实例 job *Job // 求和 sum int } func main() { // 需要2个管道 // 1.job管道 jobChan := make(chan *Job, 128) // 2.结果管道 resultChan := make(chan *Result, 128) // 3.创建工作池 createPool(64, jobChan, resultChan) // 4.开个打印的协程 go func(resultChan chan *Result) { // 遍历结果管道打印 for result := range resultChan { fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id, result.job.RandNum, result.sum) } }(resultChan) var id int // 循环创建job,输入到管道 for { id++ // 生成随机数 r_num := rand.Int() job := &Job{ Id: id, RandNum: r_num, } jobChan <- job } } // 创建工作池 // 参数1:开几个协程 func createPool(num int, jobChan chan *Job, resultChan chan *Result) { // 根据开协程个数,去跑运行 for i := 0; i < num; i++ { go func(jobChan chan *Job, resultChan chan *Result) { // 执行运算 // 遍历job管道所有数据,进行相加 for job := range jobChan { // 随机数接过来 r_num := job.RandNum // 随机数每一位相加 // 定义返回值 var sum int for r_num != 0 { tmp := r_num % 10 sum += tmp r_num /= 10 } // 想要的结果是Result r := &Result{ job: job, sum: sum, } //运算结果扔到管道 resultChan <- r } }(jobChan, resultChan) } }
定时器
Timer:时间到了,执行只执行1次
package main import ( "fmt" "time" ) func main() { // 1.timer基本使用 //timer1 := time.NewTimer(2 * time.Second) //t1 := time.Now() //fmt.Printf("t1:%v\n", t1) //t2 := <-timer1.C //fmt.Printf("t2:%v\n", t2) // 2.验证timer只能响应1次 //timer2 := time.NewTimer(time.Second) //for { // <-timer2.C // fmt.Println("时间到") //} // 3.timer实现延时的功能 //(1) //time.Sleep(time.Second) //(2) //timer3 := time.NewTimer(2 * time.Second) //<-timer3.C //fmt.Println("2秒到") //(3) //<-time.After(2*time.Second) //fmt.Println("2秒到") // 4.停止定时器 //timer4 := time.NewTimer(2 * time.Second) //go func() { // <-timer4.C // fmt.Println("定时器执行了") //}() //b := timer4.Stop() //if b { // fmt.Println("timer4已经关闭") //} // 5.重置定时器 timer5 := time.NewTimer(3 * time.Second) timer5.Reset(1 * time.Second) fmt.Println(time.Now()) fmt.Println(<-timer5.C) for { } }
Ticker:时间到了,多次执行
package main import ( "fmt" "time" ) func main() { // 1.获取ticker对象 ticker := time.NewTicker(1 * time.Second) i := 0 // 子协程 go func() { for { //<-ticker.C i++ fmt.Println(<-ticker.C) if i == 5 { //停止 ticker.Stop() } } }() for { } }
select多路复用
在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。你也许会写出如下代码使用遍历的方式来实现:
for{ // 尝试从ch1接收值 data, ok := <-ch1 // 尝试从ch2接收值 data, ok := <-ch2 … }
这种方式虽然可以实现从多个通道接收值的需求,但是运行性能会差很多。为了应对这种场景,Go内置了select关键字,可以同时响应多个通道的操作。
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:
select { case <-chan1: // 如果chan1成功读到数据,则进行该case处理语句 case chan2 <- 1: // 如果成功向chan2写入数据,则进行该case处理语句 default: // 如果上面都没有成功,则进入default处理流程 }
select可以同时监听一个或多个channel,直到其中一个channel ready
package main import ( "fmt" "time" ) func test1(ch chan string) { time.Sleep(time.Second * 5) ch <- "test1" } func test2(ch chan string) { time.Sleep(time.Second * 2) ch <- "test2" } func main() { // 2个管道 output1 := make(chan string) output2 := make(chan string) // 跑2个子协程,写数据 go test1(output1) go test2(output2) // 用select监控 select { case s1 := <-output1: fmt.Println("s1=", s1) case s2 := <-output2: fmt.Println("s2=", s2) } }
如果多个channel同时ready,则随机选择一个执行
package main import ( "fmt" ) func main() { // 创建2个管道 int_chan := make(chan int, 1) string_chan := make(chan string, 1) go func() { //time.Sleep(2 * time.Second) int_chan <- 1 }() go func() { string_chan <- "hello" }() select { case value := <-int_chan: fmt.Println("int:", value) case value := <-string_chan: fmt.Println("string:", value) } fmt.Println("main结束") }
Select可以用于判断管道是否存满
package main import ( "fmt" "time" ) // 判断管道有没有存满 func main() { // 创建管道 output1 := make(chan string, 10) // 子协程写数据 go write(output1) // 取数据 for s := range output1 { fmt.Println("res:", s) time.Sleep(time.Second) } } func write(ch chan string) { for { select { // 写数据 case ch <- "hello": fmt.Println("write hello") default: fmt.Println("channel full") } time.Sleep(time.Millisecond * 500) } }
并发安全和锁
有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。
举个例子:
package main import ( "fmt" "sync" ) var x int64 var wg sync.WaitGroup func add() { for i := 0; i < 5000; i++ { x = x + 1 } wg.Done() } func main() { wg.Add(1) go add() go add() wg.Wait() fmt.Println(x) } // 6310 (想要的结果应该是10000)
上面的代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。
互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。
使用互斥锁来修复上面代码的问题:
package main import ( "fmt" "sync" ) var x int64 var wg sync.WaitGroup var lock sync.Mutex func add() { for i := 0; i < 5000; i++ { lock.Lock() // 加锁 x = x + 1 lock.Unlock() // 解锁 } wg.Done() } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) } // 10000
使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;
当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。
读写互斥锁
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
读写锁示例:
package main import ( "fmt" "sync" "time" ) var ( x int64 wg sync.WaitGroup lock sync.Mutex rwlock sync.RWMutex ) func write() { // lock.Lock() // 加互斥锁 rwlock.Lock() // 加写锁 x = x + 1 time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒 rwlock.Unlock() // 解写锁 // lock.Unlock() // 解互斥锁 wg.Done() } func read() { // lock.Lock() // 加互斥锁 rwlock.RLock() // 加读锁 time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒 rwlock.RUnlock() // 解读锁 // lock.Unlock() // 解互斥锁 wg.Done() } func main() { start := time.Now() for i := 0; i < 10; i++ { wg.Add(1) go write() } for i := 0; i < 1000; i++ { wg.Add(1) go read() } wg.Wait() end := time.Now() fmt.Println(end.Sub(start)) }
需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
Sync
sync.WaitGroup
在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
方法名 | 功能 |
(wg * WaitGroup) Add(delta int) | 计数器+delta |
(wg *WaitGroup) Done() | 计数器-1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
我们利用sync.WaitGroup将上面的代码优化一下:
package main import ( "fmt" "sync" ) var wg sync.WaitGroup func hello() { defer wg.Done() fmt.Println("Hello Goroutine!") } func main() { wg.Add(1) go hello() // 启动另外一个goroutine去执行hello函数 fmt.Println("main goroutine done!") wg.Wait() } //main goroutine done! //Hello Goroutine!
需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。
sync.Once
说在前面的话:这是一个进阶知识点。
在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。
Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once。
sync.Once只有一个Do方法,其签名如下:
func (o *Once) Do(f func()) {}
注意:如果要执行的函数f需要传递参数就需要搭配闭包来使用。
加载配置文件示例
延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:
var icons map[string]image.Image func loadIcons() { icons = map[string]image.Image{ "left": loadIcon("left.png"), "up": loadIcon("up.png"), "right": loadIcon("right.png"), "down": loadIcon("down.png"), } } // Icon 被多个goroutine调用时不是并发安全的 func Icon(name string) image.Image { if icons == nil { loadIcons() } return icons[name] }
多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上*地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:
func loadIcons() { icons = make(map[string]image.Image) icons["left"] = loadIcon("left.png") icons["up"] = loadIcon("up.png") icons["right"] = loadIcon("right.png") icons["down"] = loadIcon("down.png") }
在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的goroutine操作,但是这样做又会引发性能问题。
使用sync.Once改造的示例代码如下:
var icons map[string]image.Image var loadIconsOnce sync.Once func loadIcons() { icons = map[string]image.Image{ "left": loadIcon("left.png"), "up": loadIcon("up.png"), "right": loadIcon("right.png"), "down": loadIcon("down.png"), } } // Icon 是并发安全的 func Icon(name string) image.Image { loadIconsOnce.Do(loadIcons) return icons[name] }
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
sync.Once 实现懒汉式单例模型
java实现单例模式
懒汉式单例模型--GO实现
package once_test import ( "fmt" "sync" "testing" "unsafe" ) type Singleton struct { data string } var singleInstance *Singleton var once sync.Once func GetSingletonObj() *Singleton { once.Do(func() { fmt.Println("Create Obj") singleInstance = new(Singleton) }) return singleInstance } func TestGetSingletonObj(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { obj := GetSingletonObj() fmt.Printf("%X\n", unsafe.Pointer(obj)) wg.Done() }() } wg.Wait() } //=== RUN TestGetSingletonObj //Create Obj //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //C0000CA000 //--- PASS: TestGetSingletonObj (0.00s)
sync.Map
Go语言中内置的map不是并发安全的。请看下面的示例:
var m = make(map[string]int) func get(key string) int { return m[key] } func set(key string, value int) { m[key] = value } func main() { wg := sync.WaitGroup{} for i := 0; i < 20; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) set(key, n) fmt.Printf("k=:%v,v:=%v\n", key, get(key)) wg.Done() }(i) } wg.Wait() }
上面的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。
像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
var m = sync.Map{} func main() { wg := sync.WaitGroup{} for i := 0; i < 20; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) m.Store(key, n) value, _ := m.Load(key) fmt.Printf("k=:%v,v:=%v\n", key, value) wg.Done() }(i) } wg.Wait() }
sync.Pool对象缓存
sync.Pool对象获取
- 尝试从私有对象获取
- 私有对象不存在,尝试从当前 Processor 的共享池获取
- 如果当前 Processor 共享池也是空的,那么就尝试去其他Processor 的共享池获取
- 如果所有子池都是空的,最后就用用户指定的 New 函数产生一个新的对象返回
- 如果私有对象不存在则保存为私有对象
- 如果私有对象存在,放⼊当前 Processor ⼦池的共享池中
sync.Pool 对象的生命周期
- GC 会清除 sync.pool 缓存的对象
- 对象的缓存有效期为下一次GC 之前
sync.Pool 总结
- 适合于通过复用,降低复杂对象的创建和 GC 代价
- 协程安全, 会有锁的开销
- 生命周期受 GC 影响,不适合于做连接池等,需自己管理生命周期的资源的池化
sync.Pool 示例
package object_pool import ( "fmt" "runtime" "sync" "testing" ) func TestSyncPool(t *testing.T) { pool := &sync.Pool{ New: func() interface{} { fmt.Println("Create a new object.") return 100 }, } v := pool.Get().(int) fmt.Println(v) pool.Put(3) runtime.GC() //GC 会清除sync.pool中缓存的对象 v1, _ := pool.Get().(int) fmt.Println(v1) } //GC前 //=== RUN TestSyncPool //Create a new object. //100 //3 //--- PASS: TestSyncPool (0.00s) //PASS //GC后 //=== RUN TestSyncPool //Create a new object. //100 //Create a new object. //100 //--- PASS: TestSyncPool (0.00s) //PASS func TestSyncPoolInMultiGroutine(t *testing.T) { pool := &sync.Pool{ New: func() interface{} { fmt.Println("Create a new object.") return 10 }, } pool.Put(100) pool.Put(100) pool.Put(100) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { fmt.Println(pool.Get()) wg.Done() }(i) } wg.Wait() } //=== RUN TestSyncPoolInMultiGroutine //100 //100 //100 //Create a new object. //10 //Create a new object. //10 //Create a new object. //Create a new object. //10 //Create a new object. //10 //10 //Create a new object. //10 //Create a new object. //10 //--- PASS: TestSyncPoolInMultiGroutine (0.00s) //PASS
原子操作(atomic包)
原子操作
代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。
atomic包
方法 | 解释 |
func LoadInt32(addr int32) (val int32) func LoadInt64(addr `int64 ) (val int64)<br>func LoadUint32(addr uint32) (val uint32)<br>func LoadUint64(addr uint64) (val uint64)<br>func LoadUintptr(addr uintptr) (val uintptr)<br>func LoadPointer(addr unsafe.Pointer`) (val unsafe.Pointer) |
读取操作 |
func StoreInt32(addr *int32 , val int32)func StoreInt64(addr *int64 , val int64)func StoreUint32(addr *uint32 , val uint32)func StoreUint64(addr *uint64 , val uint64)func StoreUintptr(addr *uintptr , val uintptr)func StorePointer(addr *unsafe.Pointer , val unsafe.Pointer) |
写入操作 |
func AddInt32(addr *int32 , delta int32) (new int32)func AddInt64(addr *int64 , delta int64) (new int64)func AddUint32(addr *uint32 , delta uint32) (new uint32)func AddUint64(addr *uint64 , delta uint64) (new uint64)func AddUintptr(addr *uintptr , delta uintptr) (new uintptr) |
修改操作 |
func SwapInt32(addr *int32 , new int32) (old int32)func SwapInt64(addr *int64 , new int64) (old int64)func SwapUint32(addr *uint32 , new uint32) (old uint32)func SwapUint64(addr *uint64 , new uint64) (old uint64)func SwapUintptr(addr *uintptr , new uintptr) (old uintptr)func SwapPointer(addr *unsafe.Pointer , new unsafe.Pointer) (old unsafe.Pointer) |
交换操作 |
func CompareAndSwapInt32(addr *int32 , old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64 , old, new int64) (swapped bool)func CompareAndSwapUint32(addr *uint32 , old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64 , old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr , old, new uintptr) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer , old, new unsafe.Pointer) (swapped bool) |
比较并交换操作 |
atomic示例
我们填写一个示例来比较下互斥锁和原子操作的性能。
var x int64 var l sync.Mutex var wg sync.WaitGroup // 普通版加函数 func add() { // x = x + 1 x++ // 等价于上面的操作 wg.Done() } // 互斥锁版加函数 func mutexAdd() { l.Lock() x++ l.Unlock() wg.Done() } // 原子操作版加函数 func atomicAdd() { atomic.AddInt64(&x, 1) wg.Done() } func main() { start := time.Now() for i := 0; i < 10000; i++ { wg.Add(1) // go add() // 普通版add函数 不是并发安全的 // go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大 go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版 } wg.Wait() end := time.Now() fmt.Println(x) fmt.Println(end.Sub(start)) }
atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。
参考链接:Channel · Go语言中文文档