Model Overview
Core Entities
Goroutines (G)
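Go's unit of scheduling. A Go program can start tens of thousands of goroutines, and each g can be thought of as a task waiting to be scheduled. A g stores the goroutine's execution stack, its status, its task function, and so on. From the g's point of view only the P matters; the M described next is transparent to it.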
OSThread (M)
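An OS thread: the heavy lifter that actually executes g's. The M does not maintain any g state, though. All of that is controlled by the P, which pulls the strings behind the scenes.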
Processor (P)
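Holds the context an M needs while it executes. The number of Ps usually equals the number of CPU cores (it is configurable through GOMAXPROCS) and determines the degree of goroutine parallelism. Each P maintains a run queue of g's.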
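Relationships Between the Entities
A picture is worth a thousand words, so let's go straight to the classic diagram.
The Essence of Scheduling
Scheduling means the schedule function. It gives up the g that is currently running and picks another g to run. The selection algorithm is not the focus of this article and is not covered in detail here.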
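When Switches Happen
- blocking system calls, such as file I/O (network I/O normally goes through the netpoller, which parks the goroutine instead of blocking the thread);
- timer operations from the time package;
- when a goroutine is started with `go func`, and when that func finishes;
- when a channel read or write blocks;
- after garbage collection;
- when runtime.Gosched() is called explicitly.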
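Analysis of Each Scheduling Point
Blocking System Calls
For system calls such as read, Go wraps every system call in its own code and adds scheduling logic around it. Take read as an example: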
```go
// /usr/local/go/src/os/file.go:97

// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
// EOF is signaled by a zero count with err set to io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
	if f == nil {
		return 0, ErrInvalid
	}
	n, e := f.read(b)
	if n == 0 && len(b) > 0 && e == nil {
		return 0, io.EOF
	}
	if e != nil {
		err = &PathError{"read", f.name, e}
	}
	return n, err
}
```
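The call goes several layers deep, so not every layer is shown here. Following it all the way down leads to these functions: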
```go
func read(fd int, p []byte) (n int, err error) {
	var _p0 unsafe.Pointer
	if len(p) > 0 {
		_p0 = unsafe.Pointer(&p[0])
	} else {
		_p0 = unsafe.Pointer(&_zero)
	}
	r0, _, e1 := Syscall(SYS_READ, uintptr(fd), uintptr(_p0), uintptr(len(p)))
	n = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
```
Syscall is implemented in assembly:
```asm
TEXT ·Syscall(SB),NOSPLIT,$0-56
	BL	runtime·entersyscall(SB)
	MOVD	a1+8(FP), R3
	MOVD	a2+16(FP), R4
	MOVD	a3+24(FP), R5
	MOVD	R0, R6
	MOVD	R0, R7
	MOVD	R0, R8
	MOVD	trap+0(FP), R9	// syscall entry
	SYSCALL R9
	BVC	ok
	MOVD	$-1, R4
	MOVD	R4, r1+32(FP)	// r1
	MOVD	R0, r2+40(FP)	// r2
	MOVD	R3, err+48(FP)	// errno
	BL	runtime·exitsyscall(SB)
	RET
ok:
	MOVD	R3, r1+32(FP)	// r1
	MOVD	R4, r2+40(FP)	// r2
	MOVD	R0, err+48(FP)	// errno
	BL	runtime·exitsyscall(SB)
	RET
```
As you can see, entersyscall is called on entering the system call, and exitsyscall runs on leaving it.
```go
// Standard syscall entry used by the go syscall library and normal cgo calls.
//go:nosplit
func entersyscall(dummy int32) {
	reentersyscall(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
}

func reentersyscall(pc, sp uintptr) {
	_g_ := getg()

	// Disable preemption because during this function g is in Gsyscall status,
	// but can have inconsistent g->sched, do not let GC observe it.
	_g_.m.locks++

	// Entersyscall must not call any function that might split/grow the stack.
	// (See details in comment above.)
	// Catch calls that might, by replacing the stack guard with something that
	// will trip any stack check and leaving a flag to tell newstack to die.
	_g_.stackguard0 = stackPreempt
	_g_.throwsplit = true

	// Leave SP around for GC and traceback.
	save(pc, sp)
	_g_.syscallsp = sp
	_g_.syscallpc = pc
	casgstatus(_g_, _Grunning, _Gsyscall)
	if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
		systemstack(func() {
			print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
			throw("entersyscall")
		})
	}

	if trace.enabled {
		systemstack(traceGoSysCall)
		// systemstack itself clobbers g.sched.{pc,sp} and we might
		// need them later when the G is genuinely blocked in a
		// syscall
		save(pc, sp)
	}

	if atomic.Load(&sched.sysmonwait) != 0 { // TODO: fast atomic
		systemstack(entersyscall_sysmon)
		save(pc, sp)
	}

	if _g_.m.p.ptr().runSafePointFn != 0 {
		// runSafePointFn may stack split if run on this stack
		systemstack(runSafePointFn)
		save(pc, sp)
	}

	_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
	_g_.sysblocktraced = true
	_g_.m.mcache = nil
	_g_.m.p.ptr().m = 0
	atomic.Store(&_g_.m.p.ptr().status, _Psyscall)
	if sched.gcwaiting != 0 {
		systemstack(entersyscall_gcwait)
		save(pc, sp)
	}

	// Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
	// We set _StackGuard to StackPreempt so that first split stack check calls morestack.
	// Morestack detects this case and throws.
	_g_.stackguard0 = stackPreempt
	_g_.m.locks--
}
```
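On entering a system call, the running g's status changes to _Gsyscall. The P is marked _Psyscall and its pointer to the M is cleared (`_g_.m.p.ptr().m = 0`), so the P is effectively detached from the M.
What happens once a g is in _Gsyscall:
- When the system call returns, the M calls exitsyscall and tries to reattach to its previous P. If that is not possible, the g is put back on a run queue and scheduling again goes through the schedule function;
- If the sysmon thread finds that the P has been stuck in the system call for too long, it hands the P off to another M so that the P's remaining goroutines can be scheduled.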
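The hand-off can be summarized with a toy model. Everything here is hypothetical and heavily simplified. `proc`, `enterSyscall`, `sysmonTick` and `exitSyscall` are invented names that only mimic the roles of the P, reentersyscall, sysmon and the fast path of exitsyscall. The tick threshold stands in for sysmon's real timing rules.

```go
package main

import "fmt"

// Hypothetical, simplified P states (the runtime uses _Prunning, _Psyscall, ...).
type pState int

const (
	pRunning pState = iota
	pSyscall
)

// proc is a hypothetical stand-in for a P.
type proc struct {
	state pState
	owner int // id of the attached M, 0 if none
	ticks int // sysmon ticks spent in pSyscall
}

// enterSyscall mimics reentersyscall: the P is marked as in a syscall and
// its pointer to the M is cleared, but the M remembers which P it left.
func enterSyscall(p *proc) (oldM int) {
	oldM = p.owner
	p.state = pSyscall
	p.owner = 0
	return oldM
}

// sysmonTick mimics sysmon: a P stuck in a syscall for at least limit ticks
// is handed to a new M so its other goroutines can keep running.
func sysmonTick(p *proc, limit, newM int) bool {
	if p.state != pSyscall {
		return false
	}
	p.ticks++
	if p.ticks < limit {
		return false
	}
	p.state = pRunning
	p.owner = newM
	return true
}

// exitSyscall mimics the fast path of exitsyscall: reattach to the old P if
// it is still in pSyscall. Otherwise the caller must find another P or
// queue the g and go through schedule.
func exitSyscall(p *proc, m int) bool {
	if p.state != pSyscall {
		return false
	}
	p.state = pRunning
	p.owner = m
	p.ticks = 0
	return true
}

func main() {
	// Short syscall: the M gets its P back directly.
	p := &proc{state: pRunning, owner: 10}
	m := enterSyscall(p)
	fmt.Println(exitSyscall(p, m), p.owner) // true 10

	// Long syscall: sysmon hands the P to M 11 first.
	p = &proc{state: pRunning, owner: 10}
	m = enterSyscall(p)
	for !sysmonTick(p, 3, 11) {
		// sysmon keeps checking until the threshold is reached
	}
	fmt.Println(exitSyscall(p, m), p.owner) // false 11
}
```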
Timer Operations
Take time.Sleep as the example:
```go
// Sleep pauses the current goroutine for at least the duration d.
// A negative or zero duration causes Sleep to return immediately.
func Sleep(d Duration)
```
It is actually implemented in the runtime:
```go
// timeSleep puts the current goroutine to sleep for at least ns nanoseconds.
//go:linkname timeSleep time.Sleep
func timeSleep(ns int64) {
	if ns <= 0 {
		return
	}

	t := getg().timer
	if t == nil {
		t = new(timer)
		getg().timer = t
	}
	*t = timer{}
	t.when = nanotime() + ns
	t.f = goroutineReady
	t.arg = getg()
	lock(&timers.lock)
	addtimerLocked(t)
	goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2)
}
```
goparkunlock eventually calls gopark:
```go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}
```
mcall(fn) switches to g0 and has g0 call fn. Here is the definition of park_m:
```go
func park_m(gp *g) {
	_g_ := getg()

	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}

	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()

	if _g_.m.waitunlockf != nil {
		fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	schedule()
}
```
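As you can see, the g's status is first changed to _Gwaiting, and then schedule is called. A g in _Gwaiting resumes only after goready is called on it. For time.Sleep, that happens in the timer callback goroutineReady when the timer fires.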
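Starting and Exiting a Goroutine
When a goroutine is started, a g is taken from a free list or newly allocated (a fresh g starts as _Gidle). It is then set up and placed on the current P's run queue with status _Grunnable to wait for scheduling. When the goroutine's function returns, goexit1 is called. It switches to g0, sets the status to _Gdead and calls schedule to pick the next g. A _Gdead g is kept on a free list so that later goroutines can reuse it.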
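This start/exit life cycle is visible through `runtime.NumGoroutine`. The sketch assumes no other goroutines are being created concurrently. The count goes up as soon as the `go` statement returns and comes back down once the goroutine has exited. The exiting goroutine may still be counted for a moment, so the code polls with a deadline. `observeLifecycle` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// observeLifecycle starts one goroutine, samples the goroutine count while it
// is blocked, then lets it return and waits until the count drops back.
func observeLifecycle() (before, during, after int) {
	before = runtime.NumGoroutine()
	started := make(chan struct{})
	release := make(chan struct{})
	go func() {
		close(started)
		<-release // stay alive until we are told to exit
	}()
	<-started
	during = runtime.NumGoroutine()

	close(release)
	// The exiting g may still be counted briefly; poll with a deadline.
	deadline := time.Now().Add(time.Second)
	for runtime.NumGoroutine() > before && time.Now().Before(deadline) {
		runtime.Gosched()
	}
	after = runtime.NumGoroutine()
	return
}

func main() {
	fmt.Println(observeLifecycle())
}
```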
Channel Blocking
chansend implements the send statement `c <- x`:
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
	}

	// ........
	// (unrelated code omitted)
	// ........

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
```
As you can see, scheduling still happens through goparkunlock -> gopark.
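The `block` parameter at the top of chansend is what a `select` with a `default` case relies on. Such a select is compiled into a non-blocking send, so the goroutine is not parked and the send simply reports failure. The sketch covers four cases: the nil-channel branch, an unbuffered channel with no receiver, a buffered channel with room, and the closed-channel panic. `trySend` and `sendOnClosed` are hypothetical helper names.

```go
package main

import "fmt"

// trySend performs a non-blocking send: with a default case the send is
// attempted with block=false, so it never parks the calling goroutine.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// sendOnClosed recovers the panic raised when sending on a closed channel.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	c := make(chan int)
	close(c)
	c <- 1
	return ""
}

func main() {
	var nilCh chan int
	fmt.Println(trySend(nilCh, 1))             // false: nil channel, no blocking
	fmt.Println(trySend(make(chan int), 1))    // false: unbuffered, no receiver
	fmt.Println(trySend(make(chan int, 1), 1)) // true: buffer has room
	fmt.Println(sendOnClosed())                // send on closed channel
}
```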
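After GC
Once the stop-the-world (STW) phase ends, goroutines are again selected to run. Garbage collection is not covered in depth here.
Calling runtime.Gosched() Explicitly
I haven't found a scenario that strictly requires calling runtime.Gosched. Its main use is probably debugging and learning about the runtime.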
```go
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
//go:nosplit
func Gosched() {
	mcall(gosched_m)
}
```
The first step switches to g0, which then runs a function called gosched_m:
```go
// Gosched continuation on g0.
func gosched_m(gp *g) {
	if trace.enabled {
		traceGoSched()
	}
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	casgstatus(gp, _Grunning, _Grunnable)
	dropg()
	lock(&sched.lock)
	globrunqput(gp)
	unlock(&sched.lock)

	schedule()
}
```
As you can see, the current g is set to _Grunnable and put on the global run queue. schedule is then called to pick a suitable g to run.
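One case where the yield is easy to observe is a goroutine spinning on a flag with only one P. Calling `runtime.Gosched` in the loop moves the spinner to _Grunnable, which lets the goroutine that sets the flag run. The sketch works under those assumptions; `waitWithGosched` is a hypothetical name. Since Go 1.14, asynchronous preemption would eventually interrupt such a loop even without the explicit yield.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// waitWithGosched spins until another goroutine sets the flag, yielding on
// every iteration. With a single P, each Gosched call puts the spinning
// goroutine back on a run queue so the setter gets a chance to run.
func waitWithGosched() int {
	prev := runtime.GOMAXPROCS(1)
	defer runtime.GOMAXPROCS(prev)

	var flag int32
	go func() { atomic.StoreInt32(&flag, 1) }()

	spins := 0
	for atomic.LoadInt32(&flag) == 0 {
		spins++
		runtime.Gosched() // _Grunning -> _Grunnable, then schedule()
	}
	return spins
}

func main() {
	fmt.Println("spins before the flag was set:", waitWithGosched())
}
```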
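Summary
Goroutine scheduling points in Go are mainly the start and end of blocking operations. Studying the code behind each of these cases gives a deeper understanding of Go. Here is also a tip for reading source code: go in with one specific question each time, such as the scheduling points in this article, and later move on to the scheduling algorithm, garbage collection and so on. That way each pass can ignore unrelated details, and across several different topics the overall picture becomes more and more complete.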
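The status changes covered above can be collected into a small transition table. This is a hypothetical summary, not runtime code. `gStatus` and its string values only borrow the runtime's _G* names, and only the transitions discussed in this article are listed.

```go
package main

import "fmt"

// gStatus borrows the runtime's _G* names for illustration; the real
// constants are unexported integers.
type gStatus string

const (
	gIdle     gStatus = "_Gidle"
	gRunnable gStatus = "_Grunnable"
	gRunning  gStatus = "_Grunning"
	gSyscall  gStatus = "_Gsyscall"
	gWaiting  gStatus = "_Gwaiting"
	gDead     gStatus = "_Gdead"
)

// next maps each status to its allowed successors and what triggers them.
var next = map[gStatus]map[gStatus]string{
	gIdle:     {gDead: "newproc prepares a freshly allocated g"},
	gDead:     {gRunnable: "newproc (re)uses a g for a go statement"},
	gRunnable: {gRunning: "schedule/execute picks it"},
	gRunning: {
		gSyscall:  "entersyscall",
		gWaiting:  "gopark -> park_m",
		gRunnable: "Gosched -> goschedImpl",
		gDead:     "goexit1 after the func returns",
	},
	gSyscall: {
		gRunning:  "exitsyscall fast path",
		gRunnable: "exitsyscall slow path",
	},
	gWaiting: {gRunnable: "goready"},
}

// replay checks every step in path against the table and returns the
// triggers in order, or an error at the first illegal step.
func replay(path []gStatus) ([]string, error) {
	var why []string
	for i := 1; i < len(path); i++ {
		reason, ok := next[path[i-1]][path[i]]
		if !ok {
			return why, fmt.Errorf("illegal transition %s -> %s", path[i-1], path[i])
		}
		why = append(why, reason)
	}
	return why, nil
}

func main() {
	// A goroutine that is started, sleeps once, and exits.
	steps, err := replay([]gStatus{gDead, gRunnable, gRunning, gWaiting, gRunnable, gRunning, gDead})
	fmt.Println(steps, err)

	_, err = replay([]gStatus{gWaiting, gRunning})
	fmt.Println(err) // illegal transition _Gwaiting -> _Grunning
}
```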