基于GMP模型的调度器是go实现其引以为傲的用户态线程的核心。本文就以GMP调度器为核心分析一下调度流程,顺便分析一下定时器Timer的实现,它和调度器息息相关。
本文的大纲如下:
1.GMP的关键数据结构
2.goroutine的生命周期
3.系统线程的生命周期
4.触发shedule()的时机
5.网络轮询netpoll
6.sysmon守护线程
7.定时器实现
1.GMP的关键数据结构
调度器相关的数据结构都在runtime/runtime2.go中,主要有g(goroutine,协程),m(machine,操作系统线程),p(processor,调度器的资源抽象,m争抢p来获取执行权),schedt(scheduler_type,调度器的全局抽象)
以下都是只给出一些关键字段,实际上每个完整数据结构都有几十个字段!
1.1 g
type g struct { stack stack _panic *_panic _defer *_defer m *m sched gobuf atomicstatus uint32 }
stack: 协程栈的上下界,由lo和hi两个字段组成
_panic: 协程的panic链表,头部的是最内层的panic(也就是外层的panic可能会被abort)
_defer: 协程的defer链表,头部的是最内层defer。不过本文不关心defer和panic,只是想说明defer和panic都是goroutine级别的
m: 当前g对应的m
sched: 记录了协程的运行时状态,内部包含sp、pc、bp等伪寄存器
atomicstatus: 协程的状态机,共有近十种状态,如idle,dead,runnable,running,syscall,waiting,preempted等
1.2 m
type m struct { g0 *g gsignal *g curg *g p puintptr nextp puintptr oldp puintptr spinning bool }
g0: 运行在系统栈上的协程,即负责调度协程的协程
gsignal: 负责响应信号的协程。由上可知一个空闲的m也自带两个g
curg: m当前运行的g,可能为空表示空闲
p: m当前绑定的p。要运行用户代码,必须绑定上一个p。一般p远少于m。
nextp: m刚刚创建或从阻塞转为就绪时,预先绑定的p。真正运行时,就调用acquirep绑定这个p
oldp: m陷入系统调用时会出让p,这里保存这个p,系统调用返回时试图恢复这个p。这是希望利用局部性。
spinning: 是否自旋。如果当前的m是自旋的,而又从本地队列runq和全局队列globalrunq都获取不到可运行的g时,就会从其他p的本地队列尝试窃取g
1.3 p
type p struct { status uint32 m muintptr deferpool [5][]*_defer runq [256]guintptr gFree struct { gList n int32 } sudogcache []*sudog timer0When uint64 timers []*timer }
status: p的状态机
m: p的m。与m的p双向绑定
deferpool: _defer结构体的per-P缓存池。在deferproc创建新_defer时(本文不分析,详见runtime/panic.go)会优先从中分配内存,按内存大小分为五个链表。这也是利用局部性
runq: 当前p的可运行g队列
gFree: 当前p的空闲g队列(即状态为dead的g。g执行完毕后不会销毁,而是等待被复用)
sudogcache: 创建sudog的缓存池,作用同deferpool。sudog代表一个阻塞状态的g,其记录了g所在的阻塞队列等信息,本文不介绍。
timers,timer0When: timers保存了当前p全部定时器,timer0When保存了所有定时器中最近的即将到达的时间。本文会介绍定时器的实现
1.4 schedt
type schedt struct { lastpoll uint64 midle muintptr nmidle int32 nmidlelocked int32 maxmcount int32 nmsys int32 nmfreed int64 pidle puintptr npidle uint32 nmspinning uint32 runq gQueue gFree struct { lock mutex stack gList noStack gList n int32 } sudogcache *sudog deferpool [5]*_defer }
可见,schedt的这些字段部分是状态统计信息,一些(最后4个)是p中的某些结构的全局版本。
以及第一个lastpoll是上一次进行网络轮询(netpoll)的时间。在讲定时器时会说到
2. goroutine的生命周期
对于用户来说,GMP调度器中距离自己最近的当然是g。那么当我们轻轻松松地敲下go来启动一个g时,golang为我们做了什么呢?
在编译阶段,编译器会把go关键字转换成一个runtime.newproc调用。newproc有两个参数,初始栈大小和go出来的函数指针,这两个都是在编译时就能确定的。接下来详细看一下newproc
注意!本文的代码都是高度简化版,源代码太复杂了,作者太强了,顶礼膜拜!
func newproc(siz int32, fn *funcval) { argp := add(unsafe.Pointer(&fn), sys.PtrSize) gp := getg() pc := getcallerpc() systemstack(func() { newg := newproc1(fn, argp, siz, gp, pc) //初始化一个g结构体 _p_ := getg().m.p.ptr() runqput(_p_, newg, true) //将g加入当前p的本地队列。当前p也就是使用go关键字的那个g所在的m绑定的p,有点绕.. if mainStarted { wakep() //尝试启动一个空闲的p,这个函数之后会说到 } }) }
先看newproc1具体怎么初始化g
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g { _g_ := getg() siz := narg siz = (siz + 7) &^ 7 _p_ := _g_.m.p.ptr() newg := gfget(_p_) //gfget(p)从p的空闲列表中获取一个g。如果p的空闲列表为空,从全局空闲列表中将最多32个g放入p的空闲列表并返回。 if newg == nil { newg = malg(_StackMin) //memory_alloc_g,新创建一个g,这个g只初始化了栈空间的相关信息 casgstatus(newg, _Gidle, _Gdead) //状态转移。dead表示分配了栈,但未在执行代码,这名字有点迷惑性,刚初始化完和执行结束都是dead allgadd(newg) } totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize totalSize += -totalSize & (sys.StackAlign - 1) sp := newg.stack.hi - totalSize spArg := sp
//接下来就是初始化sched字段等运行状态信息了 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum //*1 newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn casgstatus(newg, _Gdead, _Grunnable) return newg }
*1 所有新创建的g的pc都从goexit这个ABI(Application Binary Interface)函数所在的位置开始。这样做等效于:当这个g执行完毕时,就会返回到goexit,执行goexit函数。goexit在本节的最后讲
现在g已经加入p的本地队列了,接下来就是等待被调度了!触发调度的方式有多种,之后会讲到。这边先说明一下,调度的主函数是schedule()函数。它的作用是进行一轮调度:尝试获取一个g,并执行它
func schedule() { _g_ := getg() top: pp := _g_.m.p.ptr() checkTimers(pp, 0) //检查最近的定时器有没有到达,如果到了就运行它。具体逻辑更加复杂,后面会说 var gp *g var inheritTime bool if gp == nil { if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { //约有1/61的调度会从全局队列中拿,这是为了减少饥饿 lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) //从p的本地队列中拿 } if gp == nil { gp, inheritTime = findrunnable() // 一个非常复杂的方法,它是阻塞的,直到获取到一个可运行的g } execute(gp, inheritTime) //执行goroutine的真正入口 }
findrunnable是一个非常复杂的方法,它阻塞直到获取到一个可运行的g。期间还会顺便做一下网络轮询(之后会说)。工作窃取(work-stealing)也是在这个函数里完成的
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() now, pollUntil, _ := checkTimers(_p_, 0) //检查定时器。返回值pollUntil表示下一个定时器到达的时间,后续步骤有用到 if gp, inheritTime := runqget(_p_); gp != nil { //从p的本地队列中取 return gp, inheritTime } if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) //从全局队列中取 unlock(&sched.lock) if gp != nil { return gp, false } } //轮询网络(后面会说)!netpoll(0)表示进行非阻塞轮询,返回一组就绪的g,返回第一个,调用injectglist将剩下的加入p的本地队列 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) //将一组g加入当前p的本地队列,如果当前没有绑定p,就加入全局队列 casgstatus(gp, _Gwaiting, _Grunnable) //状态转移:阻塞态->就绪态 return gp, false } } //如果当前的m处于自旋状态(或者忙碌的p数量比自旋的m数量更多),就尝试从其它p的本地队列中窃取g! procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } gp, inheritTime, tnow, w, newWork := stealWork(now) now = tnow if gp != nil { return gp, inheritTime } if newWork { goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w } } lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } //实在无事可做了,就把当前的p加入schedt的空闲p列表 pidleput(_p_) unlock(&sched.lock)
// ***关键!阻塞直到有可用的g if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { atomic.Store64(&sched.pollUntil, uint64(pollUntil)) delay := int64(-1) if pollUntil != 0 { if now == 0 { now = nanotime() } delay = pollUntil - now if delay < 0 { delay = 0 } } list := netpoll(delay) //阻塞至多delay时间。如果没有定时器,则无限阻塞,直到被netpollBreak唤起。返回就绪的一组g atomic.Store64(&sched.pollUntil, 0) atomic.Store64(&sched.lastpoll, uint64(nanotime())) _p_ = pidleget() //获取空闲p if _p_ == nil { injectglist(&list) //获取失败,将所有g注入(本地或全局)队列 } else { acquirep(_p_) //获取成功,将p和m绑定,将第一个以后的g注入(本地或全局)队列,第一个g被返回 if !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } goto top } } else if pollUntil != 0 && netpollinited() { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > pollUntil { netpollBreak() } } stopm() goto top }
上面标有***代码是关键,完成了阻塞获取g的逻辑。netpoll和定时器部分后面会讲到
execute是goroutine的真正入口,它的核心逻辑非常简单,就是调用gogo函数,定位到g.sched中保存的运行位置
func execute(gp *g, inheritTime bool) { _g_ := getg() _g_.m.curg = gp gp.m = _g_.m //把当前的m(实际上是g0的m)和要执行的g绑定起来 casgstatus(gp, _Grunnable, _Grunning) //状态转移:就绪态->运行态 gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) //gogo! }
gogo是一个汇编(go assembly,和普通汇编略有不同)实现的函数,用g.sched里保存的pc、sp等寄存器信息进行跳转
TEXT runtime·gogo(SB), NOSPLIT, $0-8 MOVQ buf+0(FP), BX // 读取g.sched参数放入BX MOVQ gobuf_g(BX), DX MOVQ 0(DX), CX JMP gogo<>(SB) TEXT gogo<>(SB), NOSPLIT, $0 get_tls(CX) MOVQ DX, g(CX) MOVQ DX, R14 MOVQ gobuf_sp(BX), SP MOVQ gobuf_ret(BX), AX MOVQ gobuf_ctxt(BX), DX MOVQ gobuf_bp(BX), BP MOVQ gobuf_pc(BX), BX JMP BX //就是在这里跳转到了g.sched里保存的pc位置!开始正式执行用户写的func
现在终于开始执行用户代码了!但是这个g依然不能高枕无忧了!有多种方式可以让这个g失去执行权,即让g.m与p解绑,之后会讨论。现在先假设g顺利执行完了,那就会按之前说的,调用goexit
goexit也是个汇编函数,但逻辑比较简单,总之最后调用到goexit0函数
func goexit0(gp *g) { _g_ := getg() casgstatus(gp, _Grunning, _Gdead) //状态转移 gp.m = nil //将当前g的一些状态清空 locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.preemptStop = false gp.paniconfault = false gp._defer = nil gp._panic = nil gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil dropg() //里面完成了m.curg与当前g的解绑 gfput(_g_.m.p.ptr(), gp) //把当前g加入全局的空闲队列schedt.gFree schedule() //关键! }
可以看出,g退出时会先进行一些清理工作,最终会再次调用schedule(),开始新一轮调度!这就是调度器能持续不断地运行的原因
此外,执行完毕的g不会被销毁,而是加入了全局的空闲队列。还记得这个空闲队列哪里用到吗?newproc创建g的时候!以此实现g的复用
3. 系统线程的生命周期
系统线程(m)是g的运行载体,一个m只能在同一时刻运行一个g。m为了获取代码执行权必须绑定一个p。如果m不在执行用户代码(如进行系统调用),则和p解绑。
启动一个m的入口是startm函数。
func startm(_p_ *p, spinning bool) { mp := acquirem() lock(&sched.lock) if _p_ == nil { //startm的第一个参数决定了该函数的语义:是让m被动等待可用的p,还是让p主动把所有权移交给m _p_ = pidleget() //从空闲p列表中获取p if _p_ == nil { //如果没有,就返回,启动m失败!因为m的运行依附于p unlock(&sched.lock) releasem(mp) return } } nmp := mget() //从空闲m列表中获取m if nmp == nil { id := mReserveID() unlock(&sched.lock) var fn func() if spinning { fn = mspinning } newm(fn, _p_, id) //如果获取不到,就新建一个m releasem(mp) return } unlock(&sched.lock) nmp.spinning = spinning nmp.nextp.set(_p_) //把新建出来的m的nextp设为当前p,等m真正运行时,就会绑定这个p notewakeup(&nmp.park) releasem(mp) }
以下总结了调用startm的时机:
1. 有新的g可用时。一般是调用newproc和ready(标记一个g为就绪)函数时。前面提到newproc会调用wakep,其中就调的startm,第一个参数为nil,也就是不指定p,而是让m等待空闲的p
2. p移交所有权时。典型的情况就是sysmon守护线程(后面会讲)发现一个p处于syscall状态的时间太长了,就会调用handoffp函数将它的所有权移交,具体在后面讲sysmon的时候会说。其中也会调startm,第一个参数就是这个p本身,表示让m和这个p绑定,也就是p的所有者被移交(handoff)了。
startm中提到,如果在空闲m列表中获取不到m,就会调用newm创建一个。
func newm(fn func(), _p_ *p, id int64) { mp := allocm(_p_, fn, id) mp.doesPark = (_p_ != nil) mp.nextp.set(_p_) mp.sigmask = initSigmask newm1(mp) }
allocm分配了一个m的空间,进行一些初始化,但还没有和任何系统线程绑定
func allocm(_p_ *p, fn func(), id int64) *m { _g_ := getg() acquirem() if _g_.m.p == 0 { acquirep(_p_) } mp := new(m) mp.mstartfn = fn mcommoninit(mp, id) mp.g0 = malg(-1) //为这个m创建g0 mp.g0.m = mp if _p_ == _g_.m.p.ptr() { releasep() } releasem(_g_.m) return mp }
newm1将m真正和系统线程绑定
func newm1(mp *m) { newosproc(mp) //其实这个函数不止这么点,但核心就这一句,新建操作系统线程,并与m绑定 }
在linux环境下,newosproc会使用clone系统调用创建新线程
func newosproc(mp *m) {
//也是省了一堆无关代码 ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) }
clone的第四个参数指定线程的入口,是mstart函数的地址,于是目光转向mstart函数,它是真正创建系统线程的入口
mstart先是进行一些栈的初始化,最终进入mstart1函数
func mstart0() { _g_ := getg() //初始化一大堆栈参数 mstart1() mexit(osStack) }
func mstart1() { _g_ := getg()
//保存当前g(因为是新创建的m,所以只能是g0)的sched状态 _g_.sched.g = guintptr(unsafe.Pointer(_g_)) _g_.sched.pc = getcallerpc() _g_.sched.sp = getcallersp() asminit() minit() if _g_.m == &m0 { mstartm0() //在这里面初始化gsignal信号处理协程 } if fn := _g_.m.mstartfn; fn != nil { fn() } if _g_.m != &m0 { acquirep(_g_.m.nextp.ptr()) //acquirep(p)将p和当前m绑定,也就是这个m终于获得了代码的执行权了!还记得nextp吗,是在newm函数里设置的 _g_.m.nextp = 0 } schedule() }
mstart1将之前设置的nextp与m绑定,使m终于获得了代码执行权!并且立即进入下一轮调度schedule()
注意,这个函数一开始还在g0.sched中保存了运行状态。别忘了schedule()会跳转到下一个g的pc而不会返回,所以,为了能让mstart1返回,必须有类似gogo(&_g_.m.g0.sched)的调用,跳转到mstart1的调用者,才能返回。返回后,进入mexit函数销毁线程。
mexit函数主要就是调用handoffp函数,让即将销毁的m的p转交所有权。然后调用一些系统调用来摧毁线程,没什么好说的。
那么什么时候会有gogo(&_g_.m.g0.sched)的调用呢?我看了半天只找到一处,就是在某个g退出时(调用goexit),会检查这个g有没有锁定(锁定,一般指的是调用公有函数runtime.LockOsThread,使当前g独占某个操作系统线程)过当前m,有的话就会调用这个函数。换句话说,如果g锁定了所在的m,那么就认为这个m不干净了,等g结束后就会跳转到g0.sched从而销毁m。这个g真是坑爹啊!正常情况下,m是不会被销毁的。
最后,在startm中提到会现在空闲m列表中获取m,获取不到再进入newm。这个空闲m列表,其实就是保存了休眠状态下的m。触发垃圾回收时当前m会休眠。此外,当g从系统调用中返回时,g.m会寻找可用的p以继续执行代码,如果没有可用的p,也只能进入休眠了。
4. 触发schedule()的时机
之前看到,在启动g或m的时候会立即开始一轮调度。除此之外,触发调度还有一些时机
4.1 g让出执行权
主要是运行态的g主动成为阻塞态或就绪态。
成为阻塞态就是调用gopark函数挂起自身。gopark函数最终调用park_m函数
func park_m(gp *g) { //gp是当前的g casgstatus(gp, _Grunning, _Gwaiting) dropg() //将g.m.curg和g解绑 schedule() }
成为就绪态就是调用公有函数runtime.GoSched()出让当前g的执行权,有点像java里的Thread.yield()。GoSched最终调用goschedImpl函数
func goschedImpl(gp *g) { status := readgstatus(gp) casgstatus(gp, _Grunning, _Grunnable) dropg() globrunqput(gp) //将g放入全局队列 schedule() }
两者最终都调用schedule()开启新一轮调度
4.2 系统调用
在进行系统调用时,编译器会在这个系统调用前后插入reentersyscall和exitsyscall两个钩子函数,在这两个函数里进行GMP状态的保存和恢复
func reentersyscall(pc, sp uintptr) { _g_ := getg() save(pc, sp) //保存寄存器状态 _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) //g的状态转移:运行态->系统调用中 _g_.m.syscalltick = _g_.m.p.ptr().syscalltick _g_.sysblocktraced = true pp := _g_.m.p.ptr() pp.m = 0 //将m和p解绑 _g_.m.oldp.set(pp) //在m.oldp中保存p,为了在系统调用返回时尝试重新获得p _g_.m.p = 0 atomic.Store(&pp.status, _Psyscall) //p的状态转移: 变为系统调用中。或许是p的状态比g简单,所以不用cas? }
func exitsyscall() { _g_ := getg() oldp := _g_.m.oldp.ptr() _g_.m.oldp = 0 if exitsyscallfast(oldp) { //尝试重新获取oldp if trace.enabled { if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick { systemstack(traceGoStart) } } _g_.m.p.ptr().syscalltick++ casgstatus(_g_, _Gsyscall, _Grunning) _g_.syscallsp = 0 _g_.m.locks-- _g_.throwsplit = false if sched.disable.user && !schedEnabled(_g_) { Gosched() } return } mcall(exitsyscall0) //尝试失败,走完整流程 }
reentersyscall中在解绑m与p时,会将p保存到m.oldp中,在exitsyscall中尝试恢复p,这样做是希望利用局部性。
func exitsyscallfast(oldp *p) bool { _g_ := getg() //尝试绑定原来的p,前提是p没有移情别恋,例如被sysmon监控到p处于syscall状态太久而移交所有权 if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) { wirep(oldp) //将oldp与m绑定 exitsyscallfast_reacquired() return true } if sched.pidle != 0 { //如果能找到空闲的p,也能接受,至少m不用休眠了! var ok bool systemstack(func() { ok = exitsyscallfast_pidle() //里面会把空闲的p与m绑定 }) if ok { return true } } return false }
func exitsyscall0(gp *g) { //快速绑定失败,走完整流程 casgstatus(gp, _Gsyscall, _Grunnable) dropg() var _p_ *p if schedEnabled(gp) { _p_ = pidleget() //获取空闲p } var locked bool if _p_ == nil { globrunqput(gp) //获取失败,将g加入全局队列 stopm() //将m加入空闲m列表 } if _p_ != nil { //获取成功,将p与m绑定,开始执行g acquirep(_p_) execute(gp, false) } schedule() }
exitsyscall0是慢路径,因为它由mcall调用,会造成协程切换(g切换到g0)。如果最终获取不到p,就使g加入全局队列,m休眠,开启新一轮调度。
4.3 p被sysmon线程发起抢占
这个留在后面和sysmon一起说
5. 网络轮询netpoll
netpoll是go对多路复用的实现。在linux系统上,主要依赖epoll系列系统调用。
在调度过程中,主要是在finerunnable函数中进行netpoll, 返回一组就绪的g
netpoll接口需要实现netpollinit, netpoll, netpollBreak, netpollopen, netpollclose, netpollisPollDescriptor六个函数。与本文主题相关的是前三个
func netpollinit() { epfd = epollcreate1(1024) r, w, errno := nonblockingPipe() ev := epollevent{ events: _EPOLLIN, } *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w) }
看到这几个函数我不禁留下了感动的眼泪,太熟悉了,就是epoll的套皮。
epollinit先调用epoll_create,再创建epollevent结构体,再创建一个管道进行读方向的监听
func netpoll(delay int64) gList { if epfd == -1 { return gList{} } var waitms int32 if delay < 0 { //参数<0表示无限阻塞 waitms = -1 } else if delay == 0 { //参数==0表示非阻塞 waitms = 0 } else if delay < 1e6 { //否则表示带超时的阻塞 waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { waitms = 1e9 } var events [128]epollevent retry: n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { goto retry } var toRun gList for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) netpollready(&toRun, pd, mode) } } return toRun }
netpoll根据传入的参数不同有阻塞、非阻塞、超时阻塞三种模式。调用epoll_wait返回就绪事件,并用netpollready工具函数把这些事件转换成g链表后返回
func netpollBreak() { if atomic.Cas(&netpollWakeSig, 0, 1) { for { var b byte n := write(netpollBreakWr, unsafe.Pointer(&b), 1) if n == 1 { break } } } }
netpollBreak向管道中写入内容,导致管道的读端事件被触发,这可以中止netpoll的阻塞状态。
所以,现在可以回顾以下findrunnable中所做的关于网络轮询的事情:
1.调用checkTimers运行已经到达的定时器,如果没有定时器到达,就返回一个pollUntil表示下一个定时器的到达时间。如果没有下一个定时器则返回0
2.调用非阻塞的netpoll先尝试获取一波g,这些g可能是之前执行了read或connect等调用而进入阻塞
3.所有能做的事都做完了之后就进行阻塞的netpoll调用。如果pollUntil不为0,则进行带超时的netpoll调用,使得届时能从netpoll中及时返回以响应定时器到达事件
6. sysmon守护线程
sysmon(system monitor)守护线程有点像linux守护进程或者mysql后台线程,总之都是打杂的。sysmon和其他守护线程一样,也是在一个大循环里进行轮询监控,并对一些异常事件进行处理。
sysmon随着全局g0的启动而启动。
func main() { ...省略 if GOARCH != "wasm" { atomic.Store(&sched.sysmonStarting, 1) systemstack(func() { newm(sysmon, nil, -1) }) }
...省略 }
全局g0在main函数中通过newm创建了一个不依赖p的m(第二个参数为nil),并把sysmon函数作为m的底层线程的入口,即创建了sysmon守护线程。
func sysmon() {
...省略 idle := 0 // 记录sysmon已经连续多少个循环没被唤醒,也没唤醒别人了,即一片祥和 delay := uint32(0) //大循环的间隔 for { if idle == 0 { // 至少间隔20微秒 delay = 20 } else if idle > 50 { // 如果连续没被唤醒的次数过多,就可以多睡一会了 delay *= 2 } if delay > 10*1000 { // 最多间隔不超过10毫秒 delay = 10 * 1000 } now := nanotime() if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) { lock(&sched.lock) if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) { syscallWake := false next, _ := timeSleepUntil() //返回下一个定时器的到达时间
//如果sysmon没什么活干(无须追踪调度、不在等待gc或p全部空闲),定时器也还没到,就可以休息了 if next > now { atomic.Store(&sched.sysmonwait, 1) unlock(&sched.lock) // Make wake-up period small enough // for the sampling to be correct. sleep := forcegcperiod / 2 if next-now < sleep { sleep = next - now } shouldRelax := sleep >= osRelaxMinNS if shouldRelax { osRelax(true) } syscallWake = notetsleep(&sched.sysmonnote, sleep) //休息了。进行系统调用之类的会唤醒sysmon,那就会把idle清零,delay重置 mDoFixup() if shouldRelax { osRelax(false) } lock(&sched.lock) atomic.Store(&sched.sysmonwait, 0) noteclear(&sched.sysmonnote) } if syscallWake { idle = 0 delay = 20 } } unlock(&sched.lock) }
}
...省略 }
sysmon也会想偷懒,没什么活干就休息了,详见上面的注释
sysmon()函数很长,但功能划分得很清楚,主要就四个部分:检查死锁、网络轮询兜底、对p发起抢占、垃圾收集兜底
6.1 检查死锁
核心逻辑是checkdead函数。去除了cgo相关的逻辑,还是蛮简单的
func checkdead() { run := mcount() - sched.nmidle - sched.nmidlelocked - sched.nmsys //运行中的m if run > 0 { return } grunning := 0 //若没有m在运行,统计不在运行的g数量。这变量名很奇怪,grunning统计的是不在运行的g forEachG(func(gp *g) { if isSystemGoroutine(gp, false) { return } s := readgstatus(gp) switch s &^ _Gscan { case _Gwaiting, _Gpreempted: grunning++ case _Grunnable, _Grunning, _Gsyscall: throw("checkdead: runnable g") } }) if grunning == 0 { //如果不在运行的g数量为0,那压根就没g了,说明所有g都退出了,这是由于主协程调用了runtime.Goexit引起的 throw("no goroutines (main called runtime.Goexit) - deadlock!") } // 如果不在运行的g的数量大于0,即有阻塞态的g,就检查有没有定时器,如果有的话,所有g阻塞也是合理的 for _, _p_ := range allp { if len(_p_.timers) > 0 { return } } throw("all goroutines are asleep - deadlock!") //如果也没有定时器,那就是死锁了 }
6.2 网络轮询兜底
lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) if !list.empty() { incidlelocked(-1) injectglist(&list) incidlelocked(1) } }
和findrunnable里的逻辑几乎一样。作用是每10ms会进行一次netpoll兜底,将就绪的g注入队列
6.3 对p发起抢占
核心逻辑是retake函数。
func retake(now int64) uint32 { n := 0 // Prevent allp slice changes. This lock will be completely // uncontended unless we're already stopping the world. lock(&allpLock) // We can't use a range loop over allp because we may // temporarily drop the allpLock. Hence, we need to re-fetch // allp each time around the loop. for i := 0; i < len(allp); i++ { _p_ := allp[i] pd := &_p_.sysmontick s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // 如果p的运行时间太长(超过10ms),调用preemptone将其抢占! t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { preemptone(_p_) sysretake = true } } if s == _Psyscall { // p处于系统调用状态时, t := int64(_p_.syscalltick) if !sysretake && int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) pd.syscallwhen = now continue }
//若p的本地队列不为空,或者不存在自旋的m与空闲的p,或者距离系统调用发起已经超过10ms了,就调用handoffp移交p的所有权!也就是尽量不让p闲着 if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { if trace.enabled { traceGoSysBlock(_p_) traceProcStop(_p_) } n++ _p_.syscalltick++ handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) }
preemptone(p)主要是依赖preemptM系统调用,让当前的g停止在p上运行
handoffp(p)通过调用startm函数来启动一个m与p绑定。handoffp秉持着尽量启动m的原则,如果实在太闲了,则不会启动m,而是将p加入空闲列表
如果retake没能抢占任何p,则sysmon的idle自增,表明又白跑一轮。
6.4 垃圾回收兜底
这不是本文的内容,简单带过一下就是每120秒或触发兜底的ix强制垃圾回收,没什么好说的
7. 定时器实现
最后讲一下定时器的实现,在前面的小节里也出现过很多次了。相比GMP,定时器模块就比较简单了
定时器的数据结构是timer,主要字段如下
type timer struct { pp puintptr when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr }
pp: 指向定时器所在的p。回忆一下p结构体里也有一个[]*timer保存了所有定时器
when: 下次触发时间
period: 周期定时器(Ticker)的周期
f,arg: 定时器到达的回调函数及其参数
seq: 触发序号
p结构体里的timers其实不是一个简单的切片,而是一个小顶四叉堆(4-ary heap,二叉堆的扩展,也是用数组表示树形结构),堆顶元素就是when最小的,也就是最快到达的定时器。
不信的话看看这段源码:
func siftupTimer(t []*timer, i int) int { if i >= len(t) { badTimer() } when := t[i].when if when <= 0 { badTimer() } tmp := t[i] for i > 0 { p := (i - 1) / 4 if when >= t[p].when { break } t[i] = t[p] i = p } if tmp != t[i] { t[i] = tmp } return i }
一行没改!这就是一个经典的堆实现,只不过是四叉的,取父亲下标要除以4。这是向上筛选的函数,向下筛选的函数类似,不展示了
调用addtimer新增定时器,最终进入doaddtimer函数
func doaddtimer(pp *p, t *timer) { if netpollInited == 0 { //定时器依赖netpoll模块(之前展示过),所以没初始化要先初始化 netpollGenericInit() } t.pp.set(pp) //pp就是当前的p i := len(pp.timers) pp.timers = append(pp.timers, t) siftupTimer(pp.timers, i) //往p.timers堆中插入一个定时器 if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) }
定时器的触发被放在调度器的调度周期里进行,触发定时器的入口是checkTimers(在schedule()函数内,之前展示过)
如果g的计算量较大,执行时间很长,或者进行频繁大量的系统调用,schedule函数的主动触发频率就会降低,不得不依靠sysmon线程的抢占式调度。在最坏情况下,调度周期可达到10ms,这也就是定时器的最高保证精度。
checkTimers在成功获取到已到达的定时器后,最终会走进runOneTimer
func runOneTimer(pp *p, t *timer, now int64) { f := t.f arg := t.arg seq := t.seq if t.period > 0 { //如果是周期定时器,计算下一轮触发事件并调整小顶堆 delta := t.when - now t.when += t.period * (1 + -delta/t.period) if t.when < 0 { // check for overflow. t.when = maxWhen } siftdownTimer(pp.timers, 0) if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp) } else { //否则这个定时器没用了,删除 dodeltimer0(pp) if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } }
unlock(&pp.timersLock) f(arg, seq) //触发回调函数 lock(&pp.timersLock) }
实际上定时器比这更加复杂,它也有复杂的状态机,但是和本文主题相差太远,就在此略过了。