golang map原理探究
Go中Map是一个KV对集合。底层使用hash table,用链表来解决冲突 ,出现冲突时,不是每一个Key都申请一个结构通过链表串起来,而是以bmap为最小粒度挂载,一个bmap可以放8个kv。
在哈希函数的选择上,会在程序启动时,检测 cpu 是否支持 aes,如果支持,则使用aes hash,否则使用memhash。
hash函数,有加密型和非加密型。加密型的一般用于加密数据、数字摘要等,典型代表就是md5、sha1、sha256、aes256 这种,非加密型的一般就是查找。 在map的应用场景中,用的是查找。 选择hash函数主要考察的是两点:性能、碰撞概率。
每个map的底层结构是hmap,是有若干个结构为bmap的bucket组成的数组。每个bucket底层都采用链表结构。
type hmap struct { count int // 元素个数 flags uint8 // 用来标记状态 B uint8 // 扩容常量相关字段B是buckets数组的长度的对数 2^B noverflow uint16 // noverflow是溢出桶的数量,当B<16时,为精确值,当B>=16时,为估计值 hash0 uint32 // 是哈希的种子,它能为哈希函数的结果引入随机性,这个值在创建哈希表时确定,并在调用哈希函数时作为参数传入 buckets unsafe.Pointer // 桶的地址 oldbuckets unsafe.Pointer // 旧桶的地址,用于扩容 nevacuate uintptr // 搬迁进度,扩容需要将旧数据搬迁至新数据,这里是利用指针来比较判断有没有迁移 extra *mapextra // 用于扩容的指针 } type mapextra struct { overflow *[]*bmap oldoverflow *[]*bmap // nextOverflow holds a pointer to a free overflow bucket. nextOverflow *bmap } // A bucket for a Go map. type bmap struct { tophash [bucketCnt]uint8 // tophash用于记录8个key哈希值的高8位,这样在寻找对应key的时候可以更快,不必每次都对key做全等判断 } //实际上编辑期间会动态生成一个新的结构体 type bmap struct { topbits [8]uint8 keys [8]keytype values [8]valuetype pad uintptr overflow uintptr }
bmap 就是我们常说的“桶”,桶里面会最多装 8 个 key,这些 key之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的,关于key的定位我们在map的查询和赋值中详细说明。
在桶内,又会根据key计算出来的hash值的高8位来决定 key到底落入桶内的哪个位置(一个桶内最多有8个位置)。
当map的key和value都不是指针,并且 size都小于128字节的情况下,会把bmap标记为不含指针,这样可以避免gc时扫描整个hmap。
但是,我们看bmap其实有一个overflow的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把overflow移动到 hmap的extra 字段来。
这样随着哈希表存储的数据逐渐增多,我们会扩容哈希表或者使用额外的桶存储溢出的数据,不会让单个桶中的数据超过 8 个,不过溢出桶只是临时的解决方案,创建过多的溢出桶最终也会导致哈希的扩容。
哈希表作为一种数据结构,我们肯定要分析它的常见操作,首先就是读写操作的原理。哈希表的访问一般都是通过下标或者遍历进行的:
_ = hash[key] for k, v := range hash { // k, v }
这两种方式虽然都能读取哈希表的数据,但是使用的函数和底层原理完全不同。
第一个需要知道哈希的键并且一次只能获取单个键对应的值,而第二个可以遍历哈希中的全部键值对,访问数据时也不需要预先知道哈希的键。
在编译的类型检查期间,hash[key] 以及类似的操作都会被转换成哈希的 OINDEXMAP 操作,中间代码生成阶段会在 cmd/compile/internal/gc.walkexpr 函数中将这些 OINDEXMAP 操作转换成如下的代码:
v := hash[key] // => v := *mapaccess1(maptype, hash, &key) v, ok := hash[key] // => v, ok := mapaccess2(maptype, hash, &key)
这里根据赋值语句左侧接受参数的个数会决定使用的运行时方法:
当接受一个参数时,会使用 runtime.mapaccess1,该函数仅会返回一个指向目标值的指针;当接受两个参数时,会使用 runtime.mapaccess2,除了返回目标值之外,它还会返回一个用于表示当前键对应的值是否存在的 bool 值:
mapaccess1 会先通过哈希表设置的哈希函数、种子获取当前键对应的哈希,再通过 runtime.bucketMask 和 runtime.add 拿到该键值对所在的桶序号和哈希高位的 8 位数字。
如果在bucket中没有找到,此时如果overflow不为空,那么就沿着overflow继续查找,如果还是没有找到,那就从别的key槽位查找,直到遍历所有bucket。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { if raceenabled && h != nil { callerpc := getcallerpc() pc := funcPC(mapaccess1) racereadpc(unsafe.Pointer(h), callerpc, pc) raceReadObjectPC(t.key, key, callerpc, pc) } if msanenabled && h != nil { msanread(key, t.key.size) } //如果h说明都没有,返回零值 if h == nil || h.count == 0 { if t.hashMightPanic() { //如果哈希函数出错 t.key.alg.hash(key, 0) // see issue 23734 } return unsafe.Pointer(&zeroVal[0]) } //写和读冲突 if h.flags&hashWriting != 0 { throw("concurrent map read and map write") } //不同类型的key需要不同的hash算法需要在编译期间确定 alg := t.key.alg //利用hash0引入随机性,计算哈希值 hash := alg.hash(key, uintptr(h.hash0)) //比如B=5那m就是31二进制是全1, //求bucket num时,将hash与m相与, //达到bucket num由hash的低8位决定的效果, //bucketMask函数掩蔽了移位量,省略了溢出检查。 m := bucketMask(h.B) //b即bucket的地址 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // oldbuckets 不为 nil,说明发生了扩容 if c := h.oldbuckets; c != nil { if !h.sameSizeGrow() { //新的bucket是旧的bucket两倍 m >>= 1 } //求出key在旧的bucket中的位置 oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) //如果旧的bucket还没有搬迁到新的bucket中,那就在老的bucket中寻找 if !evacuated(oldb) { b = oldb } } //计算tophash高8位 top := tophash(hash) bucketloop: //遍历所有overflow里面的bucket for ; b != nil; b = b.overflow(t) { //遍历8个bucket for i := uintptr(0); i < bucketCnt; i++ { //tophash不匹配,继续 if b.tophash[i] != top { if b.tophash[i] == emptyRest { break bucketloop } continue } //tophash匹配,定位到key的位置 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) //若key为指针 if t.indirectkey() { //解引用 k = *((*unsafe.Pointer)(k)) } //key相等 if alg.equal(key, k) { //定位value的位置 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { //value解引用 e = *((*unsafe.Pointer)(e)) } return e } } } //没有找到,返回0值 return unsafe.Pointer(&zeroVal[0]) }
在 bucketloop 循环中,哈希会依次遍历正常桶和溢出桶中的数据,它先会比较哈希的高 8 位和桶中存储的 tophash,后比较传入的和桶中的值以加速数据的读写。用于选择桶序号的是哈希的最低几位,而用于加速访问的是哈希的高8位,这种设计能够减少同一个桶中有大量相等 tophash 的概率影响性能。
因此bucket里key的起始地址就是unsafe.Pointer(b)+dataOffset;第i个key的地址就要此基础上加i个key大小;value的地址是在key之后,所以第i个value,要加上所有的key的偏移。
另一个同样用于访问哈希表中数据的 runtime.mapaccess2 只是在 runtime.mapaccess1 的基础上多返回了一个标识键值对是否存在的 bool 值:
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) { ... bucketloop: for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if t.key.equal(key, k) { e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } return e, true } } } return unsafe.Pointer(&zeroVal[0]), false }
使用 v, ok := hash[k]的形式访问哈希表中元素时,我们能够通过这个布尔值更准确地知道当 v == nil 时,v 到底是哈希中存储的元素还是表示该键对应的元素不存在,所以在访问哈希时,我们更推荐使用这种方式判断元素是否存在。
写入:
当形如 hash[k] 的表达式出现在赋值符号左侧时,该表达式也会在编译期间转换成 mapassign 函数的调用,该函数与 mapaccess1 比较相似:
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... hash := t.hasher(key, uintptr(h.hash0)) // Set hashWriting after calling t.hasher, since t.hasher may panic, // in which case we have not actually done a write. h.flags ^= hashWriting if h.buckets == nil { h.buckets = newobject(t.bucket) // newarray(t.bucket, 1) } again: bucket := hash & bucketMask(h.B) if h.growing() { growWork(t, h, bucket) } b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize))) top := tophash(hash) ... }
我们可以通过遍历比较桶中存储的tophash 和键的哈希,如果找到了相同结果就会返回目标位置的地址。
如果当前桶已经满了,哈希会调用 newoverflow 创建新桶或者使用 hmap 预先在 noverflow 中创建好的桶来保存数据,新创建的桶不仅会被追加到已有桶的末尾,还会增加哈希表的 noverflow 计数器。
如果当前键值对在哈希中不存在,哈希会为新键值对规划存储的内存地址,通过typedmemmove 将键移动到对应的内存空间中并返回键对应值的地址 val。
如果当前键值对在哈希中存在,那么就会直接返回目标区域的内存地址,哈希并不会在mapassign 这个运行时函数中将值拷贝到桶中,该函数只会返回内存地址,真正的赋值操作是在编译期间插入的.
扩容:
随着哈希表中元素的逐渐增加,哈希的性能会逐渐恶化,所以我们需要更多的桶和更大的内存保证哈希的读写性能,这个时候我们就需要用到扩容了.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... // Did not find mapping for key. Allocate new cell & add entry. // If we hit the max load factor or we have too many overflow buckets, // and we're not already in the middle of growing, start growing. if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) goto again // Growing the table invalidates everything, so try again } ... } // 装载因子超过 6.5 func overLoadFactor(count int64, B uint8) bool { return count >= bucketCnt && float32(count) >= loadFactor*float32((uint64(1)<<B)) } // overflow buckets func tooManyOverflowBuckets(noverflow uint16, B uint8) bool { if B < 16 { return noverflow >= uint16(1)<<B } return noverflow >= 1<<15 }
mapassign 函数会在以下两种情况发生时触发哈希的扩容:
- 装载因子已经超过 6.5;
- 哈希使用了太多溢出桶;
不过因为 Go 语言哈希的扩容不是一个原子的过程,所以mapassign 还需要判断当前哈希是否已经处于扩容状态,避免二次扩容造成混乱。
根据触发的条件不同扩容的方式分成两种,如果这次扩容是溢出的桶太多导致的,那么这次扩容就是等量扩容sameSizeGrow,sameSizeGrow 是一种特殊情况下发生的扩容,当我们持续向哈希中插入数据并将它们全部删除时,如果哈希表中的数据量没有超过阈值,就会不断积累溢出桶造成缓慢的内存泄漏。
runtime: limit the number of map overflow buckets 引入了 sameSizeGrow 通过复用已有的哈希扩容机制解决该问题,一旦哈希中出现了过多的溢出桶,它会创建新桶保存数据,垃圾回收会清理老的溢出桶并释放内存\。
扩容的入口是 hashGrow:
func hashGrow(t *maptype, h *hmap) { // If we've hit the load factor, get bigger. // Otherwise, there are too many overflow buckets, // so keep the same number of buckets and "grow" laterally. // B+1 相当于是原来 2 倍的空间 bigger := uint8(1) if !overLoadFactor(h.count+1, h.B) { // 进行等量的内存扩容,所以 B 不变 bigger = 0 h.flags |= sameSizeGrow } // 将老 buckets 挂到 buckets 上 oldbuckets := h.buckets // 申请新的 buckets 空间 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // commit the grow (atomic wrt gc) // 提交 grow 的动作 h.B += bigger h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets // 搬迁进度为 0 h.nevacuate = 0 // overflow buckets 数为 0 h.noverflow = 0 if h.extra != nil && h.extra.overflow != nil { // Promote current overflow buckets to the old generation. if h.extra.oldoverflow != nil { throw("oldoverflow is not nil") } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } if nextOverflow != nil { if h.extra == nil { h.extra = new(mapextra) } h.extra.nextOverflow = nextOverflow } // the actual copying of the hash table data is done incrementally // by growWork() and evacuate(). }
哈希在扩容的过程中会通过 makeBucketArray 创建一组新桶和预创建的溢出桶,随后将原有的桶数组设置到 oldbuckets 上并将新的空桶设置到 buckets 上,溢出桶也使用了相同的逻辑更新. 这里会申请到了新的 buckets 空间,把相关的标志位都进行了处理,例如标志 nevacuate 被置为 0, 表示当前搬迁进度为 0。
在hashGrow 中还看不出来等量扩容和翻倍扩容的太多区别,等量扩容创建的新桶数量只是和旧桶一样,该函数中只是创建了新的桶,并没有对数据进行拷贝和转移。
哈希表的数据迁移的过程在是 evacuate 中完成的,它会对传入桶中的元素进行再分配。
func evacuate(t *maptype, h *hmap, oldbucket uintptr) { // 这里会定位老的 bucket 地址 b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) // 结果是 2^B newbit := h.noldbuckets() // 如果吧没有搬迁过 if !evacuated(b) { // TODO: reuse overflow buckets instead of using new ones, if there // is no iterator using the old buckets. (If !oldIterator.) // xy contains the x and y (low and high) evacuation destinations. var xy [2]evacDst x := &xy[0] x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize))) x.k = add(unsafe.Pointer(x.b), dataOffset) x.e = add(x.k, bucketCnt*uintptr(t.keysize)) // 如果不是等size 扩容,前后bucket序号有变,使用y 进行搬迁 if !h.sameSizeGrow() { // Only calculate y pointers if we're growing bigger. // Otherwise GC can see bad pointers. y := &xy[1] y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) y.k = add(unsafe.Pointer(y.b), dataOffset) y.e = add(y.k, bucketCnt*uintptr(t.keysize)) } // 遍历所有老的bucket地址 for ; b != nil; b = b.overflow(t) { k := add(unsafe.Pointer(b), dataOffset) e := add(k, bucketCnt*uintptr(t.keysize)) for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) { top := b.tophash[i] if isEmpty(top) { b.tophash[i] = evacuatedEmpty continue } if top < minTopHash { throw("bad map state") } k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } var useY uint8 if !h.sameSizeGrow() { // Compute hash to make our evacuation decision (whether we need // to send this key/elem to bucket x or bucket y). hash := t.hasher(k2, uintptr(h.hash0)) if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) { // If key != key (NaNs), then the hash could be (and probably // will be) entirely different from the old hash. Moreover, // it isn't reproducible. Reproducibility is required in the // presence of iterators, as our evacuation decision must // match whatever decision the iterator made. // Fortunately, we have the freedom to send these keys either // way. Also, tophash is meaningless for these kinds of keys. // We let the low bit of tophash drive the evacuation decision. // We recompute a new random tophash for the next level so // these keys will get evenly distributed across all buckets // after multiple grows. useY = top & 1 top = tophash(hash) } else { if hash&newbit != 0 { useY = 1 } } } if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY { throw("bad evacuatedN") } b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY dst := &xy[useY] // evacuation destination if dst.i == bucketCnt { dst.b = h.newoverflow(t, dst.b) dst.i = 0 dst.k = add(unsafe.Pointer(dst.b), dataOffset) dst.e = add(dst.k, bucketCnt*uintptr(t.keysize)) } dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check if t.indirectkey() { *(*unsafe.Pointer)(dst.k) = k2 // copy pointer } else { typedmemmove(t.key, dst.k, k) // copy elem } if t.indirectelem() { *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) } else { typedmemmove(t.elem, dst.e, e) } dst.i++ // These updates might push these pointers past the end of the // key or elem arrays. That's ok, as we have the overflow pointer // at the end of the bucket to protect against pointing past the // end of the bucket. dst.k = add(dst.k, uintptr(t.keysize)) dst.e = add(dst.e, uintptr(t.elemsize)) } } // Unlink the overflow buckets & clear key/elem to help GC. if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 { b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)) // Preserve b.tophash because the evacuation // state is maintained there. ptr := add(b, dataOffset) n := uintptr(t.bucketsize) - dataOffset memclrHasPointers(ptr, n) } } if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) } }
evacuate 会将一个旧桶中的数据分流到两个新桶,所以它会创建两个用于保存分配上下文的 evacDst 结构体,这两个结构体分别指向了一个新桶:
哈希表扩容目的:
如果这是等量扩容,那么旧桶与新桶之间是一对一的关系,所以两个evacDst只会初始化一个。而当哈希表的容量翻倍时,每个旧桶的元素会都分流到新创建的两个桶中.
只使用哈希函数是不能定位到具体某一个桶的,哈希函数只会返回很长的哈希,我们还需一些方法将哈希映射到具体的桶上。
那么如何定位key呢?
key 经过哈希计算后得到哈希值,共64个 bit 位(64位机,32位机就不讨论了,现在主流都是64位机),计算它到底要落在哪个桶时,只会用到最后 B 个 bit 位。
如果 B = 5,那么桶的数量,也就是 buckets 数组的长度是 2^5 = 32。
例如,现在有一个 key 经过哈希函数计算后,得到的哈希结果是:
10010111 | 000011110110110010001111001010100010010110010101010 │ 01010
用最后的 5 个bit 位,也就是01010,值为 10,那么这个就是10号桶。
再用哈希值的高 8 位,找到此 key 在bucket中的位置,这是在寻找已有的 key。最开始桶内还没有 key,新加入的 key 会找到第一个空位,放入。
buckets 编号就是桶编号,当两个不同的key落在同一个桶中,也就是发生了哈希冲突。
通常哈希冲突的解决手段是用链表法,在 bucket 中,从前往后找到第一个空位。这样,在查找某个 key 时,先找到对应的桶,再去遍历 bucket 中的 key。
因此哈希表扩容的设计和原理,哈希在存储元素过多时会触发扩容操作,每次都会将桶的数量翻倍,扩容过程不是原子的,而是通过growWork 增量触发的,在扩容期间访问哈希表时会使用旧桶,向哈希表写入数据时会触发旧桶元素的分流。
除了这种正常的扩容之外,为了解决大量写入、删除造成的内存泄漏问题,哈希引入了sameSizeGrow 这一机制,在出现较多溢出桶时会整理哈希的内存减少空间的占用。
删除: 如果想要删除哈希中的元素,就需要使用 Go 语言中的 delete 关键字,这个关键字的唯一作用就是将某一个键对应的元素从哈希表中删除,无论是该键对应的值是否存在,这个内建的函数都不会返回任何的结果。
因此呢Go采用拉链法来解决哈希碰撞的问题实现了哈希表,它的访问、写入和删除等操作都在编译期间转换成了运行时的函数或者方法。
哈希在每一个桶中存储键对应哈希的前 8 位,当对哈希进行操作时,这些 tophash 就成为可以帮助哈希快速遍历桶中元素的缓存。
哈希表的每个桶都只能存储 8 个键值对,一旦当前哈希的某个桶超出 8 个,新的键值对就会存储到哈希的溢出桶中。
随着键值对数量的增加,溢出桶的数量和哈希的装载因子也会逐渐升高,超过一定范围就会触发扩容,扩容会将桶的数量翻倍,元素再分配的过程也是在调用写操作时增量进行的,不会造成性能的瞬时巨大损耗。