预备知识
1.1 进程、线程、协程
进程(Process):在内存中的程序。有自己独立的独占的虚拟 CPU 、虚拟的 Memory、虚拟的 IO devices。
OS 直接支持并调度。进程之间只能通过系统提供的 IO 机制通讯。共享内存(变量)是不可能的!
(1) 每一进程占用独立的地址空间。
此处的地址空间包括代码、数据及其他资源。
(2) 进程间的通信开销较大且受到许多限制。
对象(或函数)接口、通信协议、…
(3) 进程间的切换开销也较大。
又称Context Switch。
上下文包括代码、数据、堆栈、处理器状态、资源、…
线程(Thread):轻量级进程。在现代操作系统中,是进程中程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。 一个进程由若干线程组成,它们共享进程的计算、存储、IO资源。因此,程序员必须使用系统提供的同步、消息机制,处理资源的竞争和消息的通讯。
(1) 多个线程共享进程的地址空间(代码、数据、其他资源等)。
线程也需要自己的资源,如程序计数器、寄存器组、调用栈等。
(2) 线程间的通信开销较少且比较简单。
因为共享而减少了需要通信的内容。
但也因为充分共享而无法对共享资源进行保护。
(3) 线程间的切换开销也较小。
只需保存每一线程的程序计数器、寄存器组、堆栈等空间。
不必切换或复制整个地址空间,从而成本大为降低(约1/10)
线程有分为两大类:
操作系统管理的线程(Core Thread),通常根据 CPU 资源决定线程的数量,一般为 CPU 数量的两倍。
语言提供的线程库管理的线程(User Thread),它执行时映射到系统线程,按任务类型(计算密集型,IO密集型)决定线程池的管理方式与数量。
协程(coroutine/fiber):轻量级线程。 是可以并发执行的函数,由编译或用户指定位置将控制权交给协程调度程序执行的方式。它是非抢占式的,可以避免反复系统调用,还有进程切换造成的开销,给你上几千个逻辑流,也称用户级别线程。
并行和并发
并行(parallel):指在同一时刻,有多条指令在多个处理器上同时执行。 并发(concurrency):指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时执行的,只是把时间分成若干段,使多个进程快速交替的执行。
并行是两个队列同时使用两台咖啡机,并发是两个队列交替使用一台咖啡机
Go语言并发优势
有人把Go比作21世纪的C语言,第一是因为Go语言设计简单,第二,21世纪最重要的就是并行程序设计,而Go从语言层面就支持了并行。同时,并发程序的内存管理有时候是非常复杂的,而Go语言提供了自动垃圾回收机制。Go语言中有个概念叫做goroutine, 这类似我们熟知的线程,但是更轻。一般情况下,一个普通的桌面计算机跑十几二十个线程就有点负载过大了,但是同样这台机器却可以轻松地让成百上千甚至过万个goroutine进行资源竞争。
goroutine是什么
goroutine是Go并行设计的核心。goroutine说到底其实就是协程,但是它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。
创建goroutine
只需在函数调⽤语句前添加 go 关键字,就可创建并发执⾏单元。开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行。在并发编程里,我们通常想讲一个过程切分成几块,然后让每个goroutine各自负责一块工作。当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。
package main import ( "fmt" "time" ) func newTask() { i := 0 for { i++ fmt.Printf("new goroutine: i = %d\n", i) time.Sleep(1 * time.Second) //延时1s } } func main() { go newTask() //创建一个 goroutine,启动另外一个任务 i := 0 for { //main goroutine 循环打印 i++ fmt.Printf("main goroutine: i = %d\n", i) time.Sleep(1 * time.Second) //延时1s } }
程序运行结果:
main goroutine: i = 15 new goroutine: i = 15 new goroutine: i = 16 main goroutine: i = 16 new goroutine: i = 17 main goroutine: i = 17 new goroutine: i = 18 main goroutine: i = 18 main goroutine: i = 19 new goroutine: i = 19 new goroutine: i = 20 main goroutine: i = 20 main goroutine: i = 21 new goroutine: i = 21 new goroutine: i = 22 main goroutine: i = 22 。。。
串行地去执行两次loop
函数:
package main import ( "fmt" ) func loop() { for i := 0; i < 10; i++ { fmt.Printf("%d ", i) } } func main() { loop() loop() }
输出会是这样的:
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
把一个loop放在一个goroutine里跑,我们可以使用关键字go
来定义并启动一个goroutine:
package main import ( "fmt" "time" ) func loop() { for i := 0; i < 10; i++ { fmt.Printf("%d ", i) } } func main() { go loop() loop() time.Sleep(time.Second) }
注意,如果不用 time.Sleep(time.Second), 则可能在goroutine还没来得及跑loop的时候,主函数已经退出了
main函数退出地太快了,我们要想办法阻止它过早地退出,一个办法是让main等待一下:,目的达到了。可是采用等待的办法并不好,如果goroutine在结束的时候,告诉下主线说“Hey, 我要跑完了!”就好了,这就是接下来要讲到的信道。
信道
信道是什么?简单说,是goroutine之间互相通讯的东西。类似我们Unix上的管道(可以在进程间传递消息), 用来goroutine之间发消息和接收消息。其实,就是在做goroutine之间的内存共享。
示例:
package main import ( "fmt" ) func main() { var messages chan string = make(chan string) go func(message string) { messages <- message // 存消息 }("Ping!") fmt.Println(<-messages) // 取消息 }
默认的,信道的存消息和取消息都是阻塞的 (叫做无缓冲的信道,不过缓冲这个概念稍后了解,先说阻塞的问题)。
也就是说, 无缓冲的信道在取消息和存消息的时候都会挂起当前的goroutine,除非另一端已经准备好。
package main import ( "fmt" ) var ch chan int = make(chan int) func foo() { ch <- 0 // 向ch中加数据,如果没有其他goroutine来取走这个数据,那么挂起foo, 直到main函数把0这个数据拿走 } func main() { go foo() data := <-ch // 从ch取数据,如果ch中还没放数据,那就挂起main线,直到foo函数中放数据为止 fmt.Println(data) }
那既然信道可以阻塞当前的goroutine, 那么回到上一部分「goroutine」所遇到的问题「如何让goroutine告诉主线我执行完毕了」 的问题来, 使用一个信道来告诉主线即可:
package main import ( "fmt" ) var complete chan int = make(chan int) func loop() { for i := 0; i < 10; i++ { fmt.Printf("%d ", i) } complete <- 0 // 执行完毕了,发个消息 } func main() { go loop() <-complete // 直到线程跑完, 取到消息. main在此阻塞住 }
如果不用信道来阻塞主线的话,主线就会过早跑完,loop线都没有机会执行、、、
其实,无缓冲的信道永远不会存储数据,只负责数据的流通,为什么这么讲呢?
从无缓冲信道取数据,必须要有数据流进来才可以,否则当前线阻塞
数据流入无缓冲信道, 如果没有其他goroutine来拿走这个数据,那么当前线阻塞
无缓冲信道的大小都是0 (len(channel)
)
如果信道正有数据在流动,我们还要加入数据,或者信道干涩,我们一直向无数据流入的空信道取数据呢? 就会引起死锁
死锁
一个死锁的例子:
package main func main() { ch := make(chan int) <-ch // 阻塞main goroutine, 信道c被锁 }
执行结果:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() D:/GOWORK/src/jsonstudy/main.go:5 +0x2c
何谓死锁? 操作系统有讲过的,所有的线程或进程都在等待资源的释放。如上的程序中, 只有一个goroutine, 所以当你向里面加数据或者存数据的话,都会锁死信道, 并且阻塞当前 goroutine, 也就是所有的goroutine(其实就main线一个)都在等待信道的开放(没人拿走数据信道是不会开放的),也就是死锁咯。
package main import ( "fmt" ) func f(ch chan int) { ch <- 10 // 将一个数据value写入至channel,这会导致阻塞,直到有其他goroutine从这个channel中读取数据 } func main() { var ch chan int // 声明一个传递int类型的channel ch = make(chan int) // 使用内置函数make()定义一个channel go f(ch) //========= value := <-ch // 从channel中读取数据,如果channel之前没有写入数据,也会导致阻塞,直到channel中被写入数据为止 fmt.Println(value) //========= close(ch) }
总结来看,为什么会死锁?非缓冲信道上如果发生了流入无流出,或者流出无流入,也就导致了死锁。或者这样理解 Go启动的所有goroutine里的非缓冲信道一定要一个线里存数据,一个线里取数据,要成对才行 。所以下面的示例一定死锁:
package main func main() { c, quit := make(chan int), make(chan int) go func() { c <- 1 // c通道的数据没有被其他goroutine读取走,堵塞当前goroutine quit <- 0 // quit始终没有办法写入数据 }() <-quit // quit 等待数据的写 }
仔细分析的话,是由于:主线等待quit信道的数据流出,quit等待数据写入,而func被c通道堵塞,所有goroutine都在等,所以死锁。
简单来看的话,一共两个线,func线中流入c通道的数据并没有在main线中流出,肯定死锁。
package main func main() { c := make(chan int) go func() { c <- 1 }() }
程序正常退出了,很简单,并不是我们那个总结不起作用了,还是因为一个让人很囧的原因,main又没等待其它goroutine,自己先跑完了, 所以没有数据流入c信道,一共执行了一个goroutine, 并且没有发生阻塞,所以没有死锁错误
那么死锁的解决办法呢?
最简单的,把没取走的数据取走,没放入的数据放入, 因为无缓冲信道不能承载数据,那么就赶紧拿走!
package main func main() { c, quit := make(chan int), make(chan int) go func() { c <- 1 quit <- 0 }() <-c // 取走c的数据! <-quit }
一个解决办法是缓冲信道, 即设置c有一个数据的缓冲大小:
c := make(chan int, 1)
这样的话,c可以缓存一个数据。也就是说,放入一个数据,c并不会挂起当前线, 再放一个才会挂起当前线直到第一个数据被其他goroutine取走, 也就是只阻塞在容量一定的时候,不达容量不阻塞。
无缓冲信道的数据进出顺序
我们已经知道,无缓冲信道从不存储数据,流入的数据必须要流出才可以。
package main import ( "fmt" ) var ch chan int = make(chan int) func foo(id int) { //id: 这个routine的标号 ch <- id } func main() { for i := 0; i < 5; i++ { // 开启5个routine go foo(i) } for i := 0; i < 5; i++ { // 取出信道中的数据 fmt.Print(<-ch) } }
输出:01234
我们开了5个goroutine,然后又依次取数据。其实整个的执行过程细分的话,5个线的数据 依次流过信道ch, main打印之, 而宏观上我们看到的即 无缓冲信道的数据是先到先出,但是 无缓冲信道并不存储数据,只负责数据的流通
缓冲信道
终于到了这个话题了, 其实缓存信道用英文来讲更为达意: buffered channel.
缓冲这个词意思是,缓冲信道不仅可以流通数据,还可以缓存数据。它是有容量的,存入一个数据的话 , 可以先放在信道里,不必阻塞当前线而等待该数据取走。
当缓冲信道达到满的状态的时候,就会表现出阻塞了,因为这时再也不能承载更多的数据了,「你们必须把 数据拿走,才可以流入数据」。
在声明一个信道的时候,我们给make以第二个参数来指明它的容量(默认为0,即无缓冲):
var ch chan int = make(chan int, 2) // 写入2个元素都不会阻塞当前goroutine, 存储个数达到2的时候会阻塞
如下的例子,缓冲信道ch可以无缓冲的流入3个元素:
func main() { ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3 }
如果你再试图流入一个数据的话,信道ch会阻塞main线, 报死锁。
也就是说,缓冲信道会在满容量的时候加锁。
其实,缓冲信道是先进先出的,我们可以把缓冲信道看作为一个线程安全的队列:
func main() { ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3 fmt.Println(<-ch) // 1 fmt.Println(<-ch) // 2 fmt.Println(<-ch) // 3 }
信道数据读取和信道关闭
你也许发现,上面的代码一个一个地去读取信道简直太费事了,Go语言允许我们使用range
来读取信道
func main() { ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3 for v := range ch { fmt.Println(v) } }
如果你执行了上面的代码,会报死锁错误的,原因是range不等到信道关闭是不会结束读取的。也就是如果 缓冲信道干涸了,那么range就会阻塞当前goroutine, 所以死锁咯。
那么,我们试着避免这种情况,比较容易想到的是读到信道为空的时候就结束读取:
ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3 for v := range ch { fmt.Println(v) if len(ch) <= 0 { // 如果现有数据量为0,跳出循环 break } }
以上的方法是可以正常输出的,但是注意检查信道大小的方法不能在信道存取都在发生的时候,用于取出所有数据,这个例子 是因为我们只在ch中存了数据,现在一个一个往外取,信道大小是递减的。
或者显式地关闭信道:
func main() { ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3 for v := range ch { fmt.Println(v) if len(ch) <= 0 { close((ch)) } } }
被关闭的信道会禁止数据流入, 是只读的。我们仍然可以从关闭的信道中取出数据,但是不能再写入数据了。
package main import ( "fmt" "time" ) func produce(p chan<- int) { for i := 0; i < 10; i++ { p <- i fmt.Println("send:", i) } } func consumer(c <-chan int) { for i := 0; i < 10; i++ { v := <-c fmt.Println("receive:", v) } } func main() { ch := make(chan int) go produce(ch) go consumer(ch) time.Sleep(1 * time.Second) }
输出:
receive: 0
send: 0
send: 1
receive: 1
receive: 2
send: 2
send: 3
receive: 3
receive: 4
send: 4
send: 5
receive: 5
receive: 6
send: 6
send: 7
receive: 7
receive: 8
send: 8
send: 9
receive: 9
在这段代码中,因为channel是没有缓冲的,所以当生产者给channel赋值后,生产者这个线程会阻塞,直到消费者线程将channel中的数据取出。消费者第一次将数据取出后,进行下一次循环时,消费者的线程也会阻塞,因为生产者还没有将数据存入,这时程序会去执行生产者的线程。程序就这样在消费者和生产者两个线程间不断切换,直到循环结束。
下面我们再看一个带缓冲的例子:
package main import ( "fmt" "time" ) func produce(p chan<- int) { for i := 0; i < 10; i++ { p <- i fmt.Println("send:", i) } } func consumer(c <-chan int) { for i := 0; i < 10; i++ { v := <-c fmt.Println("receive:", v) } } func main() { ch := make(chan int, 10) go produce(ch) go consumer(ch) time.Sleep(1 * time.Second) }
输出:
send: 0
send: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
receive: 0
receive: 1
receive: 2
receive: 3
receive: 4
receive: 5
receive: 6
receive: 7
receive: 8
receive: 9
在这个程序中,缓冲区可以存储10个int类型的整数,在执行生产者线程的时候,线程就不会阻塞,一次性将10个整数存入channel,在读取的时候,也是一次性读取。
等待多gorountine的方案
那好,我们回到最初的一个问题,使用信道堵塞主线,等待开出去的所有goroutine跑完。
这是一个模型,开出很多小goroutine, 它们各自跑各自的,最后跑完了向主线报告。
我们讨论如下2个版本的方案:
只使用单个无缓冲信道阻塞主线
使用容量为goroutines数量的缓冲信道
对于方案1, 示例的代码大概会是这个样子:
package main import ( "fmt" ) var quit chan int // 只开一个信道 func foo(id int) { fmt.Println(id) quit <- 0 // ok, finished } func main() { count := 1000 quit = make(chan int) // 无缓冲 for i := 0; i < count; i++ { go foo(i) } for i := 0; i < count; i++ { <-quit } }
对于方案2, 把信道换成缓冲1000的:
package main import ( "fmt" ) var count = 1000 func foo(id int, quit chan int) { fmt.Println(id) quit <- 0 // ok, finished } func main() { quit := make(chan int, count) // 容量1000 for i := 0; i < count; i++ { go foo(i, quit) } for i := 0; i < count; i++ { <-quit } }
或者
package main import ( "fmt" ) var count = 1000 var quit = make(chan int, count) // 容量1000 func foo(id int) { fmt.Println(id) quit <- 0 // ok, finished } func main() { for i := 0; i < count; i++ { go foo(i) } for i := 0; i < count; i++ { <-quit } }
其实区别仅仅在于一个是缓冲的,一个是非缓冲的。
对于这个场景而言,两者都能完成任务, 都是可以的。
-
无缓冲的信道是一批数据一个一个的「流进流出」
-
缓冲信道则是一个一个存储,然后一起流出去
Go语言的并发和并行
package main import ( "fmt" ) var quit chan int = make(chan int) func loop() { for i := 0; i < 10; i++ { fmt.Printf("%d ", i) } quit <- 0 } func main() { // 开两个goroutine跑函数loop, loop函数负责打印10个数 go loop() go loop() for i := 0; i < 2; i++ { <-quit } }
输出:
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
并行和并发
从概念上讲,并发和并行是不同的
- 两个队列,一个Coffee机器,那是并发
- 两个队列,两个Coffee机器,那是并行
从上面的两个例子执行后的表现来看,多个goroutine跑loop函数会挨个goroutine去进行,而sleep则是一起执行的。这是为什么?
默认地, Go所有的goroutines只能在一个线程里跑 。也就是说, 以上两个代码都不是并行的,但是都是是并发的。
如果当前goroutine不发生阻塞,它是不会让出CPU给其他goroutine的, 所以例子一中的输出会是一个一个goroutine进行的,而sleep函数则阻塞掉了 当前goroutine, 当前goroutine主动让其他goroutine执行, 所以形成了逻辑上的并行, 也就是并发
真正的并行
为了达到真正的并行,我们需要告诉Go我们允许同时最多使用多个核。
回到起初的例子,我们设置最大开2个原生线程, 我们需要用到runtime包(runtime包是goroutine的调度器):
package main import ( "fmt" "runtime" ) var quit chan int = make(chan int) func loop() { for i := 0; i < 100; i++ { //为了观察,跑多些 fmt.Printf("%d ", i) } quit <- 0 } func main() { runtime.GOMAXPROCS(2) // 最多使用2个核 go loop() go loop() for i := 0; i < 2; i++ { <-quit } }
这下会看到两个goroutine会抢占式地输出数据了。
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 0 1 2 3 4 5 6 7 8 9 10 11 12 26 27 28 29 30 31 32 33 34 35 36 37 38 39 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 37 38 39 40 41 42 43 44 45 88 89 90 91 92 93 94 95 96 97 98 99 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
我们还可以这样显式地让出CPU时间:
package main import ( "fmt" "runtime" ) var quit chan int = make(chan int) func loop() { for i := 0; i < 10; i++ { runtime.Gosched() // 显式地让出CPU时间给其他goroutine fmt.Printf("%d ", i) } quit <- 0 } func main() { go loop() go loop() for i := 0; i < 2; i++ { <-quit } }
观察下结果会看到这样有规律的输出:
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
其实,这种主动让出CPU时间的方式仍然是在单核里跑。但手工地切换goroutine导致了看上去的“并行”。
如下的程序,按照理解应该打印下5次 "world"呀,可是为什么什么也没有打印
package main import "fmt" func say(s string) { for i := 0; i < 5; i++ { fmt.Println(s) } } func main() { go say("world") //开一个新的Goroutines执行 for { } }
楼下的答案已经很棒了,这里Go仍然在使用单核,for死循环占据了单核CPU所有的资源,而main线和say两个goroutine都在一个线程里面, 所以say没有机会执行。解决方案还是两个:
允许Go使用多核(runtime.GOMAXPROCS)
手动显式调动(runtime.Gosched)
关于runtime包几个函数:
- Gosched 让出cpu
- NumCPU 返回当前系统的CPU核数量
- GOMAXPROCS 设置最大的可同时使用的CPU核数
- Goexit 退出当前goroutine(但是defer语句会照常执行)
总结
我们从例子中可以看到,默认的, 所有goroutine会在一个原生线程里跑,也就是只使用了一个CPU核。
在同一个原生线程里,如果当前goroutine不发生阻塞,它是不会让出CPU时间给其他同线程的goroutines的,这是Go运行时对goroutine的调度,我们也可以使用runtime包来手工调度。
我们可以在Golang官方网站的这里 找到一句话:
When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread to a different, runnable thread so they won't be blocked.
也就是说:
当一个goroutine发生阻塞,Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞