Golang基础7-并发编程

并发编程

https://www.cnblogs.com/Survivalist/p/11527949.html

进程和线程、协程的区别_线程协程进程的区别-****博客

Golang中的并发编程是一个重点,我们要了解Golang中的并发Goroutine因此需要先理解进程、线程、之后再理解协程。

进程:操作系统进行资源分配的最小单元,是程序在执行过程中的一次活动,包括程序,数据集合,程序控制块(PCB)等。每个进程都有独立的内存空间,包括代码、数据、堆栈,因此进程之间相互隔离。进程切换开销大(包括栈、寄存器、页表、文件句柄等切换)。尽管更安全,但也占据了较多系统资源

线程:操作系统进行调度的最小单元,是进程中执行的基本单元,由线程ID,当前指令指针PC,寄存器和堆栈组成。一个进程包含>=1个线程,其中一个为主线程,多个线程时通过共享内存,上下文切换较快,资源开销较小。共享内存尤其需要注意线程同步互斥问题。

协程:用户轻量级线程,由程序控制,不被操作系统内核管理

更轻量,独立栈空间(协程之间不共享内存,但是可以通信(channel)进行交互),更易并发(高效切换无需过多锁)

并行和并发区别

goroutine

参考文章:Goroutine · Golang 学习笔记

每个goroutine是官方实现的超级"线程池"

每个实例4-5KB栈内存和实现机制大幅减少创建和销毁使得go更易实现高并发

goroutine奉行通信(配合channel)实现共享内存。

在go语言层面内置调度和上下文切换机制,并且go程序会智能地将任务合理的分配给CPU

简单demo
package main

import (
    "fmt"
    "time"
)

func Hello() {
    fmt.Println("Hello Function!")
}

func main() {
    //在函数前加入关键字go
    go Hello()
    fmt.Println("Main done")
    //休眠,等go Hello()执行完
    time.Sleep(time.Second)
}

sync.WaitGroup demo
  1. WaitGroup用来启动一组goroutine,等待任务做完再结束goroutine。
  2. wg.Add(delta int):设置将要启动的Goroutine的数量,来设置WaitGroup内部计数器
  3. wg.Done():每个goroutine完成后,计数器-1 ;对于可能panic的可以使用defer wg.Done()
  4. wg.Wait():阻塞自己,等待所有goroutine完成任务,计数器减为0,返回

sync.WaitGroup中的Add和Done线程安全,可以从多个groutine中调用这两个方法,不用担心数据竞争和其他并发问题。

package main

import (
    "fmt"
    "sync"
)
//启动多个goroutine
func main() {
    //协程同步
    var wg sync.WaitGroup
    wg.Add(9)

    for i := 0; i < 9; i++ {
        //当作参数传入会拷贝一份,因此可以保证输出0-8
        go func(i int) {
            defer  wg.Done()
            fmt.Printf("%d ",i)
        }(i)
    }
    // 阻塞主程序,等待所有 Goroutine 完成
    wg.Wait()
}

输出结构并发乱序。0-9的其中一个组合

sync.Map demo
    1. sync.Map并发安全的sync.Map,可以安全并发的读写操作,常见操作见代码
    2. 与之相对应的原生map,线程不安全,并发读写时需要加锁
package main

import (
    "fmt"
    "sync"
)

func main() {
    //sync.Map的key和value都是interface{}
    var m sync.Map

    //写入
    m.Store("1", 18)
    m.Store("2", 20)

    //读取
    age, ok := m.Load("1")
    if ok {
        fmt.Println("读取成功", age, ok)
    } else {
        fmt.Println("读取失败!")
    }

    //遍历!!
    m.Range(func(key, value interface{}) bool {
        fmt.Println("遍历:key=", key, " value=", value)
        return true
    })

    //根据key删除
    m.Delete("2")
    age, ok = m.Load("2")
    if ok {
        fmt.Println("删除后读取成功", age, ok)
    } else {
        fmt.Println("删除后读取失败!")
    }

    //存在则读取否则写入
    //如果存在key=2,ok返回为true,否则false
    age, ok = m.LoadOrStore("2", "100")
    if ok {
        fmt.Println("已存在的:", age)
    } else {
        fmt.Println("不存在,store后的:", age)
    }

}

map并发 demo
    • 原生map实现并发时一定需要加锁来保证安全,不然报错。
    • sync.Map安全Map,不需要上锁解锁操作。
package main

import (
    "fmt"
    "sync"
)

func main() {
    //没有加锁的并发写入,则会报错
    m := make(map[int]int)

    var wg sync.WaitGroup
    var mu sync.Mutex

    for i := 0; i < 9; i++ {
        wg.Add(1)

        go func(i int) {
            for j := 0; j < 9; j++ {
                //上锁
                mu.Lock()
                m[j] = i
                mu.Unlock()
            }
            wg.Done()
        }(i)

    }
    

    //    安全Map
    var sm sync.Map
    for i := 0; i < 9; i++ {
        wg.Add(1)
        go func(i int) {
            for j := 0; j < 9; j++ {
                sm.Store(j, i)
            }
            wg.Done()
        }(i)

    }
    //完成前面并发任务后输出
    wg.Wait()
    
    fmt.Println("最终打印map值:", m)
    fmt.Print("最终打印sync.Map值:")
    sm.Range(func(key, value interface{}) bool {
        fmt.Printf("%d:%d ", key, value)
        return true
    })

}

go的GMP调度原理

channel

go中不要通过共享内存来通信,而是通过通信来共享内存。

参考:Go Channel 详解

go高性能编程:GitHub - wuqinqiang/Go_Concurrency: go concurrency class code

go语言核心类型,管道,并发中可以进行发送或接收数据进行通信。

<-

使用make创建channel,chan底层是一个环形数组

类型:chan chan <- <-chan

使用场景:

    • 消息传递、消息过滤
    • 信号广播
    • 事件的订阅和广播
    • 任务分发
    • 结果汇总
    • 并发控制
    • 同步和异步

简单demo

无缓冲channel(B要第一时间知道A是否完成)、有缓冲channel(生产者消费者模型)

package main

import (
    "fmt"
    "sync"
)

func main() {
    c := make(chan int, 2)
    defer close(c)

    var wg sync.WaitGroup

    wg.Add(1)

    go func() {
        defer wg.Done()
        c <- 3 + 4
        c <- 1
        fmt.Println("发送成功")
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        c <- 100
    }()
    
    //wg.Wait()
    i := <-c
    j := <-c
    _ = <-c//忽略这个值
    fmt.Println(i, j)

    wg.Wait()

}

import "fmt"
func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // send sum to c
}
func main() {
    s := []int{7, 2, 8, -9, 4, 0}
    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // receive from c
    fmt.Println(x, y, x+y)
}

channel的range

func main() {
    go func() {
        time.Sleep(1 * time.Hour)
    }()
    c := make(chan int)
    go func() {
        for i := 0; i < 10; i = i + 1 {
            c <- i
        }
        close(c)
    }()
    for i := range c {
        fmt.Println(i)
    }
    fmt.Println("Finished")
}

这个range会一直从c中获取,直到c关闭

select demo

类似与linux中io的select、poll、epoll。

select语句类似于switch,随机执行一个可执行的case,select只用于通信操作,如果没有case可运行那么将阻塞,直到有case可运行。默认的字句总是可以运行。

    • 每个case都必须是一个通信
    • 所有channel表达式都会被求值
    • 所有被发送的表达式都会被求值
    • 如果任意某个通信可以进行,它就执行;其他被忽略。
    • 如果有多个case都可以运行,Select会随机地选出一个执行。其他不会执行。
    • 否则:如果有default子句,则执行该语句。
    • 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
package main

import "fmt"

//select用于退出
func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    } 
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}
timeout demo
package main

import "time"
import "fmt"

func main() {
    c1 := make(chan string, 1)
    go func() {
        time.Sleep(time.Second * 2)
        c1 <- "result 1"
    }()
    select {
    case res := <-c1:
        fmt.Println(res)
    //超时退出
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}
单向通道 demo

send chan <- string//只能发送给send

read <-chan string// 只能读取read

package main

import (
    "fmt"
    "time"
)

func Produce(out chan<- int) {
    for i := 0; i < 10; i++ {
        out <- i * i
    }

}

func Consumer(in <-chan int) {
    for num := range in {
        fmt.Println(num)
    }
}

func main() {
    c := make(chan int, 0)

    go Produce(c)
    go Consumer(c)

    time.Sleep(time.Second)
}

package main

import "fmt"

// 只能发送给管道
func Counter(out chan<- int) {
    for i := 0; i < 10; i++ {
        out <- i
    }
    close(out)
}

// chan <-  只能发送给管道     <-chan 管道发送嘛,因此只能接收
func Squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}

func Printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go Counter(ch1)
    go Squarer(ch2, ch1)
    Printer(ch2)
}

输出0-9的平方。

协程池demo

Golang学习篇——协程池_golang 携程池-****博客

package main

import (
    "fmt"
    "math/rand"
    "sync"
)

// 当前task
type Task struct {
    Id     int
    Random int
}

// 结果
type Result struct {
    Task *Task
    Sum  int
}

// 创建Task
func CreateTask(taskChan chan<- *Task, wg *sync.WaitGroup) {
    defer wg.Done()
    for id := 0; id < 100000; id++ {
        //创建Task
        task := &Task{
            Id:     id,
            Random: rand.Intn(200) + 1,
        }
        // 传递给taskChan管道
        taskChan <- task
    }
    close(taskChan)

}

// 创建线程池来处理
func CreatePool(num int, taskChan <-chan *Task, resultChan chan<- *Result, wg *sync.WaitGroup) {
    for i := 0; i < num; i++ {
        wg.Add(1)
        // 创建多个goroutine并发
        go func() {
            for task := range taskChan {
                // 当前的Num
                currentNum := task.Random
                sum := 0
                // 计算sum的值
                for currentNum != 0 {
                    temp := currentNum % 10
                    sum += temp
                    currentNum /= 10
                }
                // 此时任务的结果是:
                currentResult := &Result{
                    Task: task,
                    Sum:  sum,
                }
                // 发送给结果管道
                resultChan <- currentResult
            }
            wg.Done()
        }()
    }

}

// 开启打印 Result
func PrintResult(resultChan <-chan *Result) {
    //输出
    for res := range resultChan {
        fmt.Printf("输出结果,Id:=%d,Random:=%d,Sum:=%d\n", res.Task.Id, res.Task.Random, res.Sum)
    }
}

func main() {
    // 创建task管道,传递task
    taskChan := make(chan *Task, 128)

    // 结果管道
    resultChan := make(chan *Result, 128)

    // 确保goroutine全部完成
    var wg sync.WaitGroup

    wg.Add(1)
    go CreateTask(taskChan, &wg)

    // 创建协程池
    CreatePool(133, taskChan, resultChan, &wg)

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    // 创建协程进行打印
    PrintResult(resultChan)

}

channel一定注意防止被阻塞而导致程序出现死锁!!!

并发安全和锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。sync.Mutex

sync.Mutex互斥锁demo

多个goroutine对同一个共享资源(当前的x)的竞争你,x=x+1,在汇编当中并不是原子性的操作,因此并发时会导致数据不一致,方法1,上锁。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var (
    total int32
    wg    sync.WaitGroup
    mutex sync.Mutex
)

func Add() {
    defer wg.Done()

    for i := 0; i < 10000; i++ {
        //原子操作
        atomic.AddInt32(&total, 1)
        //mutex.Lock()
        //total++
        //mutex.Unlock()
    }

}

func Del() {
    defer wg.Done()

    for i := 0; i < 10000; i++ {
        atomic.AddInt32(&total, -1)
        //mutex.Lock()
        //total--
        //mutex.Unlock()
    }

}

func main() {

    fmt.Println("origin num:", total)

    wg.Add(2)

    go Add()
    go Del()

    wg.Wait()
    fmt.Println("After num:", total)
}
package main

import (
    "fmt"
    "sync"
)

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func Add() {
    for i := 0; i < 50; i++ {
        lock.Lock()
        x = x + 1
        lock.Unlock()
    }
    wg.Done()
}

func main() {
    wg.Add(2)
    go Add()
    go Add()
    wg.Wait()
    fmt.Println(x)
}

读写互斥锁 demo
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex //读写互斥锁
)

func Write() {
    //lock.Lock() //加互斥锁
    rwlock.Lock()
    x = x + 1
    time.Sleep(10 * time.Millisecond)
    rwlock.Unlock()
    //lock.Unlock() //解互斥锁
    wg.Done()
}

func Read() {
    //lock.Lock()
    rwlock.RLock()
    time.Sleep(time.Millisecond)

    rwlock.RUnlock()
    //lock.Unlock()
    wg.Done()
}

func main() {
    start := time.Now()

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go Write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go Read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

sync

前面介绍过sync的一些方法

sync.WaitGroup

sync.Once

参考:Go sync.Once | Go 语言高性能编程 | 极客兔兔

执行一次的函数,可以在代码任意位置加载,常用于单例模式(懒汉式),并发场景安全。而init是package首次执行时加载(饿汉式)

对外接口:func (o *Once) Do(f func())

sync.Map

这个是并发安全的Map

原子操作

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var x int64
var l sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
    // x = x + 1
    x++ // 等价于上面的操作
    wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
    l.Lock()
    x++
    l.Unlock()
    wg.Done()
}

// 原子操作版加函数
func atomicAdd() {
    atomic.AddInt64(&x, 1)
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        //go add() // 普通版add函数 不是并发安全的
        //go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
        go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(x)
    fmt.Println(end.Sub(start))
}

Context

context详解:https://www.cnblogs.com/juanmaofeifei/p/14439957.html

Go 语言并发编程与 Context | Go 语言设计与实现

Context是用来用来处理goroutine,可以在多个goroutine中传递取消信号、超时等。

通俗的解释:Context · Go语言中文文档

由于golang的server在goroutine当中,context就是有效管理这些goroutine,相互调用的goroutine之间通过传递context变量保持关联,这样在不用暴露各goroutine内部实现细节的前提下,有效地控制各goroutine的运行。

引入--退出goroutine
方式1,采用全局变量
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

// 退出全局变量
var stop bool

func worker() {
    defer wg.Done()
    for {
        if stop {
            break
        }
        time.Sleep(time.Second)
        fmt.Println("worker")
    }
}

func main() {
    wg.Add(1)
    go worker()

    time.Sleep(3 * time.Second)
    stop = true
    wg.Wait()
}
方式2,采用管道通信
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

var ch = make(chan struct{})

func worker() {
    defer wg.Done()
LOOP:
    for {

        select {
        case <-ch:
            fmt.Println("exit")
            break LOOP
        default:
            time.Sleep(time.Second)
            fmt.Println("worker")
        }

    }
}

func main() {
    wg.Add(1)
    go worker()

    time.Sleep(3 * time.Second)
    ch <- struct{}{}
    wg.Wait()
}
方式3,采用context
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func worker(ctx context.Context) {
    defer wg.Done()
LOOP:
    for {

        select {
        case <-ctx.Done():
            fmt.Println("exit")
            break LOOP
        default:
            time.Sleep(time.Second)
            fmt.Println("worker")
        }

    }
}

func main() {
    wg.Add(1)
    ctx, cancel := context.WithCancel(context.Background())
    go worker(ctx)

    time.Sleep(3 * time.Second)
    cancel() //等待子routine结束

    wg.Wait()
}

如果函数当中需要被控制、超时、传递时,但不希望改变原来的接口时,函数第一个参数传入ctx。

context

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

WithDeadline
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, canel := context.WithDeadline(context.Background(), d)

    // 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
    // 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    defer canel()

    select {
    case <-time.After(10 * time.Millisecond):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }
}

WithTimeout
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func worker(ctx context.Context) {
    defer wg.Done()
LOOP:
    for {

        select {
        case <-ctx.Done():
            fmt.Println("exit")
            break LOOP
        default:
            time.Sleep(time.Second)
            fmt.Println("worker")
        }

    }
}

func main() {
    wg.Add(1)

    //超时控制
    ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
    go worker(ctx)

    time.Sleep(3 * time.Second)
    wg.Wait()
}
WithValue
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func worker(ctx context.Context) {
    //拿到key,value
    fmt.Printf("traceid:%s\n", ctx.Value("traceid"))
    //记录一些日志等等,方便排查

    defer wg.Done()
LOOP:
    for {

        select {
        case <-ctx.Done():
            fmt.Println("exit")
            break LOOP
        default:
            time.Sleep(time.Second)
            fmt.Println("worker")
        }

    }
}

func main() {
    wg.Add(1)

    //超时控制
    ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)

    //传递一些值,后续可能链路追踪id
    childCtx := context.WithValue(ctx, "traceid", "123456")

    go worker(childCtx)

    time.Sleep(3 * time.Second)
    wg.Wait()
}
上一篇:矩阵按列相乘运算的并行化实现方法


下一篇:深入理解Python协程:从基础到实战