go调度器的源码级分析

 

基于GMP模型的调度器是go实现其引以为傲的用户态线程的核心。本文就以GMP调度器为核心分析一下调度流程,顺便分析一下定时器Timer的实现,它和调度器息息相关。

本文的大纲如下:

1.GMP的关键数据结构

2.goroutine的生命周期

3.系统线程的生命周期

4.触发shedule()的时机

5.网络轮询netpoll

6.sysmon守护线程

7.定时器实现

1.GMP的关键数据结构

调度器相关的数据结构都在runtime/runtime2.go中,主要有g(goroutine,协程),m(machine,操作系统线程),p(processor,调度器的资源抽象,m争抢p来获取执行权),schedt(scheduler_type,调度器的全局抽象)

以下都是只给出一些关键字段,实际上每个完整数据结构都有几十个字段!

1.1 g

type g struct {
	stack       stack
	_panic    *_panic
	_defer    *_defer
	m         *m
	sched     gobuf
	atomicstatus uint32
}

stack: 协程栈的上下界,由lo和hi两个字段组成

_panic: 协程的panic链表,头部的是最内层的panic(也就是外层的panic可能会被abort)

_defer: 协程的defer链表,头部的是最内层defer。不过本文不关心defer和panic,只是想说明defer和panic都是goroutine级别的 

m: 当前g对应的m

sched: 记录了协程的运行时状态,内部包含sp、pc、bp等伪寄存器

atomicstatus: 协程的状态机,共有近十种状态,如idle,dead,runnable,running,syscall,waiting,preempted等

1.2 m

type m struct {
	g0      *g
	gsignal       *g
	curg          *g
	p             puintptr
	nextp         puintptr
	oldp          puintptr
	spinning      bool
}

g0: 运行在系统栈上的协程,即负责调度协程的协程

gsignal: 负责响应信号的协程。由上可知一个空闲的m也自带两个g

curg: m当前运行的g,可能为空表示空闲

p: m当前绑定的p。要运行用户代码,必须绑定上一个p。一般p远少于m。

nextp: m刚刚创建或从阻塞转为就绪时,预先绑定的p。真正运行时,就调用acquirep绑定这个p

oldp: m陷入系统调用时会出让p,这里保存这个p,系统调用返回时试图恢复这个p。这是希望利用局部性。

spinning: 是否自旋。如果当前的m是自旋的,而又从本地队列runq和全局队列globalrunq都获取不到可运行的g时,就会从其他p的本地队列尝试窃取g

1.3 p

type p struct {
	status      uint32
	m           muintptr
	deferpool    [5][]*_defer
	runq     [256]guintptr
	gFree struct {
		gList
		n int32
	}
	sudogcache []*sudog
	timer0When uint64
	timers []*timer
}

status: p的状态机

m: p的m。与m的p双向绑定

deferpool: _defer结构体的per-P缓存池。在deferproc创建新_defer时(本文不分析,详见runtime/panic.go)会优先从中分配内存,按内存大小分为五个链表。这也是利用局部性

runq: 当前p的可运行g队列

gFree: 当前p的空闲g队列(即状态为dead的g。g执行完毕后不会销毁,而是等待被复用)

sudogcache: 创建sudog的缓存池,作用同deferpool。sudog代表一个阻塞状态的g,其记录了g所在的阻塞队列等信息,本文不介绍。

timers,timer0When: timers保存了当前p全部定时器,timer0When保存了所有定时器中最近的即将到达的时间。本文会介绍定时器的实现

1.4 schedt

type schedt struct {
	lastpoll  uint64
	midle        muintptr
	nmidle       int32
	nmidlelocked int32
	maxmcount    int32
	nmsys        int32
	nmfreed      int64
	pidle      puintptr
	npidle     uint32
	nmspinning uint32
	runq     gQueue
	gFree struct {
		lock    mutex
		stack   gList
		noStack gList
		n       int32
	}
	sudogcache *sudog
	deferpool [5]*_defer
}

可见,schedt的这些字段部分是状态统计信息,一些(最后4个)是p中的某些结构的全局版本。

以及第一个lastpoll是上一次进行网络轮询(netpoll)的时间。在讲定时器时会说到

2. goroutine的生命周期

对于用户来说,GMP调度器中距离自己最近的当然是g。那么当我们轻轻松松地敲下go来启动一个g时,golang为我们做了什么呢?

在编译阶段,编译器会把go关键字转换成一个runtime.newproc调用。newproc有两个参数,初始栈大小和go出来的函数指针,这两个都是在编译时就能确定的。接下来详细看一下newproc

注意!本文的代码都是高度简化版,源代码太复杂了,作者太强了,顶礼膜拜!

func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc) //初始化一个g结构体

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true) //将g加入当前p的本地队列。当前p也就是使用go关键字的那个g所在的m绑定的p,有点绕..

		if mainStarted {
			wakep() //尝试启动一个空闲的p,这个函数之后会说到
		}
	})
}

 先看newproc1具体怎么初始化g

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()
	siz := narg
	siz = (siz + 7) &^ 7

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_) //gfget(p)从p的空闲列表中获取一个g。如果p的空闲列表为空,从全局空闲列表中将最多32个g放入p的空闲列表并返回。
	if newg == nil {
		newg = malg(_StackMin) //memory_alloc_g,新创建一个g,这个g只初始化了栈空间的相关信息
		casgstatus(newg, _Gidle, _Gdead) //状态转移。dead表示分配了栈,但未在执行代码,这名字有点迷惑性,刚初始化完和执行结束都是dead
		allgadd(newg)
	}

	totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize
	totalSize += -totalSize & (sys.StackAlign - 1)
	sp := newg.stack.hi - totalSize
	spArg := sp

//接下来就是初始化sched字段等运行状态信息了 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum //*1 newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn casgstatus(newg, _Gdead, _Grunnable) return newg }

*1 所有新创建的g的pc都从goexit这个ABI(Application Binary Interface)函数所在的位置开始。这样做等效于:当这个g执行完毕时,就会返回到goexit,执行goexit函数。goexit在本节的最后讲

现在g已经加入p的本地队列了,接下来就是等待被调度了!触发调度的方式有多种,之后会讲到。这边先说明一下,调度的主函数是schedule()函数。它的作用是进行一轮调度:尝试获取一个g,并执行它

func schedule() {
	_g_ := getg()
top:
	pp := _g_.m.p.ptr()

	checkTimers(pp, 0) //检查最近的定时器有没有到达,如果到了就运行它。具体逻辑更加复杂,后面会说

	var gp *g
	var inheritTime bool

	if gp == nil {
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { //约有1/61的调度会从全局队列中拿,这是为了减少饥饿
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr()) //从p的本地队列中拿
	}
	if gp == nil {
		gp, inheritTime = findrunnable() // 一个非常复杂的方法,它是阻塞的,直到获取到一个可运行的g
	}
	execute(gp, inheritTime) //执行goroutine的真正入口
}

findrunnable是一个非常复杂的方法,它阻塞直到获取到一个可运行的g。期间还会顺便做一下网络轮询(之后会说)。工作窃取(work-stealing)也是在这个函数里完成的

func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	now, pollUntil, _ := checkTimers(_p_, 0) //检查定时器。返回值pollUntil表示下一个定时器到达的时间,后续步骤有用到

	if gp, inheritTime := runqget(_p_); gp != nil { //从p的本地队列中取
		return gp, inheritTime
	}

	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0) //从全局队列中取
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

        //轮询网络(后面会说)!netpoll(0)表示进行非阻塞轮询,返回一组就绪的g,返回第一个,调用injectglist将剩下的加入p的本地队列
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if list := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list) //将一组g加入当前p的本地队列,如果当前没有绑定p,就加入全局队列
			casgstatus(gp, _Gwaiting, _Grunnable) //状态转移:阻塞态->就绪态
			return gp, false
		}
	}

        //如果当前的m处于自旋状态(或者忙碌的p数量比自旋的m数量更多),就尝试从其它p的本地队列中窃取g!
	procs := uint32(gomaxprocs)
	if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
		if !_g_.m.spinning {
			_g_.m.spinning = true
			atomic.Xadd(&sched.nmspinning, 1)
		}

		gp, inheritTime, tnow, w, newWork := stealWork(now)
		now = tnow
		if gp != nil {
			return gp, inheritTime
		}
		if newWork {
			goto top
		}
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			pollUntil = w
		}
	}

	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
        //实在无事可做了,就把当前的p加入schedt的空闲p列表
	pidleput(_p_)
	unlock(&sched.lock)
        
// ***关键!阻塞直到有可用的g if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { atomic.Store64(&sched.pollUntil, uint64(pollUntil)) delay := int64(-1) if pollUntil != 0 { if now == 0 { now = nanotime() } delay = pollUntil - now if delay < 0 { delay = 0 } } list := netpoll(delay) //阻塞至多delay时间。如果没有定时器,则无限阻塞,直到被netpollBreak唤起。返回就绪的一组g atomic.Store64(&sched.pollUntil, 0) atomic.Store64(&sched.lastpoll, uint64(nanotime())) _p_ = pidleget() //获取空闲p if _p_ == nil { injectglist(&list) //获取失败,将所有g注入(本地或全局)队列 } else { acquirep(_p_) //获取成功,将p和m绑定,将第一个以后的g注入(本地或全局)队列,第一个g被返回 if !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } goto top } } else if pollUntil != 0 && netpollinited() { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > pollUntil { netpollBreak() } } stopm() goto top }

上面标有***代码是关键,完成了阻塞获取g的逻辑。netpoll和定时器部分后面会讲到

execute是goroutine的真正入口,它的核心逻辑非常简单,就是调用gogo函数,定位到g.sched中保存的运行位置

func execute(gp *g, inheritTime bool) {
	_g_ := getg()
	_g_.m.curg = gp
	gp.m = _g_.m //把当前的m(实际上是g0的m)和要执行的g绑定起来
	casgstatus(gp, _Grunnable, _Grunning) //状态转移:就绪态->运行态
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	gogo(&gp.sched) //gogo!
}

gogo是一个汇编(go assembly,和普通汇编略有不同)实现的函数,用g.sched里保存的pc、sp等寄存器信息进行跳转

TEXT runtime·gogo(SB), NOSPLIT, $0-8
    MOVQ    buf+0(FP), BX        // 读取g.sched参数放入BX
    MOVQ    gobuf_g(BX), DX
    MOVQ    0(DX), CX
    JMP    gogo<>(SB)

TEXT gogo<>(SB), NOSPLIT, $0
    get_tls(CX)
    MOVQ    DX, g(CX)
    MOVQ    DX, R14
    MOVQ    gobuf_sp(BX), SP
    MOVQ    gobuf_ret(BX), AX
    MOVQ    gobuf_ctxt(BX), DX
    MOVQ    gobuf_bp(BX), BP
    MOVQ    gobuf_pc(BX), BX
    JMP    BX                //就是在这里跳转到了g.sched里保存的pc位置!开始正式执行用户写的func

 现在终于开始执行用户代码了!但是这个g依然不能高枕无忧了!有多种方式可以让这个g失去执行权,即让g.m与p解绑,之后会讨论。现在先假设g顺利执行完了,那就会按之前说的,调用goexit

goexit也是个汇编函数,但逻辑比较简单,总之最后调用到goexit0函数

func goexit0(gp *g) {
	_g_ := getg()

	casgstatus(gp, _Grunning, _Gdead) //状态转移
	gp.m = nil //将当前g的一些状态清空
	locked := gp.lockedm != 0
	gp.lockedm = 0
	_g_.m.lockedg = 0
	gp.preemptStop = false
	gp.paniconfault = false
	gp._defer = nil
	gp._panic = nil
	gp.writebuf = nil
	gp.waitreason = 0
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	dropg() //里面完成了m.curg与当前g的解绑

	gfput(_g_.m.p.ptr(), gp) //把当前g加入全局的空闲队列schedt.gFree
	schedule() //关键!
}

可以看出,g退出时会先进行一些清理工作,最终会再次调用schedule(),开始新一轮调度!这就是调度器能持续不断地运行的原因

此外,执行完毕的g不会被销毁,而是加入了全局的空闲队列。还记得这个空闲队列哪里用到吗?newproc创建g的时候!以此实现g的复用

3. 系统线程的生命周期

系统线程(m)是g的运行载体,一个m只能在同一时刻运行一个g。m为了获取代码执行权必须绑定一个p。如果m不在执行用户代码(如进行系统调用),则和p解绑。

启动一个m的入口是startm函数。

func startm(_p_ *p, spinning bool) {
	mp := acquirem()
	lock(&sched.lock)
	if _p_ == nil { //startm的第一个参数决定了该函数的语义:是让m被动等待可用的p,还是让p主动把所有权移交给m
		_p_ = pidleget() //从空闲p列表中获取p
		if _p_ == nil { //如果没有,就返回,启动m失败!因为m的运行依附于p
			unlock(&sched.lock)
			releasem(mp)
			return
		}
	}
	nmp := mget() //从空闲m列表中获取m
	if nmp == nil {
		id := mReserveID()
		unlock(&sched.lock)

		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, _p_, id) //如果获取不到,就新建一个m
		releasem(mp)
		return
	}
	unlock(&sched.lock)
	nmp.spinning = spinning
	nmp.nextp.set(_p_) //把新建出来的m的nextp设为当前p,等m真正运行时,就会绑定这个p
	notewakeup(&nmp.park)
	releasem(mp)
}

以下总结了调用startm的时机:

1. 有新的g可用时。一般是调用newproc和ready(标记一个g为就绪)函数时。前面提到newproc会调用wakep,其中就调的startm,第一个参数为nil,也就是不指定p,而是让m等待空闲的p

2. p移交所有权时。典型的情况就是sysmon守护线程(后面会讲)发现一个p处于syscall状态的时间太长了,就会调用handoffp函数将它的所有权移交,具体在后面讲sysmon的时候会说。其中也会调startm,第一个参数就是这个p本身,表示让m和这个p绑定,也就是p的所有者被移交(handoff)了。

startm中提到,如果在空闲m列表中获取不到m,就会调用newm创建一个。

func newm(fn func(), _p_ *p, id int64) {
	mp := allocm(_p_, fn, id)
	mp.doesPark = (_p_ != nil)
	mp.nextp.set(_p_)
	mp.sigmask = initSigmask
	newm1(mp)
}

allocm分配了一个m的空间,进行一些初始化,但还没有和任何系统线程绑定

func allocm(_p_ *p, fn func(), id int64) *m {
	_g_ := getg()
	acquirem()
	if _g_.m.p == 0 {
		acquirep(_p_)
	}

	mp := new(m)
	mp.mstartfn = fn
	mcommoninit(mp, id)

	mp.g0 = malg(-1) //为这个m创建g0
	mp.g0.m = mp

	if _p_ == _g_.m.p.ptr() {
		releasep()
	}
	releasem(_g_.m)

	return mp
}

newm1将m真正和系统线程绑定

func newm1(mp *m) {
	newosproc(mp) //其实这个函数不止这么点,但核心就这一句,新建操作系统线程,并与m绑定
}

在linux环境下,newosproc会使用clone系统调用创建新线程

func newosproc(mp *m) {
//也是省了一堆无关代码 ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) }

clone的第四个参数指定线程的入口,是mstart函数的地址,于是目光转向mstart函数,它是真正创建系统线程的入口

mstart先是进行一些栈的初始化,最终进入mstart1函数

func mstart0() {
	_g_ := getg()
        //初始化一大堆栈参数
	mstart1()

	mexit(osStack)
}

  

func mstart1() {
	_g_ := getg()

//保存当前g(因为是新创建的m,所以只能是g0)的sched状态 _g_.sched.g = guintptr(unsafe.Pointer(_g_)) _g_.sched.pc = getcallerpc() _g_.sched.sp = getcallersp() asminit() minit() if _g_.m == &m0 { mstartm0() //在这里面初始化gsignal信号处理协程 } if fn := _g_.m.mstartfn; fn != nil { fn() } if _g_.m != &m0 { acquirep(_g_.m.nextp.ptr()) //acquirep(p)将p和当前m绑定,也就是这个m终于获得了代码的执行权了!还记得nextp吗,是在newm函数里设置的 _g_.m.nextp = 0 } schedule() }

mstart1将之前设置的nextp与m绑定,使m终于获得了代码执行权!并且立即进入下一轮调度schedule()

注意,这个函数一开始还在g0.sched中保存了运行状态。别忘了schedule()会跳转到下一个g的pc而不会返回,所以,为了能让mstart1返回,必须有类似gogo(&_g_.m.g0.sched)的调用,跳转到mstart1的调用者,才能返回。返回后,进入mexit函数销毁线程。

mexit函数主要就是调用handoffp函数,让即将销毁的m的p转交所有权。然后调用一些系统调用来摧毁线程,没什么好说的。

那么什么时候会有gogo(&_g_.m.g0.sched)的调用呢?我看了半天只找到一处,就是在某个g退出时(调用goexit),会检查这个g有没有锁定(锁定,一般指的是调用公有函数runtime.LockOsThread,使当前g独占某个操作系统线程)过当前m,有的话就会调用这个函数。换句话说,如果g锁定了所在的m,那么就认为这个m不干净了,等g结束后就会跳转到g0.sched从而销毁m。这个g真是坑爹啊!正常情况下,m是不会被销毁的。

最后,在startm中提到会现在空闲m列表中获取m,获取不到再进入newm。这个空闲m列表,其实就是保存了休眠状态下的m。触发垃圾回收时当前m会休眠。此外,当g从系统调用中返回时,g.m会寻找可用的p以继续执行代码,如果没有可用的p,也只能进入休眠了。

4. 触发schedule()的时机

之前看到,在启动g或m的时候会立即开始一轮调度。除此之外,触发调度还有一些时机

4.1 g让出执行权

主要是运行态的g主动成为阻塞态或就绪态。

成为阻塞态就是调用gopark函数挂起自身。gopark函数最终调用park_m函数

func park_m(gp *g) { //gp是当前的g
	casgstatus(gp, _Grunning, _Gwaiting)
	dropg() //将g.m.curg和g解绑
	schedule()
}

成为就绪态就是调用公有函数runtime.GoSched()出让当前g的执行权,有点像java里的Thread.yield()。GoSched最终调用goschedImpl函数

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	casgstatus(gp, _Grunning, _Grunnable)
	dropg()
	globrunqput(gp) //将g放入全局队列
	schedule()
}

两者最终都调用schedule()开启新一轮调度

4.2 系统调用

在进行系统调用时,编译器会在这个系统调用前后插入reentersyscall和exitsyscall两个钩子函数,在这两个函数里进行GMP状态的保存和恢复

func reentersyscall(pc, sp uintptr) {
	_g_ := getg()
	save(pc, sp) //保存寄存器状态
	_g_.syscallsp = sp
	_g_.syscallpc = pc
	casgstatus(_g_, _Grunning, _Gsyscall) //g的状态转移:运行态->系统调用中

	_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
	_g_.sysblocktraced = true
	pp := _g_.m.p.ptr()
	pp.m = 0 //将m和p解绑
	_g_.m.oldp.set(pp) //在m.oldp中保存p,为了在系统调用返回时尝试重新获得p
	_g_.m.p = 0
	atomic.Store(&pp.status, _Psyscall) //p的状态转移: 变为系统调用中。或许是p的状态比g简单,所以不用cas?
}
func exitsyscall() {
	_g_ := getg()
	oldp := _g_.m.oldp.ptr()
	_g_.m.oldp = 0
	if exitsyscallfast(oldp) { //尝试重新获取oldp
		if trace.enabled {
			if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick {
				systemstack(traceGoStart)
			}
		}
		_g_.m.p.ptr().syscalltick++
		casgstatus(_g_, _Gsyscall, _Grunning)

		_g_.syscallsp = 0
		_g_.m.locks--
		_g_.throwsplit = false

		if sched.disable.user && !schedEnabled(_g_) {
			Gosched()
		}

		return
	}
	mcall(exitsyscall0) //尝试失败,走完整流程
}

reentersyscall中在解绑m与p时,会将p保存到m.oldp中,在exitsyscall中尝试恢复p,这样做是希望利用局部性。

func exitsyscallfast(oldp *p) bool {
	_g_ := getg()
        //尝试绑定原来的p,前提是p没有移情别恋,例如被sysmon监控到p处于syscall状态太久而移交所有权
	if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
		wirep(oldp) //将oldp与m绑定
		exitsyscallfast_reacquired()
		return true
	}

	if sched.pidle != 0 { //如果能找到空闲的p,也能接受,至少m不用休眠了!
		var ok bool
		systemstack(func() {
			ok = exitsyscallfast_pidle() //里面会把空闲的p与m绑定
		})
		if ok {
			return true
		}
	}
	return false
}
func exitsyscall0(gp *g) { //快速绑定失败,走完整流程
	casgstatus(gp, _Gsyscall, _Grunnable)
	dropg()
	var _p_ *p
	if schedEnabled(gp) {
		_p_ = pidleget() //获取空闲p
	}
	var locked bool
	if _p_ == nil {
		globrunqput(gp) //获取失败,将g加入全局队列
		stopm() //将m加入空闲m列表
	}
	if _p_ != nil { //获取成功,将p与m绑定,开始执行g
		acquirep(_p_)
		execute(gp, false)
	}
	schedule()
}

exitsyscall0是慢路径,因为它由mcall调用,会造成协程切换(g切换到g0)。如果最终获取不到p,就使g加入全局队列,m休眠,开启新一轮调度。

4.3 p被sysmon线程发起抢占

这个留在后面和sysmon一起说

5. 网络轮询netpoll

netpoll是go对多路复用的实现。在linux系统上,主要依赖epoll系列系统调用。

在调度过程中,主要是在finerunnable函数中进行netpoll, 返回一组就绪的g

netpoll接口需要实现netpollinit, netpoll, netpollBreak, netpollopen, netpollclose, netpollisPollDescriptor六个函数。与本文主题相关的是前三个

func netpollinit() {
	epfd = epollcreate1(1024)
	r, w, errno := nonblockingPipe()
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

看到这几个函数我不禁留下了感动的眼泪,太熟悉了,就是epoll的套皮。

epollinit先调用epoll_create,再创建epollevent结构体,再创建一个管道进行读方向的监听

func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}
	var waitms int32
	if delay < 0 { //参数<0表示无限阻塞
		waitms = -1
	} else if delay == 0 { //参数==0表示非阻塞
		waitms = 0
	} else if delay < 1e6 { //否则表示带超时的阻塞
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		waitms = 1e9
	}
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		goto retry
	}
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

netpoll根据传入的参数不同有阻塞、非阻塞、超时阻塞三种模式。调用epoll_wait返回就绪事件,并用netpollready工具函数把这些事件转换成g链表后返回

func netpollBreak() {
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		for {
			var b byte
			n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
			if n == 1 {
				break
			}
		}
	}
}

netpollBreak向管道中写入内容,导致管道的读端事件被触发,这可以中止netpoll的阻塞状态。

所以,现在可以回顾以下findrunnable中所做的关于网络轮询的事情:

1.调用checkTimers运行已经到达的定时器,如果没有定时器到达,就返回一个pollUntil表示下一个定时器的到达时间。如果没有下一个定时器则返回0

2.调用非阻塞的netpoll先尝试获取一波g,这些g可能是之前执行了read或connect等调用而进入阻塞

3.所有能做的事都做完了之后就进行阻塞的netpoll调用。如果pollUntil不为0,则进行带超时的netpoll调用,使得届时能从netpoll中及时返回以响应定时器到达事件

6. sysmon守护线程

sysmon(system monitor)守护线程有点像linux守护进程或者mysql后台线程,总之都是打杂的。sysmon和其他守护线程一样,也是在一个大循环里进行轮询监控,并对一些异常事件进行处理。

sysmon随着全局g0的启动而启动。

func main() {
	...省略
	if GOARCH != "wasm" {
		atomic.Store(&sched.sysmonStarting, 1)
		systemstack(func() {
			newm(sysmon, nil, -1)
		})
	}
...省略 }

全局g0在main函数中通过newm创建了一个不依赖p的m(第二个参数为nil),并把sysmon函数作为m的底层线程的入口,即创建了sysmon守护线程。

func sysmon() {
...省略 idle := 0 // 记录sysmon已经连续多少个循环没被唤醒,也没唤醒别人了,即一片祥和 delay := uint32(0) //大循环的间隔 for { if idle == 0 { // 至少间隔20微秒 delay = 20 } else if idle > 50 { // 如果连续没被唤醒的次数过多,就可以多睡一会了 delay *= 2 } if delay > 10*1000 { // 最多间隔不超过10毫秒 delay = 10 * 1000 } now := nanotime() if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) { lock(&sched.lock) if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) { syscallWake := false next, _ := timeSleepUntil() //返回下一个定时器的到达时间
//如果sysmon没什么活干(无须追踪调度、不在等待gc或p全部空闲),定时器也还没到,就可以休息了 if next > now { atomic.Store(&sched.sysmonwait, 1) unlock(&sched.lock) // Make wake-up period small enough // for the sampling to be correct. sleep := forcegcperiod / 2 if next-now < sleep { sleep = next - now } shouldRelax := sleep >= osRelaxMinNS if shouldRelax { osRelax(true) } syscallWake = notetsleep(&sched.sysmonnote, sleep) //休息了。进行系统调用之类的会唤醒sysmon,那就会把idle清零,delay重置 mDoFixup() if shouldRelax { osRelax(false) } lock(&sched.lock) atomic.Store(&sched.sysmonwait, 0) noteclear(&sched.sysmonnote) } if syscallWake { idle = 0 delay = 20 } } unlock(&sched.lock) }
}
...省略 }

sysmon也会想偷懒,没什么活干就休息了,详见上面的注释

sysmon()函数很长,但功能划分得很清楚,主要就四个部分:检查死锁、网络轮询兜底、对p发起抢占、垃圾收集兜底

6.1 检查死锁

核心逻辑是checkdead函数。去除了cgo相关的逻辑,还是蛮简单的

func checkdead() {
	run := mcount() - sched.nmidle - sched.nmidlelocked - sched.nmsys //运行中的m
	if run > 0 {
		return
	}

	grunning := 0 //若没有m在运行,统计不在运行的g数量。这变量名很奇怪,grunning统计的是不在运行的g
	forEachG(func(gp *g) {
		if isSystemGoroutine(gp, false) {
			return
		}
		s := readgstatus(gp)
		switch s &^ _Gscan {
		case _Gwaiting,
			_Gpreempted:
			grunning++
		case _Grunnable,
			_Grunning,
			_Gsyscall:
			throw("checkdead: runnable g")
		}
	})
	if grunning == 0 { //如果不在运行的g数量为0,那压根就没g了,说明所有g都退出了,这是由于主协程调用了runtime.Goexit引起的
		throw("no goroutines (main called runtime.Goexit) - deadlock!")
	}

	// 如果不在运行的g的数量大于0,即有阻塞态的g,就检查有没有定时器,如果有的话,所有g阻塞也是合理的
	for _, _p_ := range allp {
		if len(_p_.timers) > 0 {
			return
		}
	}
	throw("all goroutines are asleep - deadlock!") //如果也没有定时器,那就是死锁了
}

6.2 网络轮询兜底

		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0)
			if !list.empty() {
				incidlelocked(-1)
				injectglist(&list)
				incidlelocked(1)
			}
		}

和findrunnable里的逻辑几乎一样。作用是每10ms会进行一次netpoll兜底,将就绪的g注入队列

6.3 对p发起抢占

核心逻辑是retake函数。

func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// 如果p的运行时间太长(超过10ms),调用preemptone将其抢占!
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(_p_)
				sysretake = true
			}
		}
		if s == _Psyscall {
			// p处于系统调用状态时,
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
//若p的本地队列不为空,或者不存在自旋的m与空闲的p,或者距离系统调用发起已经超过10ms了,就调用handoffp移交p的所有权!也就是尽量不让p闲着 if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { if trace.enabled { traceGoSysBlock(_p_) traceProcStop(_p_) } n++ _p_.syscalltick++ handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) }

preemptone(p)主要是依赖preemptM系统调用,让当前的g停止在p上运行

handoffp(p)通过调用startm函数来启动一个m与p绑定。handoffp秉持着尽量启动m的原则,如果实在太闲了,则不会启动m,而是将p加入空闲列表

如果retake没能抢占任何p,则sysmon的idle自增,表明又白跑一轮。

6.4 垃圾回收兜底

这不是本文的内容,简单带过一下就是每120秒或触发兜底的ix强制垃圾回收,没什么好说的

7. 定时器实现

最后讲一下定时器的实现,在前面的小节里也出现过很多次了。相比GMP,定时器模块就比较简单了

定时器的数据结构是timer,主要字段如下

type timer struct {
	pp puintptr
	when   int64
	period int64
	f      func(interface{}, uintptr)
	arg    interface{}
	seq    uintptr
}

pp: 指向定时器所在的p。回忆一下p结构体里也有一个[]*timer保存了所有定时器

when: 下次触发时间

period: 周期定时器(Ticker)的周期

f,arg: 定时器到达的回调函数及其参数

seq: 触发序号

 

p结构体里的timers其实不是一个简单的切片,而是一个小顶四叉堆(4-ary heap,二叉堆的扩展,也是用数组表示树形结构),堆顶元素就是when最小的,也就是最快到达的定时器。

不信的话看看这段源码:

func siftupTimer(t []*timer, i int) int {
	if i >= len(t) {
		badTimer()
	}
	when := t[i].when
	if when <= 0 {
		badTimer()
	}
	tmp := t[i]
	for i > 0 {
		p := (i - 1) / 4
		if when >= t[p].when {
			break
		}
		t[i] = t[p]
		i = p
	}
	if tmp != t[i] {
		t[i] = tmp
	}
	return i
}

一行没改!这就是一个经典的堆实现,只不过是四叉的,取父亲下标要除以4。这是向上筛选的函数,向下筛选的函数类似,不展示了

调用addtimer新增定时器,最终进入doaddtimer函数

func doaddtimer(pp *p, t *timer) {
	if netpollInited == 0 { //定时器依赖netpoll模块(之前展示过),所以没初始化要先初始化
		netpollGenericInit()
	}

	t.pp.set(pp) //pp就是当前的p
	i := len(pp.timers)
	pp.timers = append(pp.timers, t)
	siftupTimer(pp.timers, i) //往p.timers堆中插入一个定时器
	if t == pp.timers[0] {
		atomic.Store64(&pp.timer0When, uint64(t.when))
	}
	atomic.Xadd(&pp.numTimers, 1)
}

定时器的触发被放在调度器的调度周期里进行,触发定时器的入口是checkTimers(在schedule()函数内,之前展示过)

如果g的计算量较大,执行时间很长,或者进行频繁大量的系统调用,schedule函数的主动触发频率就会降低,不得不依靠sysmon线程的抢占式调度。在最坏情况下,调度周期可达到10ms,这也就是定时器的最高保证精度。

checkTimers在成功获取到已到达的定时器后,最终会走进runOneTimer

func runOneTimer(pp *p, t *timer, now int64) {
	f := t.f
	arg := t.arg
	seq := t.seq

	if t.period > 0 { //如果是周期定时器,计算下一轮触发事件并调整小顶堆
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
		if t.when < 0 { // check for overflow.
			t.when = maxWhen
		}
		siftdownTimer(pp.timers, 0)
		if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
			badTimer()
		}
		updateTimer0When(pp)
	} else { //否则这个定时器没用了,删除
		dodeltimer0(pp)
		if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
			badTimer()
		}
	}
unlock(&pp.timersLock) f(arg, seq) //触发回调函数 lock(&pp.timersLock) }

实际上定时器比这更加复杂,它也有复杂的状态机,但是和本文主题相差太远,就在此略过了。

上一篇:探究 Go 源码中 panic & recover 有哪些坑?


下一篇:spring的aop