面向并发的内存模型
原子操作: 当数据操作满足 "不被中断 , 运行期间不会被线程调度机制打断" 的原子性时, 称之为原子操作 , 互斥锁就是原子操作实现的
sync.WaitGroup 用于主线程阻塞等待子线程完成执行后才继续执行(类似信号量) Add()计数器加 Done()计数器减 Wait()阻塞等到计数器为0
import ( "sync" ) var total struct { sync.Mutex value int } func worker(wg *sync.WaitGroup) { defer wg.Done() for i := 0; i <= 100; i++ { total.Lock() total.value += i total.Unlock() } } func main() { var wg sync.WaitGroup wg.Add(2) go worker(&wg) go worker(&wg) wg.Wait() fmt.Println(total.value) }
用互斥锁来保护数值型的共享资源 ,麻烦且效率低下。标准库的sync/atomic
包对原子操作提供了丰富的支持。
import ( "sync" "sync/atomic" ) var total uint64 func worker(wg *sync.WaitGroup) { defer wg.Done() var i uint64 for i = 0; i <= 100; i++ { atomic.AddUint64(&total, i) } } func main() { var wg sync.WaitGroup wg.Add(2) go worker(&wg) go worker(&wg) wg.Wait() }
在atomic包中对几种基础类型提供了原子操作,包括int32,int64,uint32,uint64,uintptr,unsafe.Pointer。对于每一种类型,提供了五类原子操作分别是
Add, 增加和减少
CompareAndSwap, 比较并交换
Swap, 交换
Load , 读取
Store, 存储
AddUint64原子性的将val的值添加到*addr并返回新值。
如要减去一个值c,调用AddUint64(&x, ^uint64(c-1));特别的,让x减1,调用AddUint64(&x, ^uint64(0))。
atomic.AddUint64
函数调用保证了total
的读取、更新和保存是一个原子操作,因此在多线程中访问也是安全的
单件模式: (又称单例模式)确保一个类只有一个实例,并提供一个全局访问点。
type Once struct { m Mutex done uint32 } func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 1 { return } o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }
var ( instance *singleton once sync.Once ) func Instance() *singleton { once.Do(func() { instance = &singleton{} }) return instance }
sync/atomic
包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value
原子对象提供了Load
和Store
两个原子方法,分别用于加载和保存数据,返回值和参数都是interface{}
类型,因此可以用于任意的自定义复杂类型。
var config atomic.Value // 保存当前配置信息 // 初始化配置信息 config.Store(loadConfig()) // 启动一个后台线程, 加载更新后的配置信息 go func() { for { time.Sleep(time.Second) config.Store(loadConfig()) } }() // 用于处理请求的工作者线程始终采用最新的配置信息 for i := 0; i < 10; i++ { go func() { for r := range requests() { c := config.Load() // ... } }() }
这是一个简化的生产者消费者模型:后台线程生成最新的配置信息;前台多个工作者线程获取最新的配置信息。所有线程共享配置信息资源。
顺序一致性内存模型
如果只是想简单地在线程之间进行数据同步的话,原子操作已经为编程人员提供了一些同步保障。不过这种保障有一个前提:顺序一致性的内存模型。
同一个Goroutine线程内部,顺序一致性内存模型是得到保证的。但是不同的Goroutine之间,并不满足顺序一致性内存模型,需要通过明确定义的同步事件来作为同步的参考。
func main() { done := make(chan int) go func(){ println("你好, 世界") done <- 1 }() <-done }
func main() { var mu sync.Mutex mu.Lock() go func(){ println("你好, 世界") mu.Unlock() }() mu.Lock()
生产者消费者模型
并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
生产者生产一些任务放入任务队列中 , 消费者从该队列取出任务, 生产消费是两个异步过程,当没有任务时消费者则阻塞等待
// 生产者: 生成 factor 整数倍的序列 func Producer(factor int, out chan<- int) { for i := 0; ; i++ { out <- i*factor } } // 消费者 func Consumer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { ch := make(chan int, 64) // 成果队列 go Producer(3, ch) // 生成 3 的倍数的序列 go Producer(5, ch) // 生成 5 的倍数的序列 go Consumer(ch) // 消费 生成的队列 // 运行一定时间后退出 time.Sleep(5 * time.Second) }
靠休眠的方式无法保证稳定的输出结果的。
func main() { ch := make(chan int, 64) // 成果队列 go Producer(3, ch) // 生成 3 的倍数的序列 go Producer(5, ch) // 生成 5 的倍数的序列 go Consumer(ch) // 消费 生成的队列 // Ctrl+C 退出 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) fmt.Printf("quit (%v)\n", <-sig) }
LInux信号类型 : https://blog.csdn.net/u012838045/article/details/80974341
发布订阅模型
发布者广播给所有订阅者订阅的内容 ,不管订阅者是否接收到 , 订阅者可以筛选是否为自己所订阅的内容
// Package pubsub implements a simple multi-topic pub-sub library. package pubsub import ( "sync" "time" ) type ( subscriber chan interface{} // 订阅者为一个管道 topicFunc func(v interface{}) bool // 主题为一个过滤器 ) // 发布者对象 type Publisher struct { m sync.RWMutex // 读写锁 buffer int // 订阅队列的缓存大小 timeout time.Duration // 发布超时时间 subscribers map[subscriber]topicFunc // 订阅者信息 } // 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), } } // 添加一个新的订阅者,订阅全部主题 func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil) } // 添加一个新的订阅者,订阅过滤器筛选后的主题 func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch } // 退出订阅 func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() defer p.m.Unlock() delete(p.subscribers, sub) close(sub) } // 发布一个主题 func (p *Publisher) Publish(v interface{}) { p.m.RLock() defer p.m.RUnlock() var wg sync.WaitGroup //类似信号量,的计数器 for sub, topic := range p.subscribers { //遍历订阅者 wg.Add(1) //计数器+1 go p.sendTopic(sub, topic, v, &wg) //给 } wg.Wait() //阻塞等待子线程把计数器清零 } // 关闭发布者对象,同时关闭所有的订阅者管道。 func (p *Publisher) Close() { p.m.Lock() defer p.m.Unlock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } } // 发送主题,可以容忍一定的超时 func (p *Publisher) sendTopic( sub subscriber, //传入订阅者map的key(订阅者管道) topic topicFunc, //传入订阅者map的value(主题过滤,过滤函数) v interface{}, //发布的主题 wg *sync.WaitGroup, //计数器 ) { defer wg.Done() //计数器-1 if topic != nil && !topic(v) { //过滤函数不为nil 且 调用绑定的判断函数,如果不是订阅的内容这不发送 return } select { case sub <- v: //给每个订阅者发送主题 case <-time.After(p.timeout): } }
package main import ( "fmt" "strings" "test/pubsub" "time" ) func main() { p := pubsub.NewPublisher(100*time.Millisecond, 10) defer p.Close() all := p.Subscribe() //添加一个订阅者(管道)订阅全部主题 //添加一个被筛选过的主题(管道) golang := p.SubscribeTopic(func(v interface{}) bool { //该管道绑定一个判断函数(判断是否订阅主题) if s, ok := v.(string); ok { //类型断言,判断是否为string类型 return strings.Contains(s, "golang") //判断字符串是否为 golang } return false }) //发布主题 p.Publish("hello, world!") p.Publish("hello, golang!") go func() { for msg := range all { fmt.Println("all:", msg) } } () go func() { for msg := range golang { fmt.Println("golang:", msg) } } () // 运行一定时间后退出 time.Sleep(3 * time.Second) }
控制并发数
虽然goroutine轻量占用内存少可以并发大量协程, 但如果不控制数量短时间内系统负载暴增 将极大的占用CPU及内存 导致系统卡顿 程序崩溃 等
在Go语言自带的godoc程序实现中有一个vfs
的包对应虚拟的文件系统,在vfs
包下面有一个gatefs
的子包,gatefs
子包的目的就是为了控制访问该虚拟文件系统的最大并发数。
控制访问该虚拟文件系统的最大并发数
import ( "golang.org/x/tools/godoc/vfs" "golang.org/x/tools/godoc/vfs/gatefs" ) func main() { fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8)) // ... }vfs.OS("/path")
基于本地文件系统构造一个虚拟的文件系统,然后gatefs.New
基于现有的虚拟文件系统构造一个并发受控的虚拟文件系统。
并发数控制的原理就是通过带缓存管道的发送和接收规则来实现最大并发阻塞
var limit = make(chan int, 3) func main() { for _, w := range work { go func() { limit <- 1 w() <-limit }() } select{} } 不过gatefs对此做一个抽象类型gate,增加了enter和leave方法分别对应并发代码的进入和离开。当超出并发数目限制的时候,enter方法会阻塞直到并发数降下来为止。 type gate chan bool func (g gate) enter() { g <- true } func (g gate) leave() { <-g } type gatefs struct { fs vfs.FileSystem //虚拟文件系统 gate //管道控制访问量 } func (fs gatefs) Lstat(p string) (os.FileInfo, error) { fs.enter() defer fs.leave() return fs.fs.Lstat(p) }
我们不仅可以控制最大的并发数目,而且可以通过带缓存Channel的使用量和最大容量比例来判断程序运行的并发率。当管道为空的时候可以认为是空闲状态,当管道满了时任务是繁忙状态,这对于后台一些低级任务的运行是有参考价值的。
Go语言中不同Goroutine之间主要依靠管道进行通信和同步。要同时处理多个管道的发送或接收操作,我们需要使用select
关键字(这个关键字和网络编程中的select
函数的行为类似)。当select
有多个分支时,会随机选择一个可用的管道分支,如果没有可用的管道分支则选择default
分支,否则会一直保存阻塞状态。
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } } } func main() { cancel := make(chan bool) for i := 0; i < 10; i++ { go worker(cancel) } time.Sleep(time.Second) close(cancel) }
在Go1.7发布时,标准库增加了一个context
包,用来简化对于处理单个请求的多个Goroutine之间与请求域的数据、超时和退出等操作。
func worker(wg *sync.WaitGroup, cannel chan bool) { defer wg.Done() for { select { default: fmt.Println("hello") case <-cannel: return } } } func main() { cancel := make(chan bool) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go worker(&wg, cancel) } time.Sleep(time.Second) close(cancel) wg.Wait() }
当并发体超时或main
主动停止工作者Goroutine时,每个工作者都可以安全退出。