本文基于 go 1.19
哈希表作为一种非常常用的数据结构,存在于各种编程语言中,它可以让我们保存键值对数据,而且有着非常高的查找效率。
本文就以 go 语言中的 map
为例子,讲述一下哈希表在 go 中的实现。
哈希表基本操作
在开始之前,需要大概讲解一下哈希表这种数据结构,哈希表会预先在内存中分配一段比较大的内存,这段内存用在将来往里面写入数据的时候使用。
哈希表有点类似数组,都是一段连续的内存,但是我们往哈希表读写数据的时候不同于数组,数组的时候是直接通过下标访问的,
而哈希表的读写需要先计算 key
的哈希值,根据这个哈希值对哈希表长度取模得到 key
对应的哈希表的下标,然后对哈希表这个下标进行读写操作。
对于哈希表有以下几种常见的操作:
- 写入:根据
key
计算hash
值,对哈希表长度取模得到key
在内存中的地址,然后往这个地址写入数据。 - 读取:根据
key
计算hash
值,对哈希表长度取模得到key
在内存中的地址,然后读取这个地址中的数据。 - 修改:根据
key
计算hash
值,对哈希表长度取模得到key
在内存中的地址,然后修改这个内存地址里面的数据。 - 删除:根据
key
计算hash
值,对哈希表长度取模得到key
在内存中的地址,然后清空保存这个key
的那一小块内存。
注意:计算出的 hash
值可能比分配的内存大小要大,所以才需要对其取模(hash
值 / 哈希表长度 => key
在哈希表的索引),
保证计算出的 hash
最终落到哈希表的内存范围之内。比如,keyn
计算出来的哈希值为 100
,但是我们的哈希表长度只有 8
,
那么 keyn
最终会落在哈希表中下标为 2
的地方(100 % 7 = 2
,下标是从 0
开始的,所以这里是 7
)
哈希表的写入
假设我们现在要有一个长度为 8 的哈希表(下图左),我们有数据 {"a": 1, "b": 2}
需要存入这个哈希表,存入之后的哈希表为下图右:
说明:hash(a) = 5
,计算 a
的哈希值得到 5
,所以将 a:1
存入了哈希表中下标为 5
的地址处。b
同理。
注意:键值都会存储。
哈希表的读取
从哈希表中读取 key
为 b
的键值对:
哈希表的修改
现在,我们需要将 a
的值修改为 3
,同样的,计算其 hash
值,得到 5
,然后将哈希表中 5
这个下标的内存修改成 3
:
哈希表的删除
假设要将哈希表中 b
这个键删除,会先计算其 hash
值,得到 0
,然后将哈希表中 0
这个下标的内存清空:
哈希表的高效之处
通过上面的分析,我们可以知道,哈希表的内存布局跟数组类似,但是哈希表的存储要通过计算 key
的哈希值来得到其在哈希表中的索引,最后对这个索引的内存进行 CRUD 操作。这样一来,如果我们需要频繁的根据键查找其对应值的话,使用哈希表无疑会大大提高效率。相比之下,如果使用数组来存储,那么每次搜索都需要将整个数组遍历一次,效率非常低下。
比如下图,假设我们一个数组内存布局如下,那么在我们查找 a:1
的时候,需要从数组的第一个元素开始遍历,每一个元素读取出来看看它的键是不是 a
,
取到第 6
个元素(下标 5
)的时候,发现它的键是 a
,然后取出对应的值 1
。
在数组的元素个数少的时候,这种查找效率其实影响不大,但如果我们有上万个元素的时候,每次查找都要从第一个元素开始遍历,效率无疑会非常低。
相比之下,不管数据再怎么多,使用哈希表的方式,我们直接通过哈希算法计算一下就可以知道键保存到了哪个槽中。
哈希冲突解决方法
在上一节中,我们的图将哈希表的每一个槽(slot
,又或者叫 cell
,都是同一个东西)都表示成只有一个元素了。
但在实际中,往往会出现计算出来的哈希值对哈希表长度取模后是相等的,也就是不同的 key
会落到同一个槽中(这就是 哈希冲突),
这种情况下,一个槽存放不下的话,有两种办法可以处理:开放地址法和链表法。go 里面的 map
使用的是链表法,
具体来说,就是在 hash
冲突的地方,建立一个链表来保存相同 hash
值的 key
。
这样一来,我们通过 hash
算法计算出哈希值的时候,并不能唯一确定对应的值了,因为有可能两个 key
经过哈希算法计算之后,得到的哈希值是一样的。
这种情况怎么办呢?很简单,因为虽然哈希值是一样的,但是它们的 key
是不一样的,再比较一下 key
就可以确定了。具体可以参考下图:
有哈希冲突的情况下,读取哈希表数据的过程:
- 计算
c
的hash
值,得到0
,就是哈希表的索引 - 获取哈希表中地址
0
上的数据,这会遍历冲突产生的链表 - 比较
c
跟b
,不相等,继续比较链表下一个元素 - 比较
c
跟c
,相等,返回3
哈希扩容
go 里面 map
扩容有两种方式:增量扩容和等量扩容。
哈希表总元素个数过多导致的扩容
这种扩容方式也叫增量扩容。
我们在上面说过了,其实哈希表的内存布局跟数组类似,都是先分配一段连续的内存。然后在哈希冲突的时候,对于冲突的 key
建立一个链表来保存。
这样就会出现一种情况,在链表中存在很多冲突的键,这样一来,在查找冲突 key
的时候,需要在这一堆冲突的 key
中进行查找,这个查找类似数组的查找,效率较低。
为了避免这种情况的出现,一般的哈希表设计会在元素个数总数超过一定数量的时候,对哈希表进行扩容,
这样一来,那些哈希冲突的键就可以相对均匀地分布在哈希表中,从而避免了很多哈希冲突情况下导致的查找效率低下的问题。
扩容之后的容量为原来容量的 2 倍。
go map 的负载因子
在 go 中,map
在实际存储的元素数量超过 map
里 bucket
总数量的 6.5
倍的时候(也就是平均每个 bucket
中的元素个数大于 6.5
的时候),会进行扩容,
这个 6.5
是实现 map
的那个开发者经过实验计算出来的比较合适的数,这个 6.5
被称为负载因子。
为什么负载因子是 6.5
呢,在 go 的 map
源码中有相关的说明:
选择负载因子,如果太大,会有很多溢出桶,太小,则会浪费很多空间。
我编写了一个简单的程序来检查不同负载的一些统计数据:(64位、8字节 key 和元素)
loadFactor | %overflow | bytes/entry | hitprobe | missprobe |
4.00 | 2.13 | 20.77 | 3.00 | 4.00 |
4.50 | 4.05 | 17.30 | 3.25 | 4.50 |
5.00 | 6.85 | 14.77 | 3.50 | 5.00 |
5.50 | 10.55 | 12.94 | 3.75 | 5.50 |
6.00 | 15.27 | 11.67 | 4.00 | 6.00 |
6.50 | 20.90 | 10.79 | 4.25 | 6.50 |
7.00 | 27.14 | 10.15 | 4.50 | 7.00 |
7.50 | 34.03 | 9.73 | 4.75 | 7.50 |
8.00 | 41.10 | 9.40 | 5.00 | 8.00 |
列说明:
%overflow
: 具有溢出桶的桶的百分比bytes/entry
: 每个 key/elem 对使用的开销字节hitprobe
: 查找当前 key 时要检查的条目数missprobe
: 查找缺少的 key 时要检查的条目数
哈希冲突链上元素太少导致的扩容
这种扩容方式也叫等量扩容。
我们知道,在哈希冲突的时候,会建立链表来保存键冲突的元素,但是我们删除那些哈希冲突的键的时候,并不会对删除元素的内存进行释放,
如果每次删除都释放的话,在我们频繁插入跟删除的时候,效率就非常低下了。
因为插入一个元素就分配内存,删除一个元素就释放内存(分配内存和释放内存都是相对耗时的操作)。
而这样的结果是,保存冲突键的链表上,有很多空的元素,这样就会导致冲突的时候,查找键的效率降低,因为要遍历很多空的键。
删除的时候不释放,那什么时候会释放呢?哈希冲突的元素很多都被删除的时候,在 go 里面,map
会判断就算没有超过负载因子的情况下,
如果冲突链表占用的空间过大的话,也会进行扩容。但这里说的扩容其实并不是真正意义上的扩容,只是 map
的实现里面,使用的函数是同一个函数。
具体实现方式是,分配跟原哈希表相同大小的空间,然后将旧哈希表的数据迁移到新的哈希表。
这样迁移之后,对于哈希冲突链表上的那些元素,只会迁移非空的元素,最终结果就是,扩容之后,哈希冲突链表上的元素更加紧凑,在查找冲突的键的时候会更加高效。
虽然哈希表的总容量没变,但是数据分布更加紧凑了,省去了遍历空元素的时间。
go map 概述
map
只是一个哈希表。数据被排列成一组bucket
。- 每个
bucket
最多包含8
个键/值
对。 - 哈希值的低位字节位用于选择
bucket
。 - 每个
bucket
包含每个哈希的几个高位字节位(tophash
),以区分单个桶中的条目。 - 如果超过
8
个key
哈希到同一个桶,我们将额外的桶以链表的方式起来。(解决哈希冲突,链表法) - 当哈希表扩容时,我们会分配一个两倍大的新
bucket
数组。然后bucket
从旧bucket
数组增量复制到新bucket
数组。 map
迭代器遍历bucket
数组,并按遍历顺序返回键(遍历完普通桶之后,遍历溢出桶)。- 为了保持迭代语义,我们永远不会在它们的桶中移动键(
bucket
内键的顺序在扩容的时候不变。如果改变了桶内键的相对顺序,键可能会返回 0 或 2 次)。 - 在扩容哈希表时,迭代器仍在旧的桶中迭代,并且必须检查新桶,检查正在迭代的
bucket
是否已经被迁移到新桶。
go map 的整体模型
上面讲了哈希表的基本设计思路,接下来就要开始讲 go 里面 map
的设计与实现了。大体上其实就是上面说的样子,但是有下面几个不一样的地方:
下文的
bucket
和bmap
都是指的 “桶”。
- go
map
里面存储数据的地方是bucket
(桶),一个bucket
可以存储8
个元素,也就是说哈希冲突的时候还是会在同一个bucket
中先存储。 - 如果
bucket
也存放不下冲突的元素了,那么会新建另外一个桶(叫做溢出桶),旧的bucket
记录这个新桶的指针,旧的bucket
存放不下的元素,会存放到这个溢出桶中。 - 如果溢出桶还是存放不下,那么再新建一个溢出桶,链接到上一个溢出桶中。
也就是说,在 go 的 map
实现中,哈希计算出来的值决定了 key
应该存放在哪一个 bucket
中。
go map
的整体结构如下图:
buckets
记录了保存map
数据的所有bucket
(这种下文统一称为普通桶),go 中使用bmap
这个结构体来表示bucket
,溢出桶也是使用bmap
结构体表示。- 如果
bucket
(普通桶)哈希冲突太多导致存放不下,会新建一个bucket
,在原来的bucket
上会有一个指针记录新建bucket
的地址,这个新bucket
下文统一称为溢出桶。 - 在创建
map
的时候,如果我们指定的容量比较大(B >= 4
的时候),那么会预创建一个溢出桶。
也就是说,go 中解决哈希冲突的链表法,链表上的每一个元素是一个 bucket
。go map
的实现里面,一个 bucket
可以存放 8
个键值对。
上面的 bmap
的数据结构如下图:
bmap
就是bucket
(桶),不管是普通的桶还是溢出的桶,都是使用bmap
结构体表示。bmap
中存储数据的方式有点特别,它先存储了8
个tophash
值,一个tophash
的大小为 1 个字节,每一个tophash
记录的是bmap
中每一个元素的哈希值的最高的 8 bit。- 接下来是
bmap
存储的 8 个元素的key
,在 8 个key
之后是 8 个bmap
存储的值。我们会发现key
和value
的存储是分开的,而不是key/value
、key/value
这种方式。go 中这种分开存储的方式有一个好处是可以减少内存对齐的开销,从而更省内存。 - 最后是
overflow
(溢出桶),如果bmap
存满了,那就会新建一个溢出桶来保存新的数据,通过在旧的bmap
上记录指针来记录溢出桶。
tophash
的作用是,在哈希冲突的时候,在 bucket
内进行查找的时候,是需要在 bucket
内从第一个元素遍历到最后一个元素来查找的。
如果 key
太大,直接比较 key
的话效率会比较低下,通过记录哈希值的高 8 位,我们就可以在 buckeet
内查找的时候,先比较哈希值的
前 8 位,这样一来,map
的效率受到 key
大小的影响就会比较小。当然哈希值的高 8 位有可能相同,在这种情况下,我们再比较一下 key
本身
就可以确定 bucket
的那个槽(slot
/cell
)是否是我们正在查找的那一个 key
。
go map 相关数据结构
我们大部分内容是跟下面两个结构体打交道:
hmap
:map
的数据结构,包含了bucket
的指针、bucket
的数量、键值对的数量等信息。bmap
: 桶,存储key/value
的地方
// go map 数据结构 type hmap struct { count int // map 的元素数量。调用 len(map) 的时候返回此值 flags uint8 // map 标记:iterator/oldIterator/hashWriting/sameSizeGrow B uint8 // 指示了当前哈希表持有的 buckets 的数量(2^B 是 bucket 的数量) noverflow uint16 // 溢出桶的数量 hash0 uint32 // 哈希种子,计算 key 的哈希的时候会传入哈希函数 buckets unsafe.Pointer // 指向 buckets 的数组,大小为 2^B。 如果元素个数为 0 则为 nil。 // 哈希表扩容的时候记录 buckets 字段。 // 增量扩容时,oldbuckets 的长度是 buckets 的一半。 oldbuckets unsafe.Pointer nevacuate uintptr // 指示扩容进度,小于此地址的 buckets 完成迁移 extra *mapextra // 可选字段 } // mapextra 包含并非在所有 map 上都存在的字段。 // 下面 mapextra 注释是原文的翻译(看完了 map 的全部源码也还不是很懂这个结构体的作用,除了 nextOverflow 字段)。 type mapextra struct { // 如果 key 和 elem 都不包含指针并且是内联的,那么我们将 bucket type 标记为不包含指针。这避免了扫描此类 map。 // 但是,bmap.overflow 是一个指针。 为了让溢出桶保持活动状态, // 我们将指向所有溢出桶的指针存储在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中。 // overflow 和 oldoverflow 仅在 key 和 elem 不包含指针时使用。 // overflow 包含 hmap.buckets 的溢出桶。 // oldoverflow 包含 hmap.oldbuckets 的溢出桶。 // 间接允许在 hiter 中存储指向切片的指针。 overflow *[]*bmap oldoverflow *[]*bmap // nextOverflow 持有指向空闲溢出桶的指针。 nextOverflow *bmap } // go map 中的 bucket 结构体(实际保存 key/value 的地方) type bmap struct { // tophash 通常包含此 bucket 中每个键的哈希值的最高的 8 位(1 字节)。 // 如果 tophash[0] < minTopHash,则 tophash[0] 为桶已迁移状态。 // 这是一个长度为 8 的数组,因为一个 bucket 只能存储 8 个元素。 // tophash 存储的是每一个元素的键的哈希的高 8 位。 //(通过比较不同键的哈希的高 8 位可以提高 bucket 内的查找性能,因为键可能很大) tophash [bucketCnt]uint8 // 然后是 8 个键,然后是 8 个值。(这里的 8 是代码写死的) // 注意:将所有键放在在一起,然后将所有值放在一起使代码比交替的 key/elem/key/elem/… 复杂一些, // 但它允许我们消除填充(减少内存对齐导致的内存浪费),例如 map[int64]int8, // 这种如果使用 key/elem 的方式存储则需要浪费几个字节用来对齐。 keys [bucketCnt]keytype // 8 个键 values [bucketCnt]valuetype // 8 个值 overflow *bmap // 指向溢出桶的指针 }
对于 map
的数据结构,需要特别说明的是,bmap
的源码中实际只包含了 tophash
字段,而后面的三个字段 keys/values/overflow
都是在编译期间动态添加的。这是因为 map
中可能存储不同类型的键值对,所以键值对占据的内存空间大小只能在编译时进行推导。
这样一来,最终的结果是,我们在 map
的源码中,访问 key
和 value
的时候都需要通过 bmap
的首地址加上偏移量来进行访问的。
比如获取 bucket
中第 i
个 key
的方式:
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
这行代码中,add
是做指针加法运算的函数,具体来说就是第一个参数的地址加上第二个参数(偏移量),得到一个我们想要的指针。
dataOffset
代表了 bmap
的 keys
第一个元素的偏移量,i
代表了我们想要获取的 key
在 keys
中的索引:
这样一来,我们就可以通过 k
这个指针来访问 bucket
中的 key
了。同样的,要访问 value
的方式也是类似的,
只要将 dataOffset + i * uintptr(t.keysize)
替换成 dataOffset + bucketCnt * uintptr(t.keysize)
即可。
这种方式虽然不太优雅,但是在性能上可以达到最优。
bmap(桶)源码剖析
bmap
就是保存键值对的地方,但是它本身的方法并不多:
// bucket 是否已经完成迁移 // b 是 bucket 的指针 func evacuated(b *bmap) bool { h := b.tophash[0] return h > emptyOne && h < minTopHash } // 获取 b 的溢出桶 func (b *bmap) overflow(t *maptype) *bmap { // bmap 数据结构的最后一个指针就是指向溢出桶的指针 return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize)) } // 设置 b 的溢出桶 // bmap 数据结构的最后一个指针指向溢出桶 func (b *bmap) setoverflow(t *maptype, ovf *bmap) { *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize)) = ovf } // 获取 b 中保存 keys 的指针(指向了桶内的第一个 key) func (b *bmap) keys() unsafe.Pointer { return add(unsafe.Pointer(b), dataOffset) }
在 ev
中用到了两个常量,在 bucket
的 tophash
里面,会通过下面几个标志来记录桶里面槽的状态:
// 这个 cell 是空的,并且在更高的索引或溢出处没有更多的非空 cell。 emptyRest = 0 // 这个 cell 是空的 emptyOne = 1 // key/elem 有效。 entry 已被迁移到较大的哈希表的前半部分(扩容了)。 evacuatedX = 2 // 同上,但迁移到大的哈希表的后半部分(扩容了)。 evacuatedY = 3 // cell 是空的,bucket 已经被迁移了 evacuatedEmpty = 4 // 正常填充单元格的最小 tophash。 minTopHash = 5
为了跟正常的 tophash
区分开来,如果计算出来的 tophash
小于 minTopHash
,会将计算出来的 tophash
加上 minTopHash
:
// tophash 计算 hash 的 tophash 值。 // 这是一个字节的大小的。(hash 最高的 8 位) func tophash(hash uintptr) uint8 { // top 本质上就是 hash 的前面 8 个字节(goarch.PtrSize*8 - 8,左移位数:指针的字节大小 - 8 字节) top := uint8(hash >> (goarch.PtrSize*8 - 8)) if top < minTopHash { top += minTopHash } return top }
这样一来,通过 tophash
这一个字节就可以记录桶里面槽的状态了,非常节省空间。
map 的创建的实现
我们已经了解了哈希表的基本工作机制了,现在就让我们来深入了解一下 go 里 map
的实现,先是 map
的创建,
map
的创建是通过 makemap()
函数实现的(对应我们写的代码是 make(map[int]int, 10)
),map
的创建步骤如下:
- 计算
map
所需内存,判断是否在一个合理范围之内。 - 使用
new
初始化hmap
结构体。 - 生成随机哈希种子。
- 计算出一个最小的
B
,也就是根据用户传递给make
的第二个参数算出一个最小的B
的值,最终桶的数量为2^B
个。 - 如果
B
大于0
,则给哈希表的buckets
分配内存。 - 最后,返回新创建好的
hmap
。
下文在寻址过程中,大量使用了指针的运算,所以如果对 unsafe.Pointer
比较熟悉的话,看起来会比较轻松,如果不熟悉也没关系,可以看看我另外一篇文章《深入理解 go unsafe》。
makemap 实现
makemap
具体源码如下:
// makemap 是 make(map[k]v, hint) 的实现,创建一个 map。 // 如果编译器可以确定 map 或者第一个 bucket 可以在栈上创建,h 和/或 bucket 可能为 non-nil。 // 如果 h != nil,map 可以直接在 h 中创建。 // 如果 h.buckets != nil,buckets 指针指向的那个元素可以作为第一个 bucket。 func makemap(t *maptype, hint int, h *hmap) *hmap { // 计算所需要的内存大小 mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size) // 如果溢出或者超出最大分配内存,则设置 hint = 0; // 这样的话,B 也会等于 0; // 则最终的 map 只会有一个 bucket(2^B = 1) if overflow || mem > maxAlloc { hint = 0 } // 初始化 hmap(分配 hmap 结构体本身所需要的内存) if h == nil { h = new(hmap) } // 生成一个随机的哈希种子 h.hash0 = fastrand() // 根据传入的 hint,计算出需要的最小桶数量。 // 实际上是计算 B 的大小,桶的数量都是运行时通过 2^B 计算的。 B := uint8(0) // 如果 hint 导致超过了负载因子,则将 B 加 1,一直加到小于负载因子。 // 简单来说就是:hint / (2^B) > 负载因子,也就是 hint 个键值对放到所有桶中, // 每个桶中元素数量大于负载因子(6.5)的时候,则将 B 加 1。 // 注:map 扩容的时候也是这个判断标准。 for overLoadFactor(hint, B) { B++ } h.B = B // 分配初始哈希表。 // 如果 B==0,则稍后(在mapassign中)延迟分配桶字段。 // 若 hint 较大,则清零此内存可能需要一段时间。 if h.B != 0 { var nextOverflow *bmap // 初始化 buckets(分配 buckets 所需要的内存) h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) // 如果 hint 比较大,则会预先分配溢出桶,记录到 extra 字段中。 if nextOverflow != nil { h.extra = new(mapextra) h.extra.nextOverflow = nextOverflow } } return h }
overLoadFactor 实现
这里面有一个比较重要的函数,那就是 overLoadFactor
,这个函数用来判断某一个数量是否超过 map
的负载因子,如果超过,那就需要扩容了:
// overLoadFactor 报告放置在 2^B 个桶中的键值对数量是否超过负载因子。 func overLoadFactor(count int, B uint8) bool { return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen) }
count > bucketCnt
,前半部分判断很简单,就是判断数量一个桶能不能放得下。一个桶就能装下所有数据的话,根本就不用计算了,肯定没超过负载因子。- 后半部分判断翻译过来是:
桶数量 * 负载因子(6.5) < 总键值对数量
,意味着平均每个桶存储的元素个数大于6.5
了,也就是说超过了负载因子了。 - 在其他函数中,判断是否超过负载因子的时候都是使用上面这个函数。
loadFactorNum
和loadFactorDen
是预定义的变量,它们相除就是负载因子6.5
。bucketShift(B)
很简单,就是2^B
。
makeBucketArray 实现
在创建 map
的时候,使用了 makeBucketArray
来给 map
的桶分配内存,makeBucketArray
实现步骤如下:
- 如果判断到
B >= 4
,也就是初始化时需要分配的桶数量大于等于2^4 = 16
的时候,则会预先分配溢出桶,分配的溢出桶个数为2^(b-4)
个。 - 接着,给
map
的桶(buckets
字段)分配内存(包含了普通桶和溢出桶,普通桶和溢出桶的内存一次性分配,溢出桶的内存在普通桶后面)。 - 最后,判断到需要分配溢出桶的话(
B >= 4
),则将溢出桶的指针写入到最后一个普通桶的overflow
字段。
分配 buckets
内存的两种情况:
- 如果
B < 4
,那么分配内存的过程很简单,就是分配buckets
所需要的内存,也就是分配普通桶所需要的内存就足够了,如下图:
- 如果
B >= 4
,那么分配内存的过程就相对复杂,会预先分配一部分溢出桶。在后面需要创建溢出桶的时候,就会先使用这时候创建的溢出桶,而不是直接新建,如下图:
说明:
- 分配
buckets
所需要的内存的时候,会分配一部分溢出桶所需要的内存,普通桶和溢出桶的内存是连续的,分配给溢出桶的内存就在普通桶的后面。 - 在
makemap
中会新建mapextra
结构体,用nextOverflow
字段来保存溢出桶的指针,指向第一个溢出桶的位置。 - 最后一个溢出桶的
overflow
指针(指向溢出桶的指针),指向了buckets
入口,这里并不是说将第一个普通桶作为最后一个溢出桶的溢出桶,而是一个标记作用。因为前面的溢出桶的overflow
字段都是nil
,而最后一个溢出桶的overflow
不是nil
,这样一来,我们通过判断溢出桶的overflow
是否为nil
就可以知道是否是最后一个溢出桶。如果是最后一个溢出桶,那么将map
里面的extra.nextOverflow
字段设置为nil
,表示预分配的溢出桶用完了,后面如果再需要溢出桶的时候,就只能直接new
一个了。 buckets
指针下面的普通桶和溢出桶所需要的内存大小都是t.bucketsize
,也就是bmap
所需要的内存大小(当然是内存对齐之后的)。
具体实现如下:
// makeBucketArray 初始化 map bucket 底层的数组(分配 buckets 的内存)。 // 1<<b 是要分配的最小存储桶数。 // dirtyalloc 应该是 nil(不为 nil,表示清空 map),或者是 makeBucketArray 之前使用相同的 t 和 b 参数分配的桶数组。 // 如果 dirtyalloc 为 nil,将分配一段新的内存;否则将清除 dirtyalloc 指向的内存,将其作为新分配的内存。 // // 参数: // t:底层表示 map 的类型 // b:bucket 的大小为 2^b // dirtyalloc: 不为 nil 表示要清空,用于 mapclear 函数。 // // 返回值: // buckets:正常桶数组入口 // nextOverflow:溢出桶数组入口 func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) { // bucket 数量 = 1 << b base := bucketShift(b) nbuckets := base // 对于小的 b,溢出桶不太可能。避免计算开销。 // 桶的数量小于 2^4 时候,由于数据较少、使用溢出桶的可能性较低, // 会省略创建溢出桶的过程以减少额外开销。 // // 但是大于等于 2^4 的时候,使用到溢出桶的可能性就会比较大,所以需要预先分配溢出桶。 if b >= 4 { // 大于等于 2^4 的时候,额外创建 2^(B-4) 个溢出桶。 nbuckets += bucketShift(b - 4) sz := t.bucket.size * nbuckets // 溢出桶所需要的内存大小 up := roundupsize(sz) // 计算需要的内存大小 if up != sz { // 分配的内存与实际需要的内存不一样, // 可能会比 sz 大一点,重新计算 nbuckets。 // 下面的 nbuckets 才是最终的 bucket 数量(普通桶 + 溢出桶的数量) nbuckets = up / t.bucket.size } } // buckets 所在的内存初始化/清空(mapclear) if dirtyalloc == nil { // 创建新的 bucket 数组 buckets = newarray(t.bucket, int(nbuckets)) } else { // 清空原有的内存 buckets = dirtyalloc // buckets 所需要的总内存大小(单位:字节) size := t.bucket.size * nbuckets // 清空 buckets 开始的 size 字节大小的内存 if t.bucket.ptrdata != 0 { // 有指针 memclrHasPointers(buckets, size) } else { memclrNoHeapPointers(buckets, size) } } // 上面 b >= 4 的情况 if base != nbuckets { // 处理一下预先分配的溢出桶。 nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize))) // buckets 和溢出桶内存是相邻的,计算第一个溢出桶的指针 last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize))) // 最后一个溢出桶 last.setoverflow(t, (*bmap)(buckets)) // 最后一个溢出桶的 overflow 指针链接到第一个普通桶 } // 返回普通桶、溢出桶的指针 return buckets, nextOverflow }
map 定位 key 的实现
我们从上面的讲解中应该很清楚 map
中 bucket
这个数据结构了(上面的 bmap
那个图),在做查找、修改、删除操作的时候,
都需要先根据 key
找到具体的键值对保存在哪一个 bucket
以及 bucket
中的哪一个位置,所以这个操作其实是非常关键的。
在开始下文之前,就先来讲讲 map
中是如何定位一个 key
的。
其实定位的过程比较简单,假设现在要查找一个 key
,那定位 key
的大概步骤如下:
- 计算
hash
: 根据key
计算出其哈希值hash
。 - 计算
bucket
: 哈希值对buckets
长度取模(hash % len(buckets)
),不过实际实现的时候使用了一种优化的方式,位运算(hash & (2^B - 1)
),也就是由哈希值的最低B
位来决定key
最终使用哪一个bucket
(结果跟直接取模不一样,但是思想一样,都能保证得出的结果落在len(buckets)
范围内)。 - 遍历
bucket
(先是普通桶)里面的每一个槽(有 8 个),比较哈希值的最高 8 位(8 bit,也就是tophash
)是否相等,如果相等,则获取存储在bucket
里面的key
,跟我们需要定位的key
做比较,如果相等,则说明已经找到了key
,如果key
不相等,则继续遍历下一个槽,直到bucket
中所有的槽都被遍历完毕。 - 如果
bucket
里面的 8 个槽都遍历完了,仍然没有找到我们需要找的key
。那么会从bucket
的溢出桶去查找,溢出桶内的查找过程跟普通桶内的查找过程是一样的。 - 如此遍历,直到所有溢出桶都遍历完(在找不到的情况下才会遍历
bucket
(普通桶)所有的溢出桶)。
这个查找过程可以表示为下图:
注意:确定一个 key
需要 tophash
和 key
都相等,如果 tophash
相等而 key
不相等,则需要继续比较 bucket
中其他的槽。
map 读取数据的实现
从 map
中读取某一个键的方法主要有三个:mapaccess1
、mapaccess2
、mapaccessK
,这三个方法的代码其实是大同小异的,所以这里只拿 mapaccessK
来讲解。
这三个方法的不同之处在于:
mapaccess1
只返回key
对应的值,对应v := map["k"]
这种写法。mapaccess2
返回key
对应的值,以及是否存在的标志,对应v, ok := map["k"]
这种写法。mapaccessK
用于遍历map
的时候,返回key
和value
,对应for k, v := range map {}
这种写法(在迭代的时候在mapiternext
里面调用)。
mapaccessK
的查找步骤
mapaccessK
的查找步骤大概就是上一个图说的,只不过下面的描述更加详细一点(em… 有点重复了):
- 判断
map
是否为空,为空直接返回 - 计算
key
对应的哈希值 =>hash
- 计算桶的掩码 =>
m
,hash & m
得到bucket
的索引 - 判断是否正在扩容,如果是,需要判断 key 对应的桶的数据是否已经被迁移到新的桶里面:
- 如果是,则需要从新的桶里面查找。
- 如果还没有被迁移,则需要从旧桶中读取。
- 计算
tophash
=>top
(也就是hash
的最高 8 位) - 遍历找到的桶的每一个槽(
slot/cell
)
- 比较
tophash
是否相等 - 如果不等,判断桶后面是否都没有数据了(
b.tophash[i] == emptyRest
) - 如果没有数据了,跳出循环 => 找不到
key
对应的值 - 如果还有数据,则继续遍历下一个槽
- 如果找到一个槽的
tophash
跟上面计算的tophash
相等,则比较key
是否相等
- 是,则返回对应的值(
tophash
和key
都相等,则表明找到了相应的key
,返回对应的值)。 - 否,继续遍历下一个槽(依然是先比较
tophash
,tophash
相等则再比较key
是否相等)
- 溢出桶也找不到,则返回零值(及是否找到的标志)。
mapaccessK
的实现
对于整型的键值的 map
,go 里面有针对优化的实现,但其实代码逻辑上都是差不多的,所以不细说了。下面来看看 mapaccessK
的实现:
// 在迭代 map 的时候,返回键值对。 // 参数: // t: map 类型元信息(记录了 map 中的 key/value 的类型等信息,比如 key 的大小,可用于计算内存偏移) // h: map 结构体类型(也就是实际的哈希表类型) // key: 需要查找的 key // 返回值: // 第一个返回值:当前遍历到的 key 的指针 // 第二个返回值:当前遍历到的 key 对应的值的指针 func mapaccessK(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) { // map 是空的 if h == nil || h.count == 0 { return nil, nil } // 根据 key 和 hash0 计算 hash hash := t.hasher(key, uintptr(h.hash0)) // hash 的掩码,类似 IP 的掩码 //(比如,假设 B = 3,一共有 8 个元素,索引为 0~7,那么掩码 m 表示为二进制就是 111)。 // 用于跟 hash 值做 & 运算(hash & m),得到 hash 对应 bucket 的索引(0~m) m := bucketMask(h.B) // 根据 hash 计算 bucket 地址,b 是 hash 匹配到的 bucket b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // 正在扩容,如果 bucket 还没迁移到新的地址,则需要从 oldbuckets 中访问 if c := h.oldbuckets; c != nil { // 不是等量扩容,需要从旧桶中读取,所以 m 要移除最高位(右移一位) if !h.sameSizeGrow() { // 不是等量扩容,则将 m 除以 2,因为是 2 倍扩容, // 所以 buckets 的大小为 oldbuckets 长度的 2 倍, // 除以 2 才是旧的桶数量 m >>= 1 } // key 在 oldbuckets 中的地址 oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) if !evacuated(oldb) { // b 尚未迁移到新的 buckets 中,还在 oldbuckets 中 // 则需要从旧桶中查找 b = oldb } } // 计算 tophash top := tophash(hash) bucketloop: for ; b != nil; b = b.overflow(t) { // 从 bucket 中查找,一个 bucket 可以存储的元素个数是 bucketCnt,也就是 8 for i := uintptr(0); i < bucketCnt; i++ { // tophash 不匹配,肯定不是这个槽 if b.tophash[i] != top { // bucket 剩余的槽是空的,不用再找了,跳出循环 if b.tophash[i] == emptyRest { break bucketloop } // 继续比较下一个槽 continue } // tophash 匹配 // 接下来将这个槽的 key 取出来 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) // key 是指针类型 if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } // 比较 key 跟 k 是否相等 if t.key.equal(key, k) { // 相等则读取对应的值(表示找到了匹配的 key 了) e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 值是指针类型 if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } // 找到了,返回键值对 return k, e } // 注意:执行到这里,说明 tophash 相等, // 但是 key 不匹配,仍然需要继续遍历。 } } // 普通桶和溢出桶的所有槽都找不到,返回 nil return nil, nil }
这里需要注意的是,如果在扩容的过程中查找,会先判断数据是否已经被迁移到新桶,如果还没有被迁移,则需要从旧的桶中查找。
mapaccessK
关键代码
bucket
的定位代码
m := bucketMask(h.B) // b 就是定位到的 bucket 所在的内存地址 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
bucket
索引(hash & m
)的定位方式如下:
因为 buckets
是指向 bmap
的指针数组,所以我们可以通过 buckets
加上 bucket
的索引,就可以定位到 bucket
的内存地址。
因为每一个 bmap
的大小是 t.bucketsize
,所以 bucket
的索引乘以 t.bucketsize
,也即 (hash&m)*uintptr(t.bucketsize)
,就是 bucket
的相对偏移量。
然后 buckets
的地址加上 bucket
的相对偏移量,就可以定位到 bucket
的内存地址。
我们需要注意的是,我们计算得到 bucket
的指针后,需要将其转换为 bmap
类型的指针,才能进行后续的操作。
然后 key
和 value
也是通过类似的指针运算来定位的。需要说明的是:
add
函数是做指针算术运算的函数,具体来说就是add(a, b)
就是a
地址加上b
的偏移量,返回的是unsafe.Pointer
类型的指针。unsafe.Pointer(b)
是bucket
的内存地址dataOffset
是bucket
中第一个key
的地址偏移量,t.keysize
是key
的大小t.elemsize
是map
值的大小bucketCnt
是bucket
中槽的数量(8 个,预定义的常量)。
然后大家可以结合上面的 bmap
内存布局图来理解上面指针计算的代码。
// 读取 bucket 中的第 i 个 key。 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) // 读取 bucket 中的第 i 个 value e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
b.tophash[i] == emptyRest
判断的理解
emptyRest
是一个比较特殊的标记,它表示 bucket
中的后续槽都是空的。
在 map
删除元素的时候,会判断后面还有没有元素,如果没有元素的话,就会将 b.tophash[i]
设置为 emptyRest
。
这样在查找的时候,就可以通过 b.tophash[i] == emptyRest
来判断后面的槽都是空的,就不需要继续遍历了。
indirectkey
和indirectelem
的理解
我们发现上面读取 key
和 value
的时候有一个比较特别的操作:
if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if t.indirectelem() { e = *((*unsafe.Pointer)(e)) }
相信不少读者第一次看到这几行代码的时候会跟我一样有点懵逼,从 maptype
的定义我们可以看出一点端倪:
func (mt *maptype) indirectkey() bool { // store ptr to key instead of key itself return mt.flags&1 != 0 // 存储了 key 的指针,而不是 key 本身 } func (mt *maptype) indirectelem() bool { // store ptr to elem instead of elem itself return mt.flags&2 != 0 // 存储了 elem 的指针,而不是 elem 本身 }
简单来说,就是 go 底层在有时候会将 key
和 value
保存为指针,而不是直接保存 key
和 value
本身。
这样一来,go 里面的 map
操作就需要根据 key
和 value
的类型来判断是否需要进行指针解引用,也就是取出实际的 key
和 value
。
map 写入和修改的设计与实现
在 go 中,map
的写入和修改都是通过 mapassign
函数来实现的,因为写入和修改本质上是同一个操作,都是找到对应的 key
,然后修改对应的值。
mapassign
的实现步骤
- 计算
key
的hash
值。 - 如果
buckets
还没有初始化,则进行分配内存。 - 计算出
bucket
的索引。 - 如果正在扩容,则迁移当前即将要操作的
bucket
(也就是上一步计算出来的索引对应的bucket
)。 - 计算
tophash
。 - 遍历
bucket
中的槽,记录下第一个空的槽的tophash
索引指针、key
指针,value
指针。如果最后找不到key
的时候,会插入到这里。 - 如果
bucket
的所有桶都找不到key
,则判断是否需要扩容,需要的话就进行扩容,然后再执行 6 的操作。 - 另外一种情况,不需要扩容,而且
bucket
以及它的溢出桶也满了,则需要新建溢出桶来保存key
- 最后,将
tophash/key/value
插入到需要bucket
第一个空的槽。又或者如果已经存在,对value
进行更新。
mapassign
图解
可以分两种情况:
- 普通桶和溢出桶都找不到
key
的情况下,将key
插入桶中第一个空的槽
- 在
bucket
中找到了key
,则会对其进行更新
对于 key
和 value
的存储,有以下两种情况:
- 直接保存在
bucket
中:
这样在我们需要修改 key
/value
的时候,通过 bucket
加上 索引 * keysize/valuesize
就可以得到对应键值存储的实际内存。
- 在
bucket
中保存的是key/value
的内存地址(unsafe.Pointer
)类型
这样如果我们需要修改/读取实际的键值的时候,就需要先从 bucket
中获取这个指针,然后解引用得到实际存储键值的内存指针。
注意:在我们做如下运算的时候(假设 bucket
没有存储实际的 key
,而是存储了 key
的指针):
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
得到的结果是,指向 keys[i]
(也就是第 i
个 key
)的指针(unsafe.Pointer
类型)。
如果 key
保存在 bucket
中,通过这个指针我们就可以读写 key
了。
否则,表示这个指针指向的内存存储的只是一个指针,如果我们需要修改实际的 key
,
就需要通过 key
指针(A
)拿到这里存储的指针(B
),然后再通过 B
来修改实际的 key
。
对
value
的读写同理。
mapassign
源码剖析
扩容的操作后面会有解析,这一节就先不细说了。
这个函数的定义如下:
// 功能:插入 key 或者修改 map 中的 key // 参数: // t:map 类型元信息 // h:map 结构体(实际保存键值对的地方) // key:我们要修改或者插入的 key // 返回值: // 只有一个,那就是我们插入或者修改之后的值。 func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { if h == nil { panic(plainError("assignment to entry in nil map")) } // 并发写,直接报错 if h.flags&hashWriting != 0 { fatal("concurrent map writes") } // 计算 key 的哈希值(是一个无符号整数) hash := t.hasher(key, uintptr(h.hash0)) // 在调用 t.hasher 之后设置 hashWriting, // 因为 t.hasher 可能会 panic,在这种情况下,我们实际上还没有进行写操作。 // 写标记。(如果读操作发现有写标志则会报错) h.flags ^= hashWriting // 如果 buckets 是空,则新建一个 bucket if h.buckets == nil { h.buckets = newobject(t.bucket) // newarray(t.bucket, 1) } again: // bucket 指示第几个 bucket(命名貌似有点不合适) bucket := hash & bucketMask(h.B) // 正在扩容的话,则将 bucket 迁移到新桶 if h.growing() { growWork(t, h, bucket) } // b 为即将要写入的 bucket b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) // 计算 key 的 tophash top := tophash(hash) var inserti *uint8 // 要插入的 tophash 地址 var insertk unsafe.Pointer // 要插入的键地址 var elem unsafe.Pointer // 要插入的值地址 bucketloop: for { // 循环 bucket 的槽 for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { // tophash 不匹配,并且当前槽为空,则记录要插入的位置(找不到 key 的时候,最后会插入到这里) if isEmpty(b.tophash[i]) && inserti == nil { inserti = &b.tophash[i] insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 为什么找到了可以插入的地方,不中断循环? // 原因是,这个函数是寻找已经存在的 key 的(插入和修改都是用这个函数), // 如果对应的 key 保存到了后面的槽里面的话,这里直接中断循环就是不对的。 // 因为在这种情况下,理应更新后面的那个槽。 } // bucket 没有找到对应的 key,同时 bucket 中剩余的槽都是空的。 // (这意味着 map 中找不到 key,需要插入这个 key 了) // 中止对 bucket 里面槽的遍历。 if b.tophash[i] == emptyRest { break bucketloop } // 虽然找到了空闲的槽,但还是要查看 bucket 中的其他槽,看 key 是否已经存在。 // (如果存在的话,修改 key 对应的值就可以了,当然这个函数里面不会修改,而是返回值的地址,从函数外部修改) // 这个很好理解,保存值的指针都拿到了,想修改就很简单了。 // 如果 key 已经存在,则返回已存在 key 的对应的 elem 的地址。 continue } // tophash 相等,依然需要比较 key 是否相等。 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } // 但是 key 不想等,继续比较下一个槽 if !t.key.equal(key, k) { continue } // tophash 相等、key 也相等,说明已经存在,更新它即可 // 用 key 更新 k if t.needkeyupdate() { typedmemmove(t.key, k, key) } // 计算 value 所在的地址 elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 找到了 key,直接跳转到 done goto done } // 当前的桶里面所有槽都找不到。 // 继续遍历溢出桶(在溢出桶中查找 key 是否存在/有没有空余的槽) ovf := b.overflow(t) if ovf == nil { // 溢出桶也找不到,跳出循环 break } // b 指向下一个溢出桶,下次循环遍历这个溢出桶 b = ovf } // 未找到 key,需要插入这个新的 key。 // 如果我们达到了最大负载系数或者我们有太多的溢出桶, // 同时,如果还没有开始扩容,那么现在开始扩容。 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) // 扩容哈希表会使所有内容无效,因此需要再次尝试插入。 // (上面循环获取到的插入位置的指针已经失效了,扩容之后插入的位置改变了,所以需要再次计算要插入的 bucket,以及要插入的槽中的位置) goto again } // 表明没有找到可以插入的地方,则新建一个溢出桶来保存, // 溢出桶的第一个元素就用来保存 key,返回溢出桶第一个元素 elem 的地址 // (这意味着,我们要插入的桶,所有的槽都有数据了,并且也不是我们要找的 key,所需要溢出桶了) if inserti == nil { // 当前存储桶及其连接的所有溢出存储桶已满,分配一个新的溢出桶。 newb := h.newoverflow(t, b) // 溢出桶的第一个 tophash 的指针 inserti = &newb.tophash[0] // 溢出桶的第一个 key 的指针 insertk = add(unsafe.Pointer(newb), dataOffset) // 溢出桶的第一个 value 的指针 elem = add(insertk, bucketCnt*uintptr(t.keysize)) } // 在插入位置存储新 key/elem if t.indirectkey() { // 这意味着需要给 key 分配内存来保存它 kmem := newobject(t.key) // 给 key 分配内存(kmem 是保存 key 的内存指针) *(*unsafe.Pointer)(insertk) = kmem // bucket 里面的 key 保存新分配的内存指针 insertk = kmem // insertk 指向新分配的地址(跟上一行并不重复) // 最终效果是,insertk 和 kmem 指向了新分配的保存 key 的内存地址。 // 当然,insertk = kmem 不需要也可以,但这样一来下面也要改成: // if t.indirectkey() { // typedmemmove(t.key, kmem, key) // } else { // typedmemmove(t.key, insertk, key) // } } if t.indirectelem() { vmem := newobject(t.elem) // 给 elem 分配内存 *(*unsafe.Pointer)(elem) = vmem // bucket 里面 elem 的槽保存新分配的地址 } // 移动 key 到 insertK(保存新的 key) typedmemmove(t.key, insertk, key) // 保存 tophash *inserti = top // map 元素个数 +1 h.count++ done: // 并发写则报错(多个协程同时写 map) if h.flags&hashWriting == 0 { // 写标志被清除了 fatal("concurrent map writes") } // 清除写标志 h.flags &^= hashWriting // bucket 中的值保存的是指针,这个时候就不能返回 bucket 中值的地址了,而是返回 bucket 中值指向的另外一个地址的指针。 if t.indirectelem() { // 获取指向 elem 实际存储地址的指针 elem = *((*unsafe.Pointer)(elem)) } // 返回存储值的指针 return elem }
有几点要注意的:
- go 中
map
是不允许并发读写的,如果有,直接报错。 - 这里面我们看到了有扩容的操作,在 go 中,
map
的扩容发生在插入、修改和删除的时候,是一种渐进式扩容的方式,每次扩容会迁移两个bucket
,详细的后面讲到扩容的时候会细说。 - 在
bucketloop
这个循环中,会记录下第一个空的槽,在找不到key
的时候会进行插入操作。 - 如果找到,则返回保存值的地址的指针。如果没找到,则将
key
插入到上一步找到的空的槽中,如果也没有空的槽,则会新建溢出桶来保存新插入的key
。 - 在这个函数中,会判断插入之后是否超过负载因子,又或者溢出桶是否太多,来决定是否扩容。
关于
key
匹配的过程,其实跟上面的mapaccess
是一样的过程,先找普通桶,然后查找溢出桶。
map 删除 key 的实现
对于删除操作,其实有一些操作上面已经说过了,如如何定位一个 key
。所以下面的讲述会侧重讲解跟删除操作密切相关的操作。
map 删除 key 的步骤
- 定位
key
所在bucket
,计算tophash
。 - 遍历
bucket
的每一个槽,比较tophash
以及key
,普通桶中查找不到会继续查找溢出桶。 - 如果查找到
key
的话,会清空对应key
在bucket
内存中的tophash
、key
和value
。 - 如果后面的槽没有元素了,则设置
emptyRest
标记。这样在后续查找的时候就可以避免不必要的搜索了。
map 删除过程图解
删除的过程比较简单,定位 key
的过程上面有过详细的讲解了,这里只详细画图阐述一下 emptyRest
的标记设置:
map 删除源码剖析
// 从 map 中删除 key(以及对应的 elem) // 参数: // t:map 类型元数据的结构体 // h:实际保存键值对的桶 // key:需要删除的 key func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) { // map 是空的 if h == nil || h.count == 0 { // 如果 key 的类型不可哈希则 panic if t.hashMightPanic() { t.hasher(key, 0) // see issue 23734 } return } // 并发写则抛出 fatal 错误 if h.flags&hashWriting != 0 { fatal("concurrent map writes") } // 计算 key 的 hash hash := t.hasher(key, uintptr(h.hash0)) // 在调用 t.hasher 之后设置 hashWriting, // 因为 t.hasher 可能会 panic ,在这种情况下,我们实际上没有执行 write(delete)。 h.flags ^= hashWriting // 计算 key 落在哪一个 bucket 中 bucket := hash & bucketMask(h.B) // 如果正在扩容,则迁移 bucket 到新的内存地址中(迁移到新的桶) // (迁移当前正在访问的 bucket) if h.growing() { growWork(t, h, bucket) } // 获取 bucket 实例(key 所在的 bucket) b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) // 记录原始的 bucket 实例 //(目的是为了方便加 emptyRest 标记) bOrig := b // 计算 tophash top := tophash(hash) search: // 在 bucket 内进行搜索 for ; b != nil; b = b.overflow(t) { // 遍历 bucket 的每一个槽 for i := uintptr(0); i < bucketCnt; i++ { // tophash 不想等的时候,需要判断后面是否还有元素 if b.tophash[i] != top { // 从 i 开始 bucket 后面都是空的了, // 中止搜索过程(去除写标记,函数执行完毕) if b.tophash[i] == emptyRest { break search } // 还有元素,继续搜索 continue } // tophash 相等 // 获取 key 在 bucket 中的内存地址 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) k2 := k // k2 代表的是指向实际存储 key 的指针(unsafe.Pointer) // k2 指向原始的地址 if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } // key 不想等,继续检查下一个 slot(continue) if !t.key.equal(key, k2) { continue } // 只有在键中有指针时才清除键。 // 清空 key 的内存 if t.indirectkey() { // 清除 bucket 里面保存 key 的内存 //(bucket 只是存储了 key 的指针) *(*unsafe.Pointer)(k) = nil } else if t.key.ptrdata != 0 { // 清除包含指针的 key memclrHasPointers(k, t.key.size) } // 值的地址 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 清空值 if t.indirectelem() { // 清除 bucket 里面保存 elem 的内存 *(*unsafe.Pointer)(e) = nil } else if t.elem.ptrdata != 0 { // 清除包含指针的 elem(值) memclrHasPointers(e, t.elem.size) } else { // 清除不包含指针的 elem 的内存 memclrNoHeapPointers(e, t.elem.size) } // tophash 设置为空标记 b.tophash[i] = emptyOne // 如果 bucket 现在以一堆 emptyOne 状态结束, // 将这些状态更改为 emptyRest 状态。 // 最好将其作为一个单独的函数,但 for 循环当前不可内联。(所以用 goto) if i == bucketCnt-1 { // 要删除的 key 是 bucket 里面的最后一个元素。 // 同时还有溢出桶,并且溢出桶里面还有元素 => 表明当前删除的 key 不是 bucket 以及溢出桶里面的最后一个元素。 if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest { // 不是最后一个元素 goto notLast } } else { // 要删除的 key 不是 bucket 的最后一个元素。 // 并且后面还有元素。 if b.tophash[i+1] != emptyRest { goto notLast } } // 执行到这里的时候,表明刚刚删除的 key 是 bucket 以及溢出桶中的最后一个元素。 // 如果不是最后一个元素的话,上面的 if 判断已经跳转了。 // 下面的 for 循环做的事情是: // 1. 将当前 key 的标志设置为:emptyRest,表示后面没有元素了。 // 2. 从 bucket 的第一个 key 出发遍历所有的槽(包含溢出桶), // 将最后一个元素以后一直到被删除的 key 的中间的所有槽标记为 emptyRest // 目的是,在后续遍历的时候可以避免一些不必要的查找操作,见到 emptyRest 就可以直接中断循环了。 // 例子,bucket 里 key 的内存布局为 | nil | a | nil | nil | b | nil | // b 被删除的时候,b 以及 a 后面的两个元素都要标记为 emptyRest(溢出桶同理) for { // 给当前遍历到的 bucket 槽打上 emptyRest 标记 b.tophash[i] = emptyRest if i == 0 { // 当前的桶遍历完了(因为是从后往前遍历) // b 是普通桶(不是溢出桶) if b == bOrig { // 所有空的槽都处理完了 break } // b 是上一次循环处理的桶 c := b // c 是上一个 b // 查找上一个存储桶,在其最后一个条目处继续。 // 如:bucket <- overflow1 <- overflow2 <- ... <- overflowN // 假设 c 是 overflow2,那么下面的循环过后,b 就是 overflow1 // prevB.overflow = b => 找 prevB,也即:遍历完当前的桶,找前一个桶。 for b = bOrig; b.overflow(t) != c; b = b.overflow(t) { } // b 指向了前一个 bucket(前一个桶) // 处理前一个 bucket 的最后一个槽 i = bucketCnt - 1 } else { // 继续处理前一个槽 i-- } // 找到了一个不是空的槽,表示有数据了,不需要再打 emptyRest 标记了。 if b.tophash[i] != emptyOne { break } } // 被删除的元素如果不是最后一个元素,直接跳转到这里。 notLast: // 找到了元素,并且删除了。 // map 的元素个数减 1 h.count-- // 重置哈希种子,使攻击者更难重复触发哈希冲突。见 issue 25237。 if h.count == 0 { h.hash0 = fastrand() } // 在 bucket 内找到了对应的元素,并且删除了。 // 退出循环。 break search } } // 如果当前没有写标记,则抛出 fatal 错误(不能并发读写 map) if h.flags&hashWriting == 0 { fatal("concurrent map writes") } // 清除写标记 h.flags &^= hashWriting }
注意:
- go
map
不允许并发写,所以如果发现有并发读写,则抛出 fatal 错误。 - 如果删除的是最后一个元素,则需要往前遍历,将每一个空的槽设置为
emptyRest
状态。 - 如果是
indirectkey
、indirectelem
,在删除的时候,只会将bucket
中的指针置为nil
,对于实际的key
和value
不会进行处理。(无所谓,GC 会出手)。
map 的扩容实现
从上面的讲解中,我们知道,底层存储 key/value
的是 bucket
,而 bucekt
的大小是一段有一定大小的连续内存。
如果我们插入的元素过多,我们初始化时分配的 bucket
的内存就会放不下了,这个时候 go 的 map
会有两种方式解决这个问题:
- 使用溢出桶(在
bmap
的上再链接一个bmap
,也就是溢出桶,普通桶放不下的时候,就放溢出桶中) - 分配新的更大的空间来存放现有的这些键值对。在 go 里面新分配的内存空间将会是原来的 2 倍。
map 扩容的条件
map
的扩容发生在插入或者修改或者删除 key
的时候,扩容的条件在 mapassign
中写了:
// 条件: // 1. 没有在扩容 // 2. 超过负载因子 // 3. 太多溢出桶 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) } // 判断是否超过负载因子。 func overLoadFactor(count int, B uint8) bool { return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen) } // 判断是否有太多的溢出桶了 // 多的标准是: // B <= 15 的时候,溢出桶数量大于 2^B 的时候 // B > 15 的时候,溢出桶的数量大于 2^15 的时候 func tooManyOverflowBuckets(noverflow uint16, B uint8) bool { // 如果阈值太低,我们会做额外的工作。 // 如果阈值太高,则增长和收缩的 map 会保留大量未使用的内存。 // “太多” 意味着(大约)与常规桶一样多的溢出桶。 // 有关详细信息,请参见incrnoverflow。 // B > 15 => 2 ^ 15,B <= 15 => 2 ^ B if B > 15 { B = 15 } return noverflow >= uint16(1)<<(B&15) }
hashGrow 实现
hashGrow
是被用来分配新的内存空间的,新的内存空间将被用来保存旧的 buckets
。需要注意的是,这个函数里面并没有做数据迁移的操作。
go 的 map
扩容的时候,数据迁移的方式是渐进式扩容,在我们插入/修改/删除 key
的时候会迁移 2 个 bucket
,这样可以避免性能的瞬时抖动。
我们熟知的 redis
的扩容过程也是渐进式扩容的。
下面是 hashGrow
的实现源码:
// hash 扩容(这个方法只是分配了空间,实际上还没有做数据复制的操作) // 参数: // t:map 类型元信息 // h:实际保存键值对的结构体 func hashGrow(t *maptype, h *hmap) { // 扩容的两种情况: // 1、如果我们达到了负载系数,就要进行 2 倍扩容。 // 2、否则,如果溢出桶太多,进行等量扩容。 bigger := uint8(1) // 尚未超过负载因子,进行等量扩容 if !overLoadFactor(h.count+1, h.B) { bigger = 0 // 等量扩容 h.flags |= sameSizeGrow } // 记录旧的 buckets oldbuckets := h.buckets // 分配新的内存空间,oldbuckets 将会被渐进式迁移到 newbuckets 中 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // flags 标记有使用旧桶 // 正在迭代的时候扩容 // 先把 h.flags 中的迭代标记位清除。 // 最后如果发现 h.flags 中还有迭代标记位,说明在扩容的过程中有新的迭代操作, // 那就把它转移到 oldIterator 中。 flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // 提交扩容操作 h.B += bigger // 加上扩容的数量 h.flags = flags h.oldbuckets = oldbuckets // 记录旧桶 h.buckets = newbuckets // 指向新的桶数组 h.nevacuate = 0 // 0 个桶已完成迁移(当前函数只是分配空间,不做迁移) h.noverflow = 0 // 所有溢出桶没有了(移动到了 oldoverflow) // 记录扩容前的溢出桶 if h.extra != nil && h.extra.overflow != nil { if h.extra.oldoverflow != nil { throw("oldoverflow is not nil") } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } // 扩容之后,h.B+bigger >= 4 了,预分配了溢出桶(扩容前没有溢出桶) // 所以这里要记录溢出桶 if nextOverflow != nil { if h.extra == nil { h.extra = new(mapextra) } h.extra.nextOverflow = nextOverflow } // 哈希表数据的实际迁移过程是通过 growWork() 和 evacuate() 增量完成的。 }
数据迁移的两条线
go map
在扩容的时候,数据迁移会有两条线进行:
- 从第一个
bucket
开始迁移。 - 插入、修改、删除的时候,
key
的哈希值定位到的bucket
会被迁移。
具体如下图:
如果已经被迁移,则不再需要迁移。
这样一来就可以保证在一定的操作次数之后,全部 bucket
都被迁移。就算你每次插入、修改、删除都是同一个 key
(也就是同一个 bucket
),
我们第一条线的迁移都会在每次写操作的时候,迁移一个 bucket
。这样无论如何,写操作到了一定次数之后,所有的 bucket
都会被迁移了。
什么时候 bucket 迁移之后下标会改变?
当然这里说的是增量扩容,如果是等量扩容,bucket
的下标不会改变。
先说答案:hash & m
的最高位是 1 的时候。
我们知道,在定位 bucket
的时候是通过 hash & m
的方式来定位 bucket
的索引的(具体可以看上面定位 key
的那一节),
而 2 倍扩容之后,bucket
的长度是原来的 2 倍,转换为二进制的时候,就是在原来的基础上多了一位,所以 hash & m
的结果就会多一位,
而最高的那个二进制位如果是 1,说明 hash & m
的结果是大于新的 bucket
数组长度的一半的(也就是比原来的索引都要大)。
那么会最高位是 1 会比原来的索引会大多少呢?答案是 bucket
数组的长度的一半(也就是 2^(B-1)
):
我们可以再看看之前的那个计算 bucket
索引的图:
我们会发现,当 B = 3
的时候,不管 hash
是什么,hash & m
的结果都是 0~7
。
而只有当 B = 4
的时候,hash & m
的结果才有可能落入 8~15
范围内,而且只有 hash & m
的最高位是 1 的时候才有可能。
所以结论是,当 hash & m
的最高位是 1 的时候,bucket
的下标就会改变。
而 hash % m
的其他位跟之前是一样的,所以下标增加的范围其实就是 2^(B - 1)
,也就是旧的 buckets
的个数。
bucket 迁移图解
说明:
oldbuckets
是迁移前的桶,buckets
是迁移后的桶(也就是当前的h.buckets
)。hash & m
为0xxx
的时候,会迁移到x
这个bucket
中。hash & m
为1xxx
的时候,会迁移到x + 2^(B-1)
这个bucket
中(也就是y
中),因为1000
就是2^(4 - 1)
。
假设需要迁移的是 oldbucket
,下标为 3
,那么 oldbucket
里面的 key
可能迁移的位置只可能是右边的 x
和 y
指向的 bucket
。
这取决于 oldbucket
里面的 key
哈希值的倒数第 4
位;
如果是 0
,那么就迁移到 x
指向的 bucket
,如果是 1
,那么就迁移到 y
指向的 bucket
。
oldbucket
中所有的 key
只会迁移到 x
或 y
中。同时 x
和 y
也只可能有 oldbucket
的 key
,不可能有其他旧的 bucket
的 key
。这是由 hash & m
可以推断出来的。
bucket 迁移源码剖析
开始之前,我们要记住 x
和 y
是怎么来的。
在开始看代码之前,我们需要明确一点:虽然整个哈希表是渐进式迁移的,但是单个 bucket
的迁移不是渐进式的。
- 我们先看看
growWork
函数:
这是迁移桶的函数,一次会迁移两个桶。不过实际上并不是严格的两个,因为迁移的函数会先判断桶是否已经被迁移,
如果桶还没有被迁移,才会进行迁移,如果桶已经被迁移则不做任何操作。
// 桶迁移 // 参数:h 需要扩容的 map,t map 类型信息,bucket 旧桶的索引 // 1、迁移当前访问到的桶(mapassign、mapdelete 的时候) // 2、继续逐个迁移 bucket,直到迁移完成(这个是从第一个桶开始迁移的) func growWork(t *maptype, h *hmap, bucket uintptr) { // 迁移当前访问到的桶 evacuate(t, h, bucket&h.oldbucketmask()) // 继续逐个迁移 bucket,直到迁移完成 if h.growing() { evacuate(t, h, h.nevacuate) } }
- 然后看看
evacuate
函数:
这个就是实际做迁移操作的函数。它会根据 hash & m
的倒数第 B
位是否为 1 来决定将 key
迁移到 h.buckets
的前半部分还是后半部分。
// 迁移的目的地(旧桶 -> 目标桶) // evacuate 中会定义两个这个 evacDst 变量, // 一个指向 h.buckets 的前半部分,一个指向后半部分。(对应前一个图的 x,y) // 迁移的时候,会根据 key 的哈希值的倒数第 4 位来决定迁移到哪个 evacDst 中。 type evacDst struct { b *bmap // 迁移目的地 bucket i int // key/elem 在迁移目标 bucket 里面对应的下标 k unsafe.Pointer // 目标 bucket 下一个保存 key 的地址指针 e unsafe.Pointer // 目标 bucket 下一个保存 elem 的地址指针 } // 扩容的时候,bucket 迁移的实现 // oldbucket 需要迁移的旧桶索引 func evacuate(t *maptype, h *hmap, oldbucket uintptr) { // b 指向旧桶 b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) // 扩容之前的桶的数量 // oldbucket+newbit 对应 y,oldbucket 对应 x newbit := h.noldbuckets() // 如果 b 尚未迁移,则进行迁移 if !evacuated(b) { // xy 包含 x 和 y(低和高)迁移目的地。 // 迁移的时候,在 h.buckets 中,前 noldbuckets 个桶就是 x,后 noldbuckets 个桶就是代表 y。 var xy [2]evacDst // x 存储了新桶的地址 x := &xy[0] // x 指向 oldbucket 可能迁移到的新桶(h.buckets 的前半部分) x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize))) // x 桶中下一个用来保存旧桶的 key 的地址 x.k = add(unsafe.Pointer(x.b), dataOffset) // x 桶中下一个用来保存旧桶的 elem 的地址 x.e = add(x.k, bucketCnt*uintptr(t.keysize)) // 如果不是等量扩容(有可能会迁移到 oldbucket+newbit 的位置上) if !h.sameSizeGrow() { y := &xy[1] // y 指向增量空间上的第 oldbucket+newbit 个位置 y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) // y 桶中下一个保存旧桶 key 的地址 y.k = add(unsafe.Pointer(y.b), dataOffset) // y 桶中下一个保存旧桶 elem 的地址 y.e = add(y.k, bucketCnt*uintptr(t.keysize)) } // 开始迁移旧桶(同时也会迁移溢出桶) for ; b != nil; b = b.overflow(t) { // 旧 bucket 上第一个 key k := add(unsafe.Pointer(b), dataOffset) // 旧 bucket 上第一个 value e := add(k, bucketCnt*uintptr(t.keysize)) // 遍历 bucket 的槽 // 获取需要迁移的 key/elem(k/e) 对,迁移到新桶上(&xy[useY]) for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) { // 获取旧的 tophash top := b.tophash[i] // bucket 的这个槽是空的 if isEmpty(top) { // 写入已迁移标记 b.tophash[i] = evacuatedEmpty // 处理下一个 key/elem continue } // tophash 错误 if top < minTopHash { throw("bad map state") } // 复制一份 k k2 := k // k2 指向实际的 key if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } // useY 决定了是迁移到 x 还是 y,如果是等量扩容,那么就是迁移到 x var useY uint8 // 如果不是等量扩容 if !h.sameSizeGrow() { // 计算哈希以做出迁移决定(是否需要将此 key/elem 迁移到桶 x 或桶 y)。 hash := t.hasher(k2, uintptr(h.hash0)) if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) { // 下面这段是原英文注释的翻译(可能不太准确): // 如果 key != key(不是一个数字),则散列可能(并且可能)与旧散列完全不同。 // 此外,它是不可再现的。 // 在迭代器存在的情况下,再现性是必需的,因为我们的迁移决策必须与迭代器所做的任何决策相匹配。 // 幸运的是,我们可以任意发送这些 key。 // 此外,tophash 对于这些类型的 key 没有意义。我们让 tophash 的最低位的决定如何迁移。 // 我们为下一个级别重新计算一个新的随机 tophash,这样在多次扩容后,这些 key 将均匀分布在所有桶中。 useY = top & 1 top = tophash(hash) } else { // 原理参考上面那个图。 if hash&newbit != 0 { // 取决于 oldB + 1 位是 0 还是 1 useY = 1 } } } if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY { throw("bad evacuatedN") } // 记录旧桶的迁移状态 b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY // dst 是迁移的目标 bucket dst := &xy[useY] // 目标 bucket 装不下了,使用溢出桶 if dst.i == bucketCnt { // 创建溢出桶 dst.b = h.newoverflow(t, dst.b) dst.i = 0 // dst.k 指向溢出桶的第一个 key 的地址 dst.k = add(unsafe.Pointer(dst.b), dataOffset) // dst.e 指向溢出桶的第一个 elem 的地址 dst.e = add(dst.k, bucketCnt*uintptr(t.keysize)) } // 使用 & 运算优化,不用进行边界检查 // 记录 tophash dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check // 将旧的 key/elem 复制到 dst 指向的槽 if t.indirectkey() { // bucket 的 key 保存的是指针 // 修改实际存储 key 的内存 *(*unsafe.Pointer)(dst.k) = k2 // copy pointer } else { // 修改 bucket 中保存 key 的内存 typedmemmove(t.key, dst.k, k) // copy elem } if t.indirectelem() { // bucket 的 elem 保存的是指针 // 修改实际存储 elem 的内存 *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) } else { // 修改 bucket 中保存 elem 的内存 typedmemmove(t.elem, dst.e, e) } // 目标桶的元素个数 +1 // dst 指向下一个空的槽(slot/cell) dst.i++ // 这些更新可能会将这些指针推到 key 或 elem 数组的末尾。 // 这没关系,因为我们在桶的末端有溢出桶指针,以防止指针指向桶的末端。 // 如果指向数组末尾,在下次迁移的时候,会创建溢出桶。 dst.k = add(dst.k, uintptr(t.keysize)) dst.e = add(dst.e, uintptr(t.elemsize)) // 上面有判断 dst.i == bucketCnt,所以这里不会溢出 } } // 取消链接溢出桶并清除 key/elem 以帮助 GC。(清除旧桶的内存) if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 { b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)) // tophash 状态不能清除。 // 但是 key/elem 都可以清除。 ptr := add(b, dataOffset) n := uintptr(t.bucketsize) - dataOffset memclrHasPointers(ptr, n) } } // 刚刚迁移的桶,就是顺序迁移的下一个桶, // 则需要更新 nevacuate 字段,表示已经迁移了多少个桶 if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) } }
advanceEvacuationMark
的作用是更新nevacuate
字段,表示已经迁移了多少个桶。
我们上面说了,哈希表扩容的时候,会有两条线,advanceEvacuationMark
就是处理顺序迁移的那条线,让 nevacuate
指向下一个未迁移的桶。
为什么需要做这个处理呢?这是因为另外一条线的迁移是随机的,访问到哪个桶就迁移哪个桶,这就导致了,顺序迁移的那条线,在将 nevacuate
指向下一个桶的时候,其实下一个桶是已经迁移了的,我们下次顺序迁移的时候肯定不需要迁移这个桶。
那么解决办法就是,继续向后查找,找到第一个未迁移的桶。
// 记录迁移进度(从顺序迁移的位置往后遍历,保证 nevacuate 指向下一个尚未迁移的桶) // 参数:h 需要扩容的 map,t map 类型信息,newbit 是旧桶的数量 func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) { // nevacuate 索引加 1 h.nevacuate++ // 往后找一个未迁移的桶(最多遍历 1024 个桶)。 stop := h.nevacuate + 1024 if stop > newbit { stop = newbit } // 向后遍历,找到第一个未迁移的桶。 for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) { h.nevacuate++ } // 这意味着所有旧桶都迁移完了 if h.nevacuate == newbit { // 扩容已经完成。释放旧的普通桶数组。 h.oldbuckets = nil // 也可以丢弃旧的溢出桶。 // 如果它们仍然被迭代器引用,那么迭代器将保存指向切片的指针。 //(但是 h 不再需要保存指向切片的指针) if h.extra != nil { h.extra.oldoverflow = nil } // 移除等量扩容标志 h.flags &^= sameSizeGrow } }
这里需要注意的是下面几行代码:
stop := h.nevacuate + 1024 if stop > newbit { stop = newbit }
我们在将 nevacuate
加上 1 之后,还会继续往后遍历 1024 个 bucket,如果 bucket 已迁移,则将 nevacuate
加 1。
如果没有这个操作会怎样?那就意味着可能有很多的 bucket 都已经迁移了,但是顺序迁移的位置(nevacuate
)还没有更新,
这样可能会导致顺序迁移的位置每次都指向了已迁移的 bucket
。
最终导致,迁移的时候,只迁移了访问到的 bucket
,而没有迁移顺序迁移位置上的那个 bucket
。
也就是说,每次迁移的时候只会迁移 1 个 bucket
,而不是 2 个,这样一来 growWork
需要调用的次数比原来更多,也就是说,需要写操作次数更多才能完成全部 bucket
的迁移。
这个函数做的事情可以表示成下图:
在这个图中,nevacuate
一开始是 1
,然后因为 3
这个 bucket
在这次 growWork
中已经迁移了,
nevacuate
如果要指向下一个未迁移的 bucket
的话,就要跳过之前已经迁移的 2
,以及本次 growWork
中已经迁移的 3
,
所以最终 nevacuate
指向了 4
,也就是下一个未迁移的 bucket
。
等量扩容的效果
在上面我们讲解的时候,其实已经假设了扩容是增量扩容(2 倍扩容),但实际上还有一种扩容方式,就是等量扩容。
等量扩容的时候,扩容前后的 bucket
其实数量是一样的,那么为什么还要进行扩容呢?
这是因为,溢出桶太多了,数据非常零散地分布在了很多的溢出桶中,这样会导致 bucket
中很多槽都是空的,
这样一来,进行查找、修改、删除的时候,需要遍历很多的溢出桶,这样会导致性能下降。如下图:
这个图中,我们假设要查找的 key
所在的普通桶以及前两个溢出桶都是空的,又或者 key
不在前面三个桶中,那只有遍历到最后一个溢出桶的时候才能找到我们要查找的 key
。为了针对这种键值对数量没有达到扩容的阈值,但是溢出桶太多的情况,Go 语言提供了等量扩容的方式。
在等量扩容的时候,会将所有的溢出桶都迁移到新的 bucket
中,这样一来,bucket
中的槽就会被填满,而溢出桶也可能不再需要。
最后,针对上图的情况,key = foo
会被迁移到普通桶中,这样在查找的时候,只需要遍历普通桶就可以找到了。
当然,实际中的情况是,对于零散分布在多个溢出桶中的键值对,会被逐个往前挪,最终效果就是,桶中没有空的槽,除了最后一个 key
以后的槽。
大家可以结合下图想象一下:
当然下图只是描述了一下部分 key
,如果 key
分布。实际上 key
在触发等量扩容的情况下,是零散地分布在不同的 bucket
中的(包括溢出桶)。
map 的迭代实现
go 的 map
迭代的时候,我们会发现,返回结果的顺序并不固定,这是因为 map
的迭代是无序的。
在 map
遍历的时候,每次都会从一个随机的 bucket
开始遍历,而且选了一个 bucket
之后,
还会从 bucket
中随机选择一个槽开始遍历,这样一来,每次遍历的结果都是不一样的。
map 迭代器数据结构
迭代器的功能:记录要遍历的 map
,以及当前遍历到的 bucket
以及槽的索引,以便进行下一次遍历。
go 的 map
迭代器的数据结构如下:
// 哈希迭代结构。 type hiter struct { // 必须排在第一位。 写入 nil 表示迭代结束 key unsafe.Pointer // 必须在第二个位置(参见 cmd/compile/internal/walk/range.go)。 elem unsafe.Pointer t *maptype // map 的类型信息,包括 key、elem 的类型等信息 h *hmap // 需要迭代的 hmap // hash_iter 初始化时的 bucket 指针 buckets unsafe.Pointer // 当前正在遍历的 bucket bptr *bmap // 保持 hmap.buckets 的溢出桶存活 overflow *[]*bmap // 保持 hmap.oldbuckets 的溢出桶存活 oldoverflow *[]*bmap // bucket 迭代开始位置(随机选择的 bucket) startBucket uintptr // 在迭代期间开始的桶内偏移量(应该足够大以容纳 bucketCnt-1) offset uint8 // 已经从桶数组的末尾环绕到开始 wrapped bool B uint8 // 就是当前遍历的 hmap 的那个 B i uint8 // 当前遍历的 bucket 内 key 的索引 bucket uintptr // 当前遍历的 bucket checkBucket uintptr }
几点注意的:
- 全部键值对遍历完的时候,
key
会被置为nil
,这样一来,我们就可以通过key
是否为nil
来判断是否遍历完了。(当然这个不用开发者来判断,for...range
底层已经帮我们做了这个判断)。 hiter
结构体保存了当前正在迭代的bucket
(bptr
)、bucket
中的key
的索引(i
)等信息,这样一来,我们就可以通过这些信息来确定下一个key
的位置。
迭代器的初始化实现
go 的 map
初始化是通过 runtime.mapiterinit
来实现的,这个函数的实现如下:
// mapiterinit 初始化用于 range 遍历 map 的 hiter 结构。(初始化 hiter) // 'it' 指向的 hiter 结构由编译器顺序传递在堆栈上分配,或由 reflect_mapiterinit 在堆上分配。 // 两者都需要将 hiter 置零,因为结构包含指针。 func mapiterinit(t *maptype, h *hmap, it *hiter) { // hiter 记录 map 的类型信息 it.t = t // 如果 map 是空的直接返回 if h == nil || h.count == 0 { return } // 个人猜测:内存对齐判断,这个由编译器决定的 if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 { throw("hash_iter size incorrect") // see cmd/compile/internal/reflectdata/reflect.go } // it 关联上 map it.h = h // 获取桶状态的快照 it.B = h.B // 当前的 buckets it.buckets = h.buckets // bucket 里面没有包含指针 if t.bucket.ptrdata == 0 { // 分配当前切片并记住指向当前切片和旧切片的指针。 // 这将保持所有相关的溢出桶处于活动状态,即使在迭代时表增长和/或溢出桶添加到表中。 h.createOverflow() it.overflow = h.extra.overflow it.oldoverflow = h.extra.oldoverflow } // 决定从哪里开始遍历。 // 策略:随机选定一个 bucket,然后从该 bucket 开始遍历。 // 生成随机数 var r uintptr if h.B > 31-bucketCntBits { r = uintptr(fastrand64()) } else { r = uintptr(fastrand()) } // r => 扫描的入口(随机选的 bucket) it.startBucket = r & bucketMask(h.B) // 随机定位的 bucket 中的槽。 it.offset = uint8(r >> h.B & (bucketCnt - 1)) // 记录当前扫描的 bucket it.bucket = it.startBucket // 记住我们有一个迭代器。 // 可以与另一个 mapiteinit() 同时运行。 if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator { atomic.Or8(&h.flags, iterator|oldIterator) } // 开始遍历 mapiternext(it) }
这个函数主要功能是初始化迭代器,我们最需要关心的是,这个函数里面会生成一个随机数,然后通过这个随机数来决定从哪一个 bucket
开始遍历,
以及从 bucket
中的哪一个槽开始遍历(也是随机的)。
map 遍历图解
那为什么从随机定位的 bucket
以及随机定位的 key
就可以实现遍历呢?其实很简单,如果看过我之前写的 《go chan 设计与实现》的话,
就会知道 chan
的实现中是通过数组来实现环形队列的。而我们可以借助环形队列的特性来理解 map
的遍历。遍历到最后一个 bucket
之后,
下一个 bucket
就是第一个 bucket
,这样就实现了环形遍历。同样的,遍历到最后一个 key
之后,下一个 key
就是第一个 key
,这样也实现了环形遍历。
我们需要做的就是,在遍历开始的时候,记录第一个 bucket
,然后每次遍历 bucket
的时候,
比较当前的 bucket
是否是第一个 bucket
,是的话,意味着遍历结束了。
同样的,对于 bucket
内 key
的遍历也是。
不过,在实际实现中,key
有点不一样,key
的遍历是通过将 i
从 0
遍历到 7
,对于每一个 i
,加上 it.offset
,然后对 8
取模,
这样就可以实现环形遍历了,代码如下:
for ; i < bucketCnt; i++ { // 桶内第 i 个元素 offi := (i + it.offset) & (bucketCnt - 1) }
具体遍历过程如下图:
发生在扩容期间的遍历
go 的 map
设计中,是不允许在迭代的时候进行插入、修改、删除的,但只是在获取下一个键值对的时候不允许,
在迭代器获取了键值对之后,就算还没有全部遍历完 map
的所有元素,还是可以允许做插入、修改、删除操作的。
这样一来,有可能会出现一些奇怪的现象,比如插入的 key
遍历不出来,或者遍历出来的 key
有重复等等。这是需要开发者注意的地方。
另外,迭代可以发生在扩容的过程中,但是扩容其实对迭代其实是没有什么影响的。因为迭代的时候会做一些判断尽量保证所有 key
都能被遍历到。
但不能保证我们对 map
做了写操作后依然可以全部 key
都遍历。
在遍历的过程中,如果 map
发生了扩容,那么遍历的过程就会变得复杂一些。因为在扩容的过程中,map
会新建一个 bucket
,
然后将原来的 bucket
中的 key
重新散列到新的 bucket
中。所以在遍历的过程中,如果发现当前的 bucket
已经发生了扩容,
需要做一些判断,比如:
- 如果发现
bucket
还没有迁移,则从oldbuckets
中遍历。 - 如果发现
bucket
在迁移之后索引跟原来的不一样,则跳过。
具体可以参考下图:
这里假设的条件是:旧桶个数为 4
,增量扩容后,新桶个数为 8
。hiter
当前迭代的是新桶。
说明:
h.oldbuckets
指向了还没迁移完的桶,h.buckets
是当前的桶。hiter
迭代器要迭代的是新的桶。迭代器初始化的时候正在进行 2 倍扩容。checkBucket
是下一个要遍历的桶(索引为1
),图中的情况是,这个桶还没有被迁移。所以需要从h.oldbuckets
中读取。checkBucket
中的key
有可能会迁移到h.buckets
中的1
或者5
位置。(具体可以看上面桶迁移的实现那一节)- 如果
key
是要被迁移到5
中的话,那么遍历的时候会跳过,因为后面会遍历到5
中的key
。 - 对于第 5 点,在遍历
5
这个bucket
的时候,由于我们是使用当前遍历的bucket
的下标结合旧桶的长度计算在旧桶中的下标的,所以还是可以取得到旧桶,然后遍历的时候取出那些应该迁移到5
这个bucket
的key
,对于那些应该要迁移到1
的key
则跳过。 - 下一个要遍历的桶的索引为
2
。
对于第 6 点桶索引计算的特别说明,如果是增量扩容,计算 bucket
的下标方式如下:
// bucket 是当前要遍历的 bucket 的下标 // it.h.oldbucketmask() 是旧桶的 B 的掩码 oldbucket := bucket & it.h.oldbucketmask()
当然如果这个桶已经迁移,那么还是会从新桶遍历(也就是 bucket & it.h.oldbucketmask()
里的 bucket
本身)。
键值对遍历源码剖析
map
中实现遍历的函数是 mapiternext
,这个函数做的事情,也就是上面两个图描述的遍历过程,代码如下:
// 参数:迭代器实例 func mapiternext(it *hiter) { // 获取底层的 hmap 实例 h := it.h // 正在插入、修改、删除 key 但时候,不能获取下一个 key, // 注意:这不能保证插入、修改、删除之后,进行迭代。 if h.flags&hashWriting != 0 { fatal("concurrent map iteration and map write") } t := it.t // *maptype bucket := it.bucket // 当前遍历到的 bucket 下标(int 类型) b := it.bptr // 当前 bucket 的指针(实际的 bucket,bmap 指针类型) i := it.i // 迭代器当前遍历到的位置(bucket 内 key 的位置) checkBucket := it.checkBucket // 这个用来判断是否是增量扩容的 next: // 注意:下面的 if 的功能是,获取下一个桶。 // 如果 b 是 nil,有以下两种情况: // a. 第一次遍历,还没有遍历到任何 bucket。 // b. 遍历完最后一个 bucket 了。 if b == nil { // 已经迭代完了,直接退出函数 if bucket == it.startBucket && it.wrapped { // for...range 查看到 key 是 nil 会中止迭代 it.key = nil it.elem = nil return } // 如果正在扩容 if h.growing() && it.B == h.B { // 迭代器是在扩容过程中启动的,扩容尚未完成。 // 如果我们正在查看的存储桶尚未迁移, // 那么我们需要遍历旧存储桶,同时只返回将迁移到此存储桶的数据。 // 那些需要迁移到另一个下标的桶则跳过。 oldbucket := bucket & it.h.oldbucketmask() // 获取旧桶 b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) if !evacuated(b) { // 旧桶还没有迁移 checkBucket = bucket } else { // 已经迁移了,获取新桶 b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize))) checkBucket = noCheck } } else { // 并没有在扩容,获取新的桶 b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize))) checkBucket = noCheck } // bucket 指向了下一个要迭代的桶的下标 bucket++ // 判断当前遍历的桶是否到最后一个桶了 if bucket == bucketShift(it.B) { bucket = 0 it.wrapped = true // 已经从第一个随机定位的 bucket 遍历到最后一个 bucket 了,下一个应该是第一个 bucket 了 } // 开始遍历新的 bucket 的时候,重置 i i = 0 } // 开始遍历桶内的键值对 for ; i < bucketCnt; i++ { // 计算桶内键值对的下标(从随机的 it.offset 下标开始遍历) offi := (i + it.offset) & (bucketCnt - 1) // 如果槽是空的,或者 key 已经迁移,则跳过。 if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty { continue } // 桶内第 i 个元素的 key 的指针 // 获取 key 或 elem 的解析前面的小节有详细的解释了,不再赘述。 k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } // 桶内第 i 个元素的 elem 的指针 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize)) // 增量扩容的 key 判断 // 需要判断迁移之后的 key 落入的是 h.buckets 的前半部分还是后半部分(x 还是 y) // 具体看上面的迁移实现。 if checkBucket != noCheck && !h.sameSizeGrow() { if t.reflexivekey() || t.key.equal(k, k) { // key 是可比较的 // 如果这个 key 将会被迁移到 h.buckets 的后半部分,跳过它 hash := t.hasher(k, uintptr(h.hash0)) if hash&bucketMask(it.B) != checkBucket { continue } } else { // 对于不可比较的 key, // 是由 tophash 的最低位来决定迁移到前半部分还是后半部分的。 // 取 checkBucket 的最高位来比较,因为 checkBucket 的最高位决定了 // 当前遍历的 bucket 是上半部分还是后半部分的 bucket。 if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) { // 如果当前的 key 没有落入当前遍历的 bucket,则跳过它 continue } } } if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) || !(t.reflexivekey() || t.key.equal(k, k)) { // 上面的判断: // 1、表示 key 还没有迁移。 // 2、或者,key 是不可比较的。 // 则直接返回 it.key = k if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } it.elem = e } else { // 自迭代器启动以来,哈希表已扩容。 // key 已经被迁移到其他地方。 // 检查当前哈希表中的数据。 // 此代码处理已删除、更新或删除并重新插入 key 的情况。 // 注意:我们需要重新标记 key,因为它可能已更新为 equal() 但不是相同的 key(例如 +0.0 vs -0.0)。 // 获取当前遍历到的 key/elem。 rk, re := mapaccessK(t, h, k) if rk == nil { // key 已经被删除了,继续遍历下一个 key continue } // 外部是通过 it 的 key/elem 来获取当前遍历到的键值对的 it.key = rk it.elem = re } // 记录当前迭代的 bucket it.bucket = bucket // 遍历到下一个 bucket 了,更新 bptr if it.bptr != b { // avoid unnecessary write barrier; see issue 14921 it.bptr = b } // 迭代器的 i 指向 bucket 内的下一个 key it.i = i + 1 // 记录是否需要检查 key 的标记 it.checkBucket = checkBucket // 找到了键值对(保存在 it.key/it.elem 中了),返回。 // mapiternext 外部可以通过 hiter 的 key/elem 属性来获取当前遍历到的 key/val。 // 遍历下一个元素的时候,再次调用 mapiternext 函数。 //(无所谓,hiter 会记住迭代到哪里的) // 也即:每遍历一个元素,调用一次 mapiternext 函数。 return } // 当前的 bucket 遍历完了,那么继续从溢出桶中查找下一个元素。 // b 指向溢出桶,迭代溢出桶 b = b.overflow(t) // 遍历下一个桶了,key 从 0 开始遍历 i = 0 // 遍历当前 bucket 的溢出桶 goto next }
小结
- 哈希表相比数组,可以很快速地查找,所以常见的编程语言都会有对应的实现。
- 哈希冲突有两种解决方法:开放地址法和链表法,go 里的
map
使用的是链表法。 - 哈希表初始化分配的是比较小的内存,只能存放少量键值对,但是随着我们插入的数据越来越多,哈希表会进行扩容,防止哈希表的读写性能下降。
- go 的
map
扩容有两种方式:键值对太多的时候会进行 2 倍扩容,溢出桶太多会进行等量扩容。 map
中使用buckets
来存储桶,一个桶里面可以存储8
个键值对,一个桶满了的时候,会创建溢出桶来保存多出来的key
。map
中桶的结构体是bmap
,它里面的会将所有key
连续存储,所有的value
也会连续存储。map
定位key
的时候,会使用哈希值与B
的掩码做&
运算(我们可以将其理解为一种模运算的另外一种实现),从而得到bucket
的下标,然后遍历这个bucket
中的每一个槽。先比较tophash
,如果tophash
相等再比较key
,如果tophash
和key
都相等,则表明找到了我们要找的key
。如果这两者有一个不等,继续比较下一个key
。- 如果一个
bucket
中的所有key
被遍历完了也没有找到,那么继续从溢出桶中查找。 map
读取数据是通过mapaccess1
、mapaccess2
、mapaccessK
实现的,对于整型键值的map
有优化的mapaccess
实现(对于bmap
里键值的访问更加高效)。map
写入和修改数据都是通过mapassign
函数实现的,这个函数在找不到key
的时候会进行插入操作。map
删除数据是通过mapdelete
函数实现的。删除的时候会需要将bucket
末尾的所有空的槽的标记更新为emptyRest
。map
扩容的条件有两个:超过负载因子、溢出桶太多。只有在增量扩容的时候,key
所对应的bucket
的下标才有可能发生变化。map
扩容的时候,会迁移当前正在写入、删除的bucket
,同时也会从第一个bucket
开始迁移,一次写操作会迁移两个bucket
。这样可以保证在一定的写操作以后,所有bucket
都能迁移完成。- 等量扩容的时候,
key
在新桶中的bucket
下标不变,但是key
在桶内的分布会更加地紧凑,从而会提高查找效率。 map
的迭代是通过hiter
结构体来实现的,迭代的过程中hiter
会记录当前的bucket
、key
,普通桶迭代完后,迭代溢出桶。map
的迭代是通过mapiternext
函数实现的,每次获取键值对都是通过这个mapiternext
函数。map
迭代如果发生在增量扩容的时候,对于未迁移的bucket
,会判断key
的bucket
是否会发生变化,如果key
对应的bucket
已经改变,则迭代的时候会跳过。