Go 语言源码分析——map

哈希表用于存储键值对的映射关系,具有O(1)的读写性能。通过哈希函数可以将不同的键映射到不同索引上,当不同的键映射到同一个索引上时,会产生哈希冲突,可通过开放寻址法、链表法来解决哈希冲突,其中Go使用的是链表法。

一、数据结构 

map将键值对存放在桶数组中,每个桶只保存8个键值对,通过键的低8位选择桶,通过键的高8位选择放在桶的哪个位置。如果有超过8个键值对映射到同一个桶,则会放到溢出桶

type hmap struct {
	count     int    // 当前map的元素数量
	flags     uint8
	B         uint8  // 当前持有的buckets数量,因为总是2的倍数,所以存的是对数值
	noverflow uint16 // 溢出的bucket数量
	hash0     uint32 // hash seed

	buckets    unsafe.Pointer // 指向bucket的指针
	oldbuckets unsafe.Pointer // 用于扩容时保存之前的bucket,大小是当前bucket的一半
	nevacuate  uintptr        // 迁移进度

	extra *mapextra // 当键值非指针时使用
}

type mapextra struct {
	// 当键值非指针时,溢出桶存放到mapextra结构中,overflow存放buckets中的溢出桶,oldoverflow存放oldbuckets中的溢出桶,nextOverflow预分配溢出桶空间
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	nextOverflow *bmap
}

type bmap struct {
	tophash [bucketCnt]uint8  // bucketCnt=8,一个桶只能放8个键值对,tophash存放了键的哈希的高8位,通过比较不同键的哈希的高 8 位可以减少访问键值对次数以提高性能
}

二、操作

1.创建map

map是通过makemap方法创建的,该方法接受一个要创建的map的类型,要创建的map的大小,以及一个hmap指针,返回一个hmap指针

makemap(t *maptype, hint int, h *hmap) *hmap

首先对要创建的map大小是否溢出进行检查

mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
	hint = 0
}

若传入的h为空则初始化一个map

if h == nil {
	h = new(hmap)
}
h.hash0 = fastrand()

然后计算合适的桶的数量

B := uint8(0)
for overLoadFactor(hint, B) {
	B++
}
h.B = B

然后对桶进行初始化,其中如果B为0,则桶的初始化延迟到赋值时进行

if h.B != 0 {
	var nextOverflow *bmap
	h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
	if nextOverflow != nil {
		h.extra = new(mapextra)
		h.extra.nextOverflow = nextOverflow
	}
}

return h

其中调用了makeBucketArray计算出要创建的桶的数量并为此分配内存

func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	// 当桶的数量小于2的4次方时,由于数据量较小,使用溢出桶可能性较低,省去创建过程,否则还需额外创建2的b-4次方个桶
	if b >= 4 {
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up 
		}
	}

	if dirtyalloc == nil {
        // 分配一个新的数组
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		// 清理原来的数组
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// 找到溢出桶的位置,因为预先分配的溢出桶的overflow都是nil,若要知道哪个溢出桶是最后一个需要进行标记
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

2.获取元素

当使用m[key]获取对应的value时,会调用mapaccess1方法,该方法接受一个map的类型,一个hmap的指针,以及key的指针。该方法永远不会返回nil,若该key不存在map时,会返回值的类型对应的空值,因此还可通过返回一个bool值标识该key是否存在map,这个调用到了mapaccess2方法

mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool)

首先是一个简单判断,若map未初始化或map存放的键值对为0,则快速返回

if h == nil || h.count == 0 {
	if t.hashMightPanic() {
		t.hasher(key, 0)
	}
	return unsafe.Pointer(&zeroVal[0])
}

如果此时另一个goroutine在对map进行写操作,会抛出异常,因此map若要用于并发读写,需要配合互斥锁

if h.flags&hashWriting != 0 {
	throw("concurrent map read and map write")
}

接着是一些准备工作

hash := t.hasher(key, uintptr(h.hash0))  // 计算键的哈希值
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))  // 找到桶的位置
if c := h.oldbuckets; c != nil {  // 若oldbuckets!=nil,说明此时正在进行扩容
	if !h.sameSizeGrow() {
		// 若不是同size扩容的话,要计算老的buckets的数量
		m >>= 1
	}
	oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
	if !evacuated(oldb) {  // 若该键在老的桶里,则去oldbuckets获取键的值
		b = oldb
	}
}
top := tophash(hash)  // 获取键的哈希值的高8位

最后去桶里查找键对应的值,返回

当遍历了桶的八个键值对都没找到,则遍历溢出桶
for ; b != nil; b = b.overflow(t) {
    // 依次遍历桶里的8个键值对查找
	for i := uintptr(0); i < bucketCnt; i++ {
		if b.tophash[i] != top {
			if b.tophash[i] == emptyRest {  // 说明该桶的键值对已遍历完且没找到对应的key,跳出循环
				break bucketloop
			}
			continue
		}
		k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))  // 找到存放key的位置
		if t.indirectkey() {  // 获取key的值
			k = *((*unsafe.Pointer)(k))
		}
		if t.key.equal(key, k) {  // 若桶里的key和要查找的key相等,则找到对应value的值的位置,获取气质并返回
			e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			if t.indirectelem() {
				e = *((*unsafe.Pointer)(e))
			}
			return e
		}
	}
}
return unsafe.Pointer(&zeroVal[0])

总结:

  1. 首先会定位到要查找的桶,遍历其8个槽位,若找到则返回
  2. 若没找到,则去溢出桶查找,找到则返回
  3. 若溢出桶也没找到,则该键值对不在map中

3.添加键值对

当通过m[key] = value添加键值对时,会调用mapassign方法,该方法接受一个map的类型,一个hmap的指针,一个key的指针,并返回一个指针

mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer

对未初始化的map赋值,会panic

if h == nil {
	panic(plainError("assignment to entry in nil map"))
}

检查是否有其他goroutine在执行写操作,若有则抛出异常

if h.flags&hashWriting != 0 {
	throw("concurrent map writes")
}

然后是一些准备工作

hash := t.hasher(key, uintptr(h.hash0))  // 计算键的哈希值

h.flags ^= hashWriting  // 设置map的写位,使得并发读写时会panic

if h.buckets == nil {
	h.buckets = newobject(t.bucket) // 若桶未初始化则进行初始化
}

again:
	bucket := hash & bucketMask(h.B)  // 找到桶
	if h.growing() {  // 若在扩容,则先执行迁移操作
		growWork(t, h, bucket)
	}
	b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))  // 找到桶的位置
	top := tophash(hash)  // 获取键的哈希值的高8位

	var inserti *uint8
	var insertk unsafe.Pointer
	var elem unsafe.Pointer

然后是写入操作

bucketloop:
	for {
        // 遍历当前桶的8个键值对
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
                // 若当前的槽为空,则计算该槽的key和value的位置
				if isEmpty(b.tophash[i]) && inserti == nil {
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
					elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
				}
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
            // 若该槽已有对应的键值,则比较槽的key和传入的key是否相等
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			if t.indirectkey() {
				k = *((*unsafe.Pointer)(k))
			}
            // 不相等,则继续寻找可用的槽
			if !t.key.equal(key, k) {
				continue
			}
			// 若key已存在,则更新键值,并跳出循环
			if t.needkeyupdate() {
				typedmemmove(t.key, k, key)
			}
			elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			goto done
		}
        // 若当前bucket的8个槽位都不可用,则去溢出桶寻找
		ovf := b.overflow(t)
		if ovf == nil {
			break
		}
		b = ovf
	}

    // 如果没有找到可用的槽
    // 若当前没有在扩容,但是触发最大LoadFactor,或者存在过多的溢出桶,会触发扩容,并重新查找桶和槽的位置
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h)
		goto again
	}

	if inserti == nil {
		// 没有找到可插入的槽,则创建一个溢出桶
		newb := h.newoverflow(t, b)
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, bucketCnt*uintptr(t.keysize))
	}

	// 写入键值对
	if t.indirectkey() {
		kmem := newobject(t.key)
		*(*unsafe.Pointer)(insertk) = kmem
		insertk = kmem
	}
	if t.indirectelem() {
		vmem := newobject(t.elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	typedmemmove(t.key, insertk, key)
	*inserti = top
	h.count++

最后是一些收尾工作,若并发写位被其他goroutine修改,则抛出异常,否则取消并发写的标记

done:
	if h.flags&hashWriting == 0 {
		throw("concurrent map writes")
	}
	h.flags &^= hashWriting
	if t.indirectelem() {
		elem = *((*unsafe.Pointer)(elem))
	}
	return elem

总结:

  1. 首先会去对应的桶查找,若找到一个槽的key与传入的key相等,则更新
  2. 若在当前桶找到空槽,则写入
  3. 若当前桶没有找到可用的槽,则去溢出桶查找,重复1、2操作
  4. 若溢出桶也没有可用的槽,则先判断当前是否需要扩容,若需要则进行扩容,并重新1、2、3操作
  5. 若不需要扩容,则创建一个新的溢出桶,写入

4.删除键值对 

当使用delete关键字删除map中的键值对时,会调用到mapdelete方法,该方法接收一个map的类型,一个hmap的指针,以及一个key的指针

mapdelete(t *maptype, h *hmap, key unsafe.Pointer)

 先是一些检查工作,若map未初始化或map的键值对数量为0则直接返回;若此时有其他goroutine在并发写,则抛出异常

if h == nil || h.count == 0 {
	if t.hashMightPanic() {
		t.hasher(key, 0) // see issue 23734
	}
	return
}
if h.flags&hashWriting != 0 {
	throw("concurrent map writes")
}

 接着是一些准备工作,与查找、写入类似

hash := t.hasher(key, uintptr(h.hash0))

// 设置写标记
h.flags ^= hashWriting

bucket := hash & bucketMask(h.B)
if h.growing() {
	growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
top := tophash(hash)

接着查找要删除的key并执行删除操作 

search:
    // 若当前桶没有找到,则遍历溢出桶
	for ; b != nil; b = b.overflow(t) {
        // 遍历桶的8个槽
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
                // 表明后面没有非空的桶了,跳出循环
				if b.tophash[i] == emptyRest {
					break search
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
			k2 := k
			if t.indirectkey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			if !t.key.equal(key, k2) {
				continue
			}
			// 找到了要删除的key,对key和value执行清理操作
			if t.indirectkey() {
				*(*unsafe.Pointer)(k) = nil
			} else if t.key.ptrdata != 0 {
				memclrHasPointers(k, t.key.size)
			}
			e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
			if t.indirectelem() {
				*(*unsafe.Pointer)(e) = nil
			} else if t.elem.ptrdata != 0 {
				memclrHasPointers(e, t.elem.size)
			} else {
				memclrNoHeapPointers(e, t.elem.size)
			}
			b.tophash[i] = emptyOne  // 标记该槽为空
			// 若当前为最后一个槽,需要判断溢出桶的第一个槽是否为emptyRest
			if i == bucketCnt-1 {
				if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
					goto notLast
				}
			} else {
                // 若不为最后一个槽,则判断下一个槽是否为emptyRest 
				if b.tophash[i+1] != emptyRest {
					goto notLast
				}
			}
			for {
                // 将当前槽和其之前的非空槽置为emptyRest
				b.tophash[i] = emptyRest
				if i == 0 {
					if b == bOrig {
						break
					}
					c := b
					for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
					}
					i = bucketCnt - 1
				} else {
					i--
				}
				if b.tophash[i] != emptyOne {
					break
				}
			}
		notLast:
			h.count--
			break search
		}
	}

最后是一些收尾工作,与写入类似

if h.flags&hashWriting == 0 {
	throw("concurrent map writes")
}
h.flags &^= hashWriting

 5.扩容

扩容操作是在makeassign时触发的,触发条件有两个:

  • 当前负载因子超过6.5,即键值对数量超过桶的数量的6.5倍,说明大多数桶都满了
  • 溢出桶过多

在makeassign方法中,用到了hashGrow方法进行扩容

hashGrow(t *maptype, h *hmap)

 若扩容是由于溢出桶过多触发,则无需增加桶的数量

bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
	bigger = 0
	h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

将iterator标志位转移到oldbuckets上

flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
	flags |= oldIterator
}

接着是重置hmap一些字段的值,可以看到此处依旧没有键值对的迁移操作。键值对的迁移操作也是由写入或删除操作时触发的。 

h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0

if h.extra != nil && h.extra.overflow != nil {
	if h.extra.oldoverflow != nil {
		throw("oldoverflow is not nil")
	}
	h.extra.oldoverflow = h.extra.overflow
	h.extra.overflow = nil
}
if nextOverflow != nil {
	if h.extra == nil {
		h.extra = new(mapextra)
	}
	h.extra.nextOverflow = nextOverflow
}

当写入或删除的key所在的桶未迁移时,调用growWork方法执行迁移工作

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// 确保迁移的桶即当前使用的桶
	evacuate(t, h, bucket&h.oldbucketmask())

    // 辅助迁移,加快迁移进程
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

真正的迁移操作在evacuate方法中执行

evacuate(t *maptype, h *hmap, oldbucket uintptr)

 首先获取桶的位置,计算桶的容量

b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets()

若该桶没有迁移,则执行迁移工作

if !evacuated(b) {
	// 使用一个数组来保存迁移的桶的信息
	var xy [2]evacDst
	x := &xy[0]
	x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
	x.k = add(unsafe.Pointer(x.b), dataOffset)
	x.e = add(x.k, bucketCnt*uintptr(t.keysize))

	if !h.sameSizeGrow() {
		y := &xy[1]
		y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
		y.k = add(unsafe.Pointer(y.b), dataOffset)
		y.e = add(y.k, bucketCnt*uintptr(t.keysize))
	}

    // 遍历溢出桶
	for ; b != nil; b = b.overflow(t) {
		k := add(unsafe.Pointer(b), dataOffset)
		e := add(k, bucketCnt*uintptr(t.keysize))
        // 遍历当前桶的8个槽
		for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
			top := b.tophash[i]
            // 若当前槽为空,则标记为已搬迁
			if isEmpty(top) {
				b.tophash[i] = evacuatedEmpty
				continue
			}
            // 正常情况下未搬迁的槽top为键的哈希值的高8位,或为empty,若不属于这些情况则抛出异常
			if top < minTopHash {
				throw("bad map state")
			}
			k2 := k
			if t.indirectkey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			var useY uint8
			if !h.sameSizeGrow() {
				// 决定这个k/v放在哪个桶
				hash := t.hasher(k2, uintptr(h.hash0))
				if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
					useY = top & 1
					top = tophash(hash)
				} else {
					if hash&newbit != 0 {
						useY = 1
					}
				}
			}

			if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
				throw("bad evacuatedN")
			}

			b.tophash[i] = evacuatedX + useY
			dst := &xy[useY]

			if dst.i == bucketCnt {  // 如果桶满了则放在溢出桶
				dst.b = h.newoverflow(t, dst.b)
				dst.i = 0
				dst.k = add(unsafe.Pointer(dst.b), dataOffset)
				dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
			}
            // 将k/v复制至目标桶
			dst.b.tophash[dst.i&(bucketCnt-1)] = top 
			if t.indirectkey() {
				*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
			} else {
				typedmemmove(t.key, dst.k, k) // copy elem
			}
			if t.indirectelem() {
				*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
			} else {
				typedmemmove(t.elem, dst.e, e)
			}
            // 定位到下一个槽
			dst.i++
			dst.k = add(dst.k, uintptr(t.keysize))
			dst.e = add(dst.e, uintptr(t.elemsize))
		}
	}
	// 若没有别的goroutine在使用老的bucket则清理,帮助gc
	if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
		b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
		ptr := add(b, dataOffset)
		n := uintptr(t.bucketsize) - dataOffset
		memclrHasPointers(ptr, n)
	}
}

最后更新迁移进度

if oldbucket == h.nevacuate {
	advanceEvacuationMark(h, t, newbit)
}

 

上一篇:Java并发_6 CAS


下一篇:java多线程并发之原子操作-CAS以及原子类atomic