Go并发编程学习总结

文章目录

Go语言的并发模型

线程模型

  • 线程的实现模型主要有3个:用户级线程模型、内核级线程模型和两极线程模型.

内核级线程模型

Go并发编程学习总结

  • 用户线程与内核调度实体(KSE)是1:1关系,大部分编程语言的线程库都是对操作系统内核级线程的一层封装,创建出来的每个线程与一个不同的KSE静态关联,因此其调度完全由OS调度器来做。

  • 优点:

    充分利用CPU,实现真正的并行

    缺点:

    没创建一个用户级线程都系要创建一个内核级线程与其对应,这样创建线程的开销比较大,会影响到应用程序的性能。

用户级线程模型

Go并发编程学习总结

  • 用户线程与KSE是多对1关系,一个进程中所有创建的线程都与一个KSE在运行时动态关联

  • 许多语言的协程就是这种方式,上下文切换代价小,但容易出现调用阻塞的致命问题

  • 优点:

    上下文切换都发生在用户空间,避免的模态切换,从而对于性能有积极的影响

    缺点:

    同时只有一个处理器被利用,解决了并发问题,但是没有解决并行问题,如果线程因为I/O操作陷入了内核态,所有线程都将被阻塞

两极线程模型

Go并发编程学习总结

  • 用户线程与KSE是多对多关系(M:N),综合上边两种的优势,多个用户线程可以与一个KSE关联减少上下文切换开销,当有线程发生阻塞时,其余线程可以与其他的KSE关联
  • 该模型也被称为混合型线程模型,即用户调度器实现用户线程到KSE的调度,内核调度器实现KSE到CPU上的调度

signal处理和优雅退出守护进程

  1. Golng中的信号处理

    • 系统信号处理主要涉及os包,os.signal包以及syscall包,其中最主要的函数是signal包中的Notify函数:

      func Notify(c chan<-os.Signal, sig ...os.Signal)

      该函数会将进程收到的系统信号发送给channel。

  2. Kill pid 与kill -9 pid的区别

    • kill pid 对应进程号发送结束信号,可以被应用程序捕获,处理, 结束进程推荐使用该方式
    • kill -9 pid: 不能被应用程序捕获,立即结束程序
    • 如果结束的进程是一个创建过子进程的父进程,那么其子进程就会成孤儿进程
  3. 应用程序如何优雅退出

    1. 注册SIGTERM信号的处理函数并在处理函数中做一些进程退出的准备,注册可以通过signal() 或 sigaction()实现。通常在处理函数中设置一个bool型的flag变量以表明进程收到了SIGTERM信号,准备退出。
    2. 在主进程中阻塞检测flag变量,一旦为true, 释放资源或打印日志,最后退出。
  4. Go中的Signal发送和处理

    • 对信号的处理主要使用os/signal包中的两个方法

    • notify 方法用来监听收到的信号

    • stop方法用来取消监听

    • 监听全部信号

      package main
      
      import (
         "fmt"
         "os"
         "os/signal"
      )
      
      func main() {
         c := make(chan os.Signal)
         signal.Notify(c)
         fmt.Println("启动")
         s := <-c
         fmt.Println("退出信号",s)
      }
      
    • 监听指定信号

      package main
      
      import (
         "fmt"
         "os"
         "os/signal"
         "syscall"
      )
      
      func main() {
         c := make(chan os.Signal)
         signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGUSR1, syscall.SIGUSR2)
         fmt.Println("启动")
         s := <-c
         fmt.Println("退出信号",s)
      }
      
    • 优雅退出守护进程

      package main
      
      import (
         "fmt"
         "os"
         "os/signal"
         "syscall"
         "time"
      )
      
      func main() {
         // 创建监听退出chan
         c := make(chan os.Signal)
         // 监听指定信号
         signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
            syscall.SIGQUIT)
         // linux 下
         //signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
         // syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
      
         go func() {
            for s := range c {
               switch s {
               case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
                  fmt.Println("退出", s)
                  ExitFunc()
               //case syscall.SIGUSR1:
               // fmt.Println("usr1", s)
               //case syscall.SIGUSR2:
               // fmt.Println("usr2", s)
               default:
                  fmt.Println("other", s)
               }
            }
         }()
         fmt.Println("进程启动...")
         sum := 0
         for true {
            sum++
            fmt.Println("sum:", sum)
            time.Sleep(time.Second)
         }
      
      }
      
      func ExitFunc() {
         fmt.Println("开始退出。。。")
         fmt.Println("执行清理。。。")
         fmt.Println("结束退出。。。")
         os.Exit(0)
      }
      

Go并发与CSP模型

  • CSP(Communicating Sequential Process)顺序通信进程,一种并发编程模型,描述两个独立的并发实体通过共享的通讯channel(管道)进行通信,不关注发送消息的实体,而关注与发送消息时使用的channel.

  • Go通过goroutine与channel实现并发,goroutine类似协程coroutine,与coroutine的区别在于,能够在发现阻塞后启动新的微线程

  • Goroutine是实际并发执行的实体,底层使用协程coroutine实现并发

    coroutine是一种运行在用户态的用户线程,具有以下优点:

    • 用户空间避免了内核态和用户态的切换导致的成本
    • 可以由语言和框架层进行调度
    • 更小的栈空间允许创建大量的实例
  • Go并发调度:G-P-M模型, 可以让多核CPU中每个CPU执行一个协程:

    Go并发编程学习总结

Go并发调度:G-P-M模型

  • goroutine实现了两级线程模型:G-P-M模型,该模型有4个重要结构M、G、P、Sched:

    • Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等
    • M结构是Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M是一个很大的结构,里边维护小对象内存cache、当前执行的goroutine、随机数发生器等等非常多的信息,一个M对应一个CPU核
    • P结构是Processor,处理器,执行goroutine,维护一个goroutine队列
    • G是goroutine实现的核心,包含栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel
  • Processor的数量可以通过环境变量设置或者运行时设置,Processor数量固定意味着任意时刻只有对应数量的线程在执行代码

  • 一个M对应一个P,一个P执行并维护一个goroutine队列,如果机器有多核并设置P为多个,同时就会有多个M:

    Go并发编程学习总结

线程阻塞

  • 当正在运行的goroutine阻塞的时候,会再创建一个系统线程(M1),当前的M线程放弃它的Processor,P转到新的线程中去:

    Go并发编程学习总结

runqueue执行完成

  • 当其中一个Processor的runqueue为空,没有goroutine可以调度,它会从另外一个上下文偷取一半的goroutine:

    Go并发编程学习总结

Goroutine调度原理图

Go并发编程学习总结

runtime包

  • go编译器产生的是本地可执行代码,这些代码仍旧运行在Go的runtime当中。
  • runtime 类似java的虚拟机,负责管理包括内存分配、垃圾回收、栈处理、goroutine、channel、切片、map、反射等等

常用函数

  • NumCPU: 返回当前系统的CPU核数量

  • GOMAXPROCS: 设置最大可同时使用的CPU核数

    package main
    
    import (
       "fmt"
       "runtime"
    )
    
    func main() {
       // 获取cpu的数量
       fmt.Println("逻辑CPU的核数:", runtime.NumCPU())
    
       // 设置go 程序执行的最大的:[1,256]
       n := runtime.GOMAXPROCS(2)
       fmt.Println(n)
    }
    
  • Goshed:让当前线程让出cpu以让其他线程运行,它不会挂起当前线程,因为当前线程未来会继续执行

    package main
    
    import (
       "fmt"
       "runtime"
    )
    
    func main() {
       go func() {
          for i := 0; i < 5; i++ {
             fmt.Println("Goroutine...")
          }
       }()
       for i := 0; i < 4; i++ {
          runtime.Gosched()
          fmt.Println("main..")
       }
    
       //Goroutine...
       //Goroutine...
       //Goroutine...
       //Goroutine...
       //Goroutine...
       //main..
       //main..
       //main..
       //main..
    
    }
    
  • NumGoroutine :返回正在执行和排队的任务总数

  • Goexit: 终止协程

    package main
    
    import (
       "fmt"
       "runtime"
       "time"
    )
    
    func main() {
       go func() {
          fmt.Println("goroutine开始。。。")
          fun()
          fmt.Println("goroutine结束")
       }()
       time.Sleep(3*time.Second)
    }
    
    //goroutine开始。。。
    //defer...
    
    func fun(){
       defer fmt.Println("defer...")
       runtime.Goexit()
       fmt.Println("func函数")
    }
    
  • runtime.GC :会让运行时系统进行一次强制性的垃圾收集

  • GOROOT :获取goroot目录

    package main
    
    import (
       "fmt"
       "runtime"
    )
    
    func main() {
       fmt.Println("GOROOT->", runtime.GOROOT())
       fmt.Println("os/platform->", runtime.GOOS)
    }
    
  • GOOS : 查看目标操作系统

创建协程池

  • goroutine是一个轻量级的线程,它的创建、调度都是在用户态进行,并不需要进入内核,这意味着开销是很小的所以,一般使用goroutine是不会考虑使用协程池的

  • 对于一些生产者消费者模型,就需要使用到协程池的实现:

    例如:随机生成数字,计算数字各个位数相加之和:

    package main
    
    import (
       "fmt"
       "math/rand"
    )
    
    type Job struct {
       Id int
       RandNum int
    }
    
    type Result struct {
       job *Job
       sum int
    }
    
    
    func main() {
       // 需要2个管道
       // 1.job管道
       jobChan := make(chan *Job, 128)
       // 2. 结果管道
       resultChan := make(chan *Result, 128)
    
       // 3. 创建工作池
       createPool(64, jobChan, resultChan)
       // 4. 开个打印的协程
       go func(resultChan chan *Result) {
          for result := range resultChan {
             fmt.Printf("job id %v random:%v result:%d\n", result.job.Id,
                result.job.RandNum, result.sum)
          }
       }(resultChan)
       var id int
       // 循环创建job, 输入到管道
       for {
          id++
          // 生成随机数
          r_num := rand.Int()
          job := &Job{
             Id:      id,
             RandNum: r_num,
          }
          jobChan <- job
       }
    }
    
    // 创建工作池
    // num: 开几个协程
    func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
       for i := 0; i < num; i++ {
          go func(jobChan chan *Job, resultChan chan *Result) {
             //遍历job 管道所有数据,进行相加
             for job := range jobChan {
                r_num := job.RandNum
                var sum int
                for r_num != 0 {
                   tmp := r_num % 10
                   sum += tmp
                   r_num /= 10
                }
                r := &Result{
                   job: job,
                   sum: sum,
                }
                resultChan <- r
             }
          }(jobChan, resultChan)
       }
    }
    
  • 不断创建goroutine而不去关闭,会导致内存溢出的问题,我们在使用goroutine的时候一定要知道什么时候能够退出和控制goroutine

    使用goroutine导致内存溢出的案例:

package main

import (
   "fmt"
   "runtime"
)

// 模仿耗时的业务代码
func consumer(ch chan int) {
   for {
      data := <- ch
      fmt.Println(data)
   }
}

func main() {
   // 用于goroutine通信
   ch := make(chan int)
   for {
      var dummy string
      // 获取输入,模拟进程持续运行
      fmt.Scan(&dummy)
      // 使用goroutine运行耗时业务代码
      go consumer(ch)
      // 输出现在的goroutine数量
      fmt.Println("goroutines:", runtime.NumGoroutine())
   }
}

正确写法需要指定退出goroutine的时机:

package main

import (
   "fmt"
   "runtime"
)

// 模仿耗时的业务代码
func consumer(ch chan int) {
   for {
      data := <- ch
      if data == 0 {
         break
      }
      fmt.Println(data)
   }
   fmt.Println("goroutine exit")
}

func main() {
   // 用于goroutine通信
   ch := make(chan int)
   for {
      var dummy string
      // 获取输入,模拟进程持续运行
      fmt.Scan(&dummy)
      if dummy == "quit" {
         // 设置子 goroutine退出标志
         for i := 0; i < runtime.NumGoroutine()-1; i++ {
            ch <- 0
         }
      }
      // 使用goroutine运行耗时业务代码
      go consumer(ch)
      // 输出现在的goroutine数量
      fmt.Println("goroutines:", runtime.NumGoroutine())
   }
}

Channel

channel基本使用

  • goroutine通过channel实现通信,可以把channel理解为一个通信队列

  • channel是引用类型,会被垃圾回收机制回收,空值为nil, 只有通过make初始化的channel才可以使用

  • 无缓冲通道

    • 创建:make(chan interface{})
    • 使用无缓冲的通道必须先接收才能发送,否则报deadlock
    • 使用无缓冲通道进行通信将导致发送和接收的goroutine同步化,一般使用高特性保证多个goroutine同步
  • 有缓冲的通道

    • 创建:make(chan interface{} 2)
    • 不要求先接收才能发送,当先发送达到缓冲数量时,如果没有接收会导致发送阻塞
  • 关闭channel

    • 当channel不再使用的时候,一定记得使及时用close()关闭
    • 一般使用for range遍历获取channel中的值,或者x, ok := <- channel从channel中取值时第二个参数可以判断channel是否关闭
    • 关闭后的channel有以下特点:
      1. 对一个关闭的通道再发送值就会导致panic
      2. 对一个关闭的通道进行接收会一直获取直到通道为空
      3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
      4. 关闭一个已经关闭的channel会导致panic
  • 单向通道

    • channel作为函数参数传参格式chan <- interface{}:只接收值的channel

      <- chan interface{}:只写入值的channel

      作为参数传递为引用类型

    • 一般通过单向通道实现生产者消费者模型:

      package main
      
      import "fmt"
      
      func producer(out chan <- int) {
         for i := 0; i < 10; i++ {
            out <- i * i
         }
         close(out)
      }
      
      func consumer(in <- chan int) {
         for num := range in {
            fmt.Println("num=", num)
         }
      }
      
      
      func main() {
         ch := make(chan int)
         go producer(ch)
         consumer(ch)
      }
      

channel超时机制

  • 当channel中没有值的时候,再从channel中获取值会阻塞,可以通过select加上单向通道技巧实现channel超时机制:

    package main
    
    import (
       "fmt"
       "time"
    )
    
    func main() {
       ch := make(chan int)
       quit := make(chan bool)
       go func() {
          for {
             select {
             case num := <-ch:
                fmt.Println("num =", num)
             case <- time.After(3*time.Second):
                fmt.Println("超时")
                quit <- true
             }
          }
       }()
       for i := 0; i < 5; i++ {
          ch <- i
          time.Sleep(time.Second)
       }
       <- quit
       fmt.Println("程序结束")
    }
    

channel使用细节和注意事项

  1. channel 可以声明为只读,或者只写性质

    package main
    
    import "fmt"
    
    func main() {
       var ch chan int
       ch = make(chan int, 10)
       exitChan := make(chan struct{}, 2)
       go send(ch, exitChan)
       go recv(ch, exitChan)
    
       var total = 0
       for _ = range exitChan {
          total++
          if total == 2 {
             break
          }
       }
       fmt.Println("结束。。。")
    }
    
    func send(ch chan <- int, exitChan chan struct{}) {
       for i := 0; i < 10; i++ {
          ch <- i
       }
       close(ch)
       var a struct{}
       exitChan <- a
    }
    
    func recv(ch <- chan int, exitChan chan struct{}) {
       for {
          v, ok := <- ch
          if !ok {
             break
          }
          fmt.Println(v)
       }
       var a struct{}
       exitChan <- a
    }
    
  2. 使用select 可以解决从管道取数据的阻塞问题

    package main
    
    import (
       "fmt"
       "time"
    )
    
    func main() {
       // 1. 定义一个管道10 个int数据
       intChan := make(chan int, 10)
       for i := 0; i < 10; i++ {
          intChan<- i
       }
    
       // 2. 定义一个管道5个string 数据
       stringChan := make(chan string, 5)
    
       for i := 0; i < 5; i++ {
          stringChan <- "hello" + fmt.Sprintf("%d", i)
       }
    
       // 传统的方法在遍历管道时,如果不关闭会阻塞而导致deadlock
       // 在实际开发中很多情况我们并不能确定什么时候需要关闭channel
       // 就可以使用select 方式解决channel关闭问题
       for {
          select {
          case v := <- intChan:
             fmt.Println(v)
             time.Sleep(time.Second)
          case v := <-stringChan:
             fmt.Println(v)
             time.Sleep(time.Second)
          default:
             fmt.Println("over!")
             time.Sleep(time.Second)
             return
          }
       }
    }
    
  3. goroutine 中使用recover, 解决协程中出现panic导致程序崩溃问题

    package main
    
    import (
       "fmt"
       "time"
    )
    
    func main() {
       go sayHello()
       go test()
       for i := 0; i < 10; i++ {
          fmt.Println("main() ok=", i)
          time.Sleep(time.Second)
       }
    }
    
    func sayHello() {
       for i := 0; i < 10; i++ {
          time.Sleep(time.Second)
          fmt.Println("hello, world")
       }
    }
    
    func test() {
       // 使用defer + recover
       defer func() {
          if err := recover(); err != nil {
             fmt.Println("test() 发生错误", err)
          }
       }()
       // 定义一个nil的map, 抛出异常
       var wrongMap map[int]string
       wrongMap[0] = "golang"
    }
    
  4. channel 发送和接收元素的本质是:值的拷贝

    package main
    
    import (
       "fmt"
       "time"
    )
    
    type user struct {
       name string
       age int8
    }
    var u = user{name: "Ankur", age: 25}
    var g = &u
    
    func modifyUser(pu *user) {
       fmt.Println("modifyUser Received Value", pu)
       pu.name = "Anand"
    }
    
    func printUser(u <- chan *user) {
       time.Sleep(2 * time.Second)
       fmt.Println("printUser goRoutine called", <- u)
    }
    
    func main() {
       c := make(chan *user, 2)
       // g 是指向u的地址,将g 发送至channel,就是将g的值(指向u的地址)copy发送
       c <- g
       fmt.Println(g)
       // 修改g指向的u的地址,g的值改变了
       g = &user{
          name: "Ankur annad",
          age:  100,
       }
       // 从channel中取出g的值并不受上边的改变而改变
       // printUser goRoutine called &{Ankur 25}
       go printUser(c)
       // 不使用channel, 直接通过参数传递g, 然后修改g的值
       //modifyUser Received Value &{Ankur annad 100}
       go modifyUser(g)
       time.Sleep(5 * time.Second)
       // g的值被改变了
       // &{Anand 100}
       fmt.Println(g)
    }
    
  5. channel 可能会引发goroutine 泄露:goroutine 操作 channel 后,处于发送或接收阻塞状态,而 channel
    处于满或空的状态,一直得不到改变。同时,垃圾回收器也不会回收此类资源,进而
    导致 gouroutine 会一直处于等待队列中

  6. 停止信号:channel 结合time 可以实现超时控制,与定时执行:

    select {
    // 100 ms后还没有从s.stopc获取数据就超时退出
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
       return false
    }
    
    // 没隔一秒执行定时任务
    func worker() {
       ticker := time.Tick(1 * time.Second)
       for {
          select {
          case <-ticker:
             // 执行定时任务
             fmt.Println("执行1s定时任务")
          }
       }
    }
    
  7. 解耦生产方和消费方

    package main
    
    import (
       "fmt"
       "time"
    )
    
    func main() {
       taskCh := make(chan int, 100)
       go worker(taskCh)
       // 往任务channel中添加任务
       for i := 0; i < 10; i++ {
          taskCh <- i
       }
       // 阻塞主线程
       select {
       case <-time.After(time.Hour):
       }
    }
    
    func worker(taskChan <- chan int) {
       const N = 5
       // 启动5个工作协程
       for i := 0; i < N; i++ {
          go func(id int) {
             for {
                task := <- taskChan
                fmt.Printf("finish task: %d by worker %d\n", task, id)
                time.Sleep(time.Second)
             }
          }(i)
       }
    }
    
  8. 控制并发数

    var limit = make(chan int, 3)
    func main() {
       // …………
       for _, w := range work {
          go func() {
             limit <- 1
             w()
              // 如果上边的w()逻辑发生异常,将不会从limit 中取数据,需要结合defer 取出数据
             <-limit
          }()
       }
       // …………
    }
    
    • 需要注意limit 要放在goroutine的内部:

      如果放在外层,就是控制goroutine的数量,可能会阻塞for 循环,影响业务逻辑。

channel 与 Time

定时器有两种:

  • Timer: 一次性的时间触发事件

    Timer 三要素:

    定时时间:d

    触发动作:f

    时间channel: t

    常见的创建方式:

    t := time.NewTimer(d)
    t := time.AfterFunc(d, f)
    c := time.After(d)
    
  • Ticker: 按一定时间间隔持续触发时间事件

time.NewTimer()

  • 创建一个新的计时器,该计时器将在其通道上至少持续d 之后发送当前时间
  • 当从timer 的channel 中获取值时,将阻塞对应的时间
package main

import (
   "fmt"
   "time"
)

func main() {
   // 新建一个计时器
   timer := time.NewTimer(3 * time.Second)
   fmt.Printf("%T\n", timer)
   fmt.Println(time.Now())
   // 将等待channel 中的信号, 阻塞三秒
   ch2 := timer.C
   fmt.Println(<-ch2)
}

timer.Stop

  • 停止调计时器
package main

import (
   "fmt"
   "time"
)

func main() {
   // 新建一个计时器
   timer := time.NewTimer(5 * time.Second)

   // 在另外的线程中触发计时器
   go func() {
      <-timer.C
      fmt.Println("Timer结束")
   }()
   // 由于上边的计时器是在另外的线程中触发,主线程的代码将继续执行
   time.Sleep(3*time.Second)
   //sleep 3 秒后timer 并没有结束, 使用stop 停止计时器
   stop := timer.Stop()
   if stop {
      fmt.Println("timer 停止")
   }
}

time.After()

  • 返回一个time 类型的channel ,存储的是d时间间隔后的时间
package main

import (
   "fmt"
   "time"
)

func main() {
   ch1 := time.After(3 * time.Second)
   fmt.Printf("%T\n", ch1)
   fmt.Println(time.Now())
   time2 := <- ch1
   fmt.Println(time2)
}

优雅关闭channel

  • 在使用channel的时候,通常无法获知channel是否关闭关闭一个closed的channel 会导致panic, 向一个closed的channel中发送数据也会导致panic

  • 优雅关闭channel方案:发送者关闭channel

  • 发送者关闭channel 分为4中情况:

    1. 一个sender, 一个receiver, 发送者发送完毕直接关闭

    2. 一个sender, M 个receiver,发送者发送完毕直接关闭

    3. N 个 sender, 一个receiver,增加一个传递关闭信号的channel, receiver 通过信号channel 下达关闭数据channel指令。sender 监听到关闭信号后,停止发送数据。

      package main
      
      import (
      	"fmt"
      	"math/rand"
      	"time"
      )
      
      func main() {
      	rand.Seed(time.Now().UnixNano())
      	const Max = 100000
      	const NumSenders = 1000
      	dataChan := make(chan int, 100)
      	stopChan := make(chan struct{})
      
      	// sender
      	for i := 0; i < NumSenders; i++ {
      		go func() {
      			select {
      			case <-stopChan:
      				// sender 接收到关闭dataChan 的信号,没有主动关闭channel,而是直接return
      				// 当channel没有被goroutine 使用时会被gc回收
      				// 此处优雅的体现了让gc回收channel
      				return
      			case dataChan <- rand.Intn(Max):
      			}
      		}()
      	}
      
      	// receiver
      	go func() {
      		for value := range dataChan {
      			if value == Max-1 {
      				fmt.Println("send stop signal to senders.")
      				close(stopChan)
      				return
      			}
      			fmt.Println(value)
      		}
      	}()
      	select {
      	case <- time.After(time.Hour):
      	}
      }
      
    4. N 个 sender, M个receiver, 同样需要一个传递关闭信号的channel, 但由于多个receiver 信号channel 不能由receiver 直接关闭,否则会panic

      此时需要引入一个中间人,M个receiver 都会向它发送关闭dataChan的“请求”, 中间人收到第一个请求后,就会直接下达关闭dataChan的指令, N个sender 也可以向中间人发送关闭dataChan的指令。

      package main
      
      import (
      	"fmt"
      	"math/rand"
      	"strconv"
      	"time"
      )
      
      func main() {
      	rand.Seed(time.Now().UnixNano())
      	const Max = 100000
      	const NumReceivers = 10
      	const NumSenders = 1000
      	dataChan := make(chan int, 100)
      	stopChan := make(chan struct{})
      	// toStop: 就是向中间人发送请求的channel, 必须为带缓存的channel,
      	// 否则直接向中间人channel发送消息会导致panic异常
      	toStopChan := make(chan string, 1)
      	var stoppedBy string
      	// 中间人
      	go func() {
      		stoppedBy = <- toStopChan
      		fmt.Println(stoppedBy)
      		close(stopChan)
      	}()
      	// 发送者
      	for i := 0; i < NumSenders; i++ {
      		go func(id string) {
      			for {
      				value := rand.Intn(Max)
      				if value == 0 {
      					select {
      					case toStopChan <- "sender#" + id:
      					default:   // 防止向toStopChan写入阻塞加上default
      					}
      					return
      				}
      				select {
      				case <-stopChan:
      					return
      				case dataChan <- value:
      				}
      			}
      		}(strconv.Itoa(i))
      	}
      	// 接收者
      	for i := 0; i < NumReceivers; i++ {
      		go func(id string) {
      			for  {
      				select {
      				case <-stopChan:
      					return
      				case value := <- dataChan:
      					if value == Max - 1 {
      						select {
      						case toStopChan <- "receiver#" + id + "#" + strconv.Itoa(value):
      						default:    // 防止向toStopChan写入阻塞加上default
      						}
      						return
      					}
      					//fmt.Println(value)
      				}
      			}
      		}(strconv.Itoa(i))
      	}
      	select {
      	case <-time.After(time.Hour):
      	}
      }
      

死锁、活锁和饥饿概述

死锁

  • 多个线程因争夺资源造成互相等待的现象,若无外力将处于死锁状态

  • 发生死锁的条件:

    • 互斥条件

      线程对资源的访问是排他性的,如果一个线程占用了资源,其他线程都等待该线程释放资源

    • 请求和保持条件

      线程T1已经占用了资源R1,但又提出使用资源R2请求,但是R2已经被其他线程占用,导致T1也必须等待,但又对自己保持的资源R1不释放

    • 不剥夺条件

      线程已获得的资源,在未使用完之前,不能被其他线程剥夺,只能在使用完以后由自己释放

    • 环路等待条件

      即:{p0,p1,p2,…pn},进程 p0(或线程)等待 p1 占用的资源,p1 等待 p2 占用的资源,pn 等待 p0 占用的资源。

  • 解决方案

    • 约定访问顺序
    • 在同一个事务中,尽可能做到一次锁定获取所需的资源
    • 对于容易产生死锁的业务场景,尝试升级锁颗粒度,使用表级锁
    • 采用分布式事务锁或者使用乐观锁

活锁

  • 例如线程 1 可以使用资源,但它很礼貌,让其他线程先使用资源,线程 2 也可以使用资源,但它同样很绅士,也让其他线程先使用资源。就这样你让我,我让你,最后两个线程都无法使用资源
  • 活锁通常发生在处理事务消息中,如果不能成功处理某个消息,那么消息处理机制将回滚事务,并将它重新放到队列的开头。这样,错误的事务被一直回滚重复执行,这种形式的活锁通常是由过度的错误恢复代码造成的,因为它错误地将不可修复的错误认为是可修复的错误。
  • 解决方案
    • 在重试机制中引入随机性

饥饿

  • 与锁使用的粒度有关,指一个可运行的进程尽管能继续执行,但被调度器无期限的忽视,而不能被调度执行的情况

Go语言竞争状态

  • 有并发就有资源竞争,解决了资源竞争的问题,并发也就没那么复杂
  • go build命令中多加了一个-race标志,这样生成的可执行程序自带了检测资源竞争的功能

锁住共享资源

原子函数

  • 原子函数能够以很底层的加锁机制来同步访问整型变量和指针

    • atomic.AddInt64多个goroutine间安全的加同一个变量

      package main
      
      import (
         "fmt"
         "runtime"
         "sync"
         "sync/atomic"
      )
      
      var (
         counter int64
         wg sync.WaitGroup
      )
      
      func main() {
         wg.Add(2)
         go incCounter(1)
         go incCounter(2)
         wg.Wait()  // 等待goroutine结束
         fmt.Println(counter)
      }
      
      func incCounter(id int) {
         defer wg.Done()
         for count := 0; count < 2; count++ {
            atomic.AddInt64(&counter, 1) // 安全的对counter 加1
            runtime.Gosched()   // 暂停当前goroutine, 退回执行队列,让其他等待的goroutine 执行
         }
      } 
      
    • LoadInt64StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式

      package main
      
      import (
         "fmt"
         "sync"
         "sync/atomic"
         "time"
      )
      
      var (
         shutdown int64
         wg sync.WaitGroup
      )
      
      func main() {
         wg.Add(2)
         go doWork("A")
         go doWork("B")
         time.Sleep(1 * time.Second)
         fmt.Println("Shutdown Now")
         atomic.StoreInt64(&shutdown, 1)
         wg.Wait()
      }
      
      func doWork(name string) {
         defer wg.Done()
         for {
            fmt.Printf("Done %s Work\n", name)
            time.Sleep(250 * time.Millisecond)
            if atomic.LoadInt64(&shutdown) == 1 {
               fmt.Printf("shutting %s Down\n", name)
               break
            }
         }
      }
      
      Done B Work
      Done A Work
      
      Done A Work
      Done B Work
      Done B Work
      Done A Work
      Done A Work
      Done B Work
      Shutdown Now
      shutting A Down
      shutting B Down
      

互斥锁

  • 使用互斥锁也可以实现资源共享

    package main
    
    import (
       "fmt"
       "runtime"
       "sync"
    )
    
    var (
       counter int64
       wg sync.WaitGroup
       mutex sync.Mutex
    )
    func main() {
       wg.Add(2)
       go incCounter(1)
       go incCounter(2)
       wg.Wait()
       fmt.Println(counter)
    }
    func incCounter(id int) {
       defer wg.Done()
       for count := 0; count < 2; count++ {
          //同一时刻只允许一个goroutine进入这个临界区
          mutex.Lock()
          {
             value := counter
             runtime.Gosched()
             value++
             counter = value
          }
          mutex.Unlock() //释放锁,允许其他正在等待的goroutine进入临界区
       }
    }
    

Sync 包

sync.WaitGroup

  • 同步计数器, 传递参数的时候要传递指针
方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.Once

  • 在并发中保证操作只执行一次,如加载一次配置文件,关闭一个通道
  • sync.Once 只有一个Do方法:func (o *Once) Do(f func()) {}如果函数f 需要传递参数就需要使用闭包

sync.Map

  • go语言中的map是线程不安全的,在并发中需要使用sync.Map
  • Store: 存储 Load: 取值 Delete: 删除 Range: 遍历
package main

import (
   "fmt"
   "strconv"
   "sync"
)

//var m = make(map[string]int)   go语言中的map是线程不安全的
var m = sync.Map{} // 开箱即用无需make

func get(key string) int {
   value, _ := m.Load(key)
   return value.(int)
}

func set(key string, value int) {
   m.Store(key, value)
}

func main() {
   wg := sync.WaitGroup{}
   for i := 0; i < 20; i++ {
      wg.Add(1)
      go func(n int) {
         key := strconv.Itoa(n)
         set(key, n)
         fmt.Printf("k=:%v, v:=%v\n", key, get(key))
         wg.Done()
      }(i)
   }
   wg.Wait()
}

Context上下文

  • context上下文用于协程间信息交互,方法如下:
    • Deadline: 返回当前context 被取消的时间,也就是完成工作的截止时间
    • Done: 返回一个channel,该channel在当前工作完成或者上下文被取消后关闭,多次调用Done会返回同一个channel
    • Err: 返回当前Context结束的原因,只会在Done返回的channel被关闭时才会返回非空的值:
      • 如果当前Context被取消就会返回Canceled错误
      • 如果当前context超时就会返回DeadlineExceeded错误
    • value: 从context 中获取对应键的值

Background()

  • 主要用于main函数、初始化以及测试代码中,作为context的顶层结构

With系列函数

WithCancel

  • 控制单个协程

    • context.Background()创建根Context, 通常在main函数、初始化和测试代码中创建,作为顶层Context
    • context.WithCancel(parent)创建可取消的子Context, 同时返回函数cancel
    • 在子协程中,调用cancel() 函数通知子协程退出
    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    )
    
    func reqTask(ctx context.Context, name string) {
    	for {
    		select {
    		case <-ctx.Done():
    			fmt.Println("stop", name)
    			return
    		default:
    			fmt.Println(name, "send request")
    			time.Sleep(1 * time.Second)
    		}
    	}
    }
    
    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	go reqTask(ctx, "worker1")
    	time.Sleep(3 * time.Second)
    	cancel()
    	time.Sleep(3 * time.Second)
    }
    
  • 控制多个协程

    func main() {
       ctx, cancel := context.WithCancel(context.Background())
       go reqTask(ctx, "worker1")
       go reqTask(ctx, "worker2")
       time.Sleep(3 * time.Second)
       cancel()
       time.Sleep(3 * time.Second)
    }
    

    为每个子协程传递同一个ctx即可

WithDeadline

  • 带有过期时间的上下文:

    package main
    
    import (
       "context"
       "fmt"
       "time"
    )
    
    func main() {
       d := time.Now().Add(5 * time.Second)
       ctx, cancel := context.WithDeadline(context.Background(), d)
       // ctx 可能会过期但在任何情况下都最好调用cancel函数
       defer cancel()
       select {
       case <-time.After(1*time.Second):  // 当ctx 没过期时1秒后退出
             fmt.Println("overslept")
       case <-ctx.Done():    // 打印ctx 过期退出的错误信息
          fmt.Println(ctx.Err())
       }
    }
    
  • 当过期时,或者当调用返回的cancel函数时,或者当父上下文的Done通道关闭时,返回通道的上下文将被关闭

WithTimeout

  • 对withdeadline的一层封装

WithValue

  • 上下文key, value

    package main
    
    import (
       "context"
       "fmt"
    )
    
    func main() {
       type favContextKey string
       f := func(ctx context.Context, k favContextKey) {
          if v := ctx.Value(k); v != nil {
             fmt.Println("found value:", v)
             return
          }
          fmt.Println("key not found:", k)
       }
       k := favContextKey("language")
       ctx := context.WithValue(context.Background(), k, "Go")
       f(ctx, k)
       f(ctx, favContextKey("color"))
    }
    

}


- **控制多个协程**

```go
func main() {
   ctx, cancel := context.WithCancel(context.Background())
   go reqTask(ctx, "worker1")
   go reqTask(ctx, "worker2")
   time.Sleep(3 * time.Second)
   cancel()
   time.Sleep(3 * time.Second)
}

为每个子协程传递同一个ctx即可

WithDeadline

  • 带有过期时间的上下文:

    package main
    
    import (
       "context"
       "fmt"
       "time"
    )
    
    func main() {
       d := time.Now().Add(5 * time.Second)
       ctx, cancel := context.WithDeadline(context.Background(), d)
       // ctx 可能会过期但在任何情况下都最好调用cancel函数
       defer cancel()
       select {
       case <-time.After(1*time.Second):  // 当ctx 没过期时1秒后退出
             fmt.Println("overslept")
       case <-ctx.Done():    // 打印ctx 过期退出的错误信息
          fmt.Println(ctx.Err())
       }
    }
    
  • 当过期时,或者当调用返回的cancel函数时,或者当父上下文的Done通道关闭时,返回通道的上下文将被关闭

WithTimeout

  • 对withdeadline的一层封装

WithValue

  • 上下文key, value

    package main
    
    import (
       "context"
       "fmt"
    )
    
    func main() {
       type favContextKey string
       f := func(ctx context.Context, k favContextKey) {
          if v := ctx.Value(k); v != nil {
             fmt.Println("found value:", v)
             return
          }
          fmt.Println("key not found:", k)
       }
       k := favContextKey("language")
       ctx := context.WithValue(context.Background(), k, "Go")
       f(ctx, k)
       f(ctx, favContextKey("color"))
    }
    
上一篇:go 入门学习


下一篇:VBM法MRI图像处理——记第一次使用cat12