参考
Golang map 实现原理 Golang sync.Map 实现原理
Map 概述
map 又称字典,是一种常用的数据结构,核心特征包含下述三点:
(1)存储基于 key-value 对映射的模式;
(2)基于 key 维度实现存储数据的去重;
(3)读、写、删操作控制,时间复杂度 O(1).
Map 初始化
Golang 中,对 map 的初始化分为以下几种方式:
myMap1 := make(map[int]int,2)
//通过 make 关键字进行初始化,同时指定 map 预分配的容量.
myMap2 := make(map[int]int)
//通过 make 关键字进行初始化,不显式声明容量,因此默认容量 为 0.
myMap3 :=map[int]int{
1:2,
3:4,
}
Map 的 key 可以是哪些类型?可以嵌套 map 吗?
-
Map key 必须是可比较的类型,
- 语言规范中定义了可比较的类型:boolean, numeric, string, pointer, channel, interface,以及仅包含这些类型的 struct 和 array 。
- 不能作为 map key 的类型有:slice,map, function。
-
可以嵌套 map。
Map 读
- 第一种方式是直接读,倘若 key 存在,则获取到对应的 val,倘若 key 不存在或者 map 未初始化,会返回 val 类型的零值作为兜底
v1 := myMap[10]
- 第二种方式是读的同时添加一个 bool 类型的 flag 标识是否读取成功. 倘若 ok == false,说明读取失败, key 不存在,或者 map 未初始化.
v2,ok := myMap[10]
- 如果 map 没有初始化,取值得到零值
package main
import "fmt"
func main() {
var myMap map[string]int // 未初始化的 map
value := myMap["some_key"] // 尝试获取一个键的值
fmt.Println(value)
}
//panic: assignment to entry in nil map
Map 的查询复杂度
空间复杂度:
由于溢出桶数量超过 hash 桶数量时会触发缩容,所以最坏的情况是数据被集中在一条链上,hash 表基本是空的,这时空间浪费 O (n)。
最好的情况下,数据均匀散列在 hash 表上,没有元素溢出,这时最好的空间复杂度就是扩散因子决定了,当前 go 的扩散因子由全局变量决定,即 loadFactorNum/loadFactorDen = 6.5。即平均每个 hash 桶被分配到 6.5 个元素以上时,开始扩容。所以最小的空间浪费是 (8-6.5)/8 = 0.1875,即 O (0.1875n)
结论:go map 的空间复杂度(指除去正常存储元素所需空间之外的空间浪费)是 O (0.1875 n) ~ O (n)之间。
具体细节:https://blog.csdn.net/dongjijiaoxiangqu/article/details/109643025
时间复杂度:
Go 采用的 hash 算法应是很成熟的算法,极端情况暂不考虑。所以综合情况下 go map 的时间复杂度应为 O(1)
Map 写
myMap[5] = 6
倘若 map 未初始化,直接执行写操作会导致 panic:
const plainError string
panic(plainError("assignment to entry in nil map"))
Map 删除
delete(myMap,5)
执行 delete 方法时,倘若 key 存在,则会从 map 中将对应的 key-value 对删除;倘若 key 不存在或 map 未初始化,则方法直接结束,不会产生显式提示.
Map 遍历
遍历分为下面两种方式:
for k,v := range myMap{
// ...
}
基于 k,v 依次承接 map 中的 key-value 对;
for k := range myMap{
// ...
}
基于 k 依次承接 map 中的 key,不关注 val 的取值.
需要注意的是,在执行 map 遍历操作时,获取的 key-value 对并没有一个固定的顺序,因此前后两次遍历顺序可能存在差异.
map 的底层实现原理?
map 又称为 hash map,在算法上基于 hash 实现 key 的映射和寻址;在数据结构上基于桶数组实现 key-value 对的存储.
以一组 key-value 对写入 map 的流程为例进行简述:
(1)通过哈希方法取得 key 的 hash 值;
(2)hash 值对桶数组长度取模,确定其所属的桶;
(3)在桶中插入 key-value 对.
Hash
hash 译作散列,是一种将任意长度的输入压缩到某一固定长度的输出摘要的过程
- (1)hash 的可重入性:相同的 key,必然产生相同的 hash 值;
- (2)hash 的离散性:只要两个 key 不相同,不论其相似度的高低,产生的 hash 值会在整个输出域内均匀地离散化;
- (3)hash 的单向性:企图通过 hash 值反向映射回 key 是无迹可寻的.
- (4)hash 冲突:由于输入域(key)无穷大,输出域(hash 值)有限,因此必然存在不同 key 映射到相同 hash 值的情况,称之为 hash 冲突.
桶数组
map 中,会通过长度为 2 的整数次幂的桶数组进行 key-value 对的存储:
(1)每个桶固定可以存放 8 个 key-value 对;
(2)倘若超过 8 个 key-value 对打到桶数组的同一个索引当中,此时会通过创建桶链表的方式来化解这一问题.
map 冲突的解决方式?
比较常用的 Hash 冲突解决方案有链地址法和开放寻址法:
链地址法
当哈希冲突发生时,创建新单元,并将新单元添加到冲突单元所在链表的尾部. 在 Go 中将命中同一个桶的元素通过链表的形式进行链接,因此很便于动态扩展
开放寻址法
当哈希冲突发生时,从发生冲突的那个单元起,按照一定的次序,从哈希表中寻找一个空闲的单元,然后把发生冲突的元素存入到该单元。开放寻址法需要的表长度要大于等于所需要存放的元素数量
开放寻址法有多种方式:线性探测法、平方探测法、随机探测法和双重哈希法。这里以线性探测法来帮助读者理解开放寻址法思想
两种解决方案比较
方法 | 优点 |
---|---|
拉链法 | 简单常用;无需预先为元素分配内存. |
开放寻址法 | 无需额外的指针用于链接元素;内存地址完全连续,可以基于局部性原理,充分利用 CPU 高速缓存. |
总结
GO 中 实际上结合了拉链法和开放寻址法两种思路. 以 map 的插入写流程为例,进行思路阐述:
-
(1)桶数组中的每个桶,严格意义上是一个单向桶链表,以桶为节点进行串联;
-
(2)每个桶固定可以存放 8 个 key-value 对;
-
(3)当 key 命中一个桶时,首先根据开放寻址法,在桶的 8 个位置中寻找空位进行插入;
-
(4)倘若桶的 8 个位置都已被占满,则基于桶的溢出桶指针,找到下一个桶,重复第(3)步;
-
(5)倘若遍历到链表尾部,仍未找到空位,则基于拉链法,在桶链表尾部续接新桶,并插入 key-value 对.
扩容
map 扩容机制的核心点包括:
-
(1)扩容分为增量扩容和等量扩容;
- 双倍扩容:新建一个 buckets 数组,新的 buckets 大小是原来的 2 倍,然后旧 buckets 数据搬迁到新的 buckets。该方法我们称之为双倍扩容
- 等量扩容:并不扩大容量,buckets 数量维持不变,重新做一遍类似双倍扩容的搬迁动作,把松散的键值对重新排列一次,使得同一个 bucket 中的 key 排列地更紧密,节省空间,提高 bucket 利用率,进而保证更快的存取。该方法我们称之为等量扩容。
-
(2)当桶内 key-value 总数/桶数组长度 > 6.5 时发生增量扩容,桶数组长度增长为原值的两倍;
-
(3)当桶内溢出桶数量大于等于 2^B 时( B 为桶数组长度的指数,B 最大取 15),发生等量扩容,桶的长度保持为原值;
-
(4)采用渐进扩容的方式,当桶被实际操作到时,由使用者负责完成数据迁移,避免因为一次性的全量数据迁移引发性能抖动.
hmap 结构体
Go 中的 map 是一个应用,占用 8 个字节,指向 hmap 结构体
源码包中 src/runtime/map.go
定义了 hmap 的数据结构:
Hmap 包含若干个结构为 bmap 的数组,每个 bmap 底层都采用链表结构,bmap 通常叫其 bucket
// A header for a Go map.
type hmap struct {
count int
// 代表哈希表中的元素个数,调用len(map)时,返回的就是该字段值。
flags uint8
// 状态标志(是否处于正在写入的状态等)
B uint8
// buckets(桶)的对数
// 如果B=5,则buckets数组的长度 = 2^B=32,意味着有32个桶
noverflow uint16
// 溢出桶的数量
hash0 uint32
// 生成hash的随机数种子
buckets unsafe.Pointer
// 指向buckets数组的指针,数组大小为2^B,如果元素个数为0,它为nil。
oldbuckets unsafe.Pointer
// 如果发生扩容,oldbuckets是指向老的buckets数组的指针,老的buckets数组大小是新的buckets的1/2;非扩容状态下,它为nil。
nevacuate uintptr
// 表示扩容进度,小于此地址的buckets代表已搬迁完成。
extra *mapextra
// 存储溢出桶,这个字段是为了优化GC扫描而设计的,下面详细介绍
}
-
(1)count:map 中的 key-value 总数;
-
(2)flags:map 状态标识,可以标识出 map 是否被 goroutine 并发读写;
-
(3)B:桶数组长度的指数,桶数组长度为 2^B;
-
(4)noverflow:map 中溢出桶的数量;
-
(5)hash0:hash 随机因子,生成 key 的 hash 值时会使用到;
-
(6)buckets:桶数组;
-
(7)oldbuckets:扩容过程中老的桶数组;
-
(8)nevacuate:扩容时的进度标识,index 小于 nevacuate 的桶都已经由老桶转移到新桶中;
-
(9)extra:预申请的溢出桶.
bmap 结构体
bmap
就是我们常说的“桶”,一个桶里面会最多装 8 个 key,
- 这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果的低 B 位是相同的,
- 在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有 8 个位置)。
// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8
// len为8的数组
// 用来快速定位key是否在这个bmap中
// 一个桶最多8个槽位,如果key所在的tophash值在tophash中,则代表该key在这个桶中
}
上面 bmap 结构是静态结构,在编译过程中 runtime.bmap
会拓展成以下结构体:
type bmap struct{
tophash [8]uint8
keys [8]keytype
// keytype 由编译器编译时候确定
values [8]elemtype
// elemtype 由编译器编译时候确定
overflow uintptr
// overflow指向下一个bmap,overflow是uintptr而不是*bmap类型,保证bmap完全不含指针,是为了减少gc,溢出桶存储到extra字段中
}
-
Tophash 就是用于实现快速定位 key 的位置,在实现过程中会使用 key 的 hash 值的高 8 位作为 tophash 值,存放在 bmap 的 tophash 字段中
-
Tophash 字段不仅存储 key 哈希值的高 8 位,还会存储一些状态值,用来表明当前桶单元状态,这些状态值都是小于 minTopHash 的
-
为了避免 key 哈希值的高 8 位值和这些状态值相等,产生混淆情况,所以当 key 哈希值高 8 位若小于 minTopHash 时候,自动将其值加上 minTopHash 作为该 key 的 tophash。桶单元的状态值如下:
emptyRest = 0 // 表明此桶单元为空,且更高索引的单元也是空
emptyOne = 1 // 表明此桶单元为空
evacuatedX = 2 // 用于表示扩容迁移到新桶前半段区间
evacuatedY = 3 // 用于表示扩容迁移到新桶后半段区间
evacuatedEmpty = 4 // 用于表示此单元已迁移
minTopHash = 5 // key的tophash值与桶状态值分割线值,小于此值的一定代表着桶单元的状态,大于此值的一定是key对应的tophash值
func tophash(hash uintptr) uint8 {
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
return top
}
总结
Bmap(bucket)内存数据结构可视化如下:
mapextra 结构体
当 map 的 key 和 value 都不是指针类型时候,bmap 将完全不包含指针,那么 gc 时候就不用扫描 bmap。Bmap 指向溢出桶的字段 overflow 是 uintptr 类型,为了防止这些 overflow 桶被 gc 掉,所以需要 mapextra. Overflow 将它保存起来。如果 bmap 的 overflow 是 bmap 类型,那么 gc 扫描的是一个个拉链表,效率明显不如直接扫描一段内存 (hmap. Mapextra. Overflow)
type mapextra struct {
overflow *[]*bmap
// overflow 包含的是 hmap.buckets 的 overflow 的 buckets
oldoverflow *[]*bma
// oldoverflow 包含扩容时 hmap.oldbuckets 的 overflow 的 bucket
nextOverflow *bmap
// 指向空闲的 overflow bucket 的指针
}
在 map 初始化时,倘若容量过大,会提前申请好一批溢出桶,以供后续使用,这部分溢出桶存放在 hmap.mapextra 当中:
(1)mapextra.overflow:供桶数组 buckets 使用的溢出桶;
(2)mapextra.oldoverFlow: 扩容流程中,供老桶数组 oldBuckets 使用的溢出桶;
(3)mapextra.nextOverflow:下一个可用的溢出桶.
Map 的负载因子为什么是 6.5?
什么是负载因子?
负载因子(load factor),用于衡量当前哈希表中空间占用率的核心指标,也就是每个 bucket 桶存储的平均元素个数。
负载因子 = 哈希表存储的元素个数/桶个 |
---|
另外负载因子与扩容、迁移等重新散列(rehash)行为有直接关系:
- 在程序运行时,会不断地进行插入、删除等,会导致 bucket 不均,内存利用率低,需要迁移。
- 在程序运行时,出现负载因子过大,需要做扩容,解决 bucket 过大的问题。
负载因子是哈希表中的一个重要指标,在各种版本的哈希表实现中都有类似的东西,主要目的是为了平衡 buckets 的存储空间大小和查找元素时的性能高低。
在接触各种哈希表时都可以关注一下,做不同的对比,看看各家的考量。
为什么是 6.5?
为什么 Go 语言中哈希表的负载因子是 6.5,为什么不是 8 ,也不是 1。这里面有可靠的数据支撑吗?
测试报告
实际上这是 Go 官方的经过认真的测试得出的数字,一起来看看官方的这份测试报告。
报告中共包含 4 个关键指标,如下:
loadFactor | %overflow | bytes/entry | hitprobe | missprobe |
---|---|---|---|---|
4.00 | 2.13 | 20.77 | 3.00 | 4.00 |
4.50 | 4.05 | 17.30 | 3.25 | 4.50 |
5.00 | 6.85 | 14.77 | 3.50 | 5.00 |
5.50 | 10.55 | 12.94 | 3.75 | 5.50 |
6.00 | 15.27 | 11.67 | 4.00 | 6.00 |
6.50 | 20.90 | 10.79 | 4.25 | 6.50 |
7.00 | 27.14 | 10.15 | 4.50 | 7.00 |
7.50 | 34.03 | 9.73 | 4.75 | 7.50 |
8.00 | 41.10 | 9.40 | 5.00 | 8.00 |
- LoadFactor:负载因子,也有叫装载因子。
- %overflow:溢出率,有溢出 bukcet 的百分比。
- Bytes/entry:平均每对 key/value 的开销字节数.
- Hitprobe:查找一个存在的 key 时,要查找的平均个数。
- Missprobe:查找一个不存在的 key 时,要查找的平均个数。
选择数值
Go 官方发现:装载因子越大,填入的元素越多,空间利用率就越高,但发生哈希冲突的几率就变大。反之,装载因子越小,填入的元素越少,冲突发生的几率减小,但空间浪费也会变得更多,而且还会提高扩容操作的次数
根据这份测试结果和讨论,Go 官方取了一个相对适中的值,把 Go 中的 map 的负载因子硬编码为 6.5,这就是 6.5 的选择缘由。
这意味着在 Go 语言中,当 map 存储的元素个数大于或等于 6.5 * 桶个数时,就会触发扩容行为。
构造流程
创建 map 时,实际上会调用 runtime/map.go 文件中的 makemap 方法,下面对源码展开分析:
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return
-
(1)hint 为 map 拟分配的容量;在分配前,会提前对拟分配的内存大小进行判断,倘若超限,会将 hint 置为零;
-
(2)通过 new 方法初始化 hmap;
-
(3)调用 fastrand,构造 hash 因子:hmap.hash0;
-
(4)大致上基于 log2(B) >= hint 的思路,计算桶数组的容量 B;
- overLoadFactor 中实现
- 倘若 map 预分配容量小于等于 8,B 取 0,桶的个数为 1;
- 保证 map 预分配容量小于等于桶数组长度 * 6.5.
-
(5)调用 makeBucketArray 方法,初始化桶数组 hmap.buckets
- makeBucketArray 方法会进行桶数组的初始化,并根据桶的数量决定是否需要提前作溢出桶的初始化.
-
(6)倘若 map 容量较大,会提前申请一批溢出桶 hmap.extra.
读流程
map 读流程主要分为以下几步:
(1)根据 key 取 hash 值;
(2)根据 hash 值对桶数组取模,确定所在的桶;
(3)沿着桶链表依次遍历各个桶内的 key-value 对;
(4)命中相同的 key,则返回 value;倘若 key 不存在,则返回零值.
map 读操作最终会走进 runtime/map.go 的 mapaccess 方法中,下面开始阅读源码
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil || h.count == 0 {
return unsafe.Pointer(&zeroVal[0])
}
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0])
}
func (h *hmap) sameSizeGrow() bool {
return h.flags&sameSizeGrow != 0
}
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}
(1)倘若 map 未初始化,或此时存在 key-value 对数量为 0,直接返回零值;
(2)倘若发现存在其他 goroutine 在写 map,直接抛出并发读写的 fatal error;其中,并发写标记,位于 hmap.flags 的第 3 个 bit 位;
(3)通过 maptype.hasher() 方法计算得到 key 的 hash 值,并对桶数组长度取模,取得对应的桶. 关于 hash 方法的内部实现,golang 并未暴露.
- 其中,bucketMast 方法会根据 B 求得桶数组长度 - 1 的值,用于后续的 & 运算,实现取模的效果:
(4)在取桶时,会关注当前 map 是否处于扩容的流程,倘若是的话,需要在老的桶数组 oldBuckets 中取桶,通过 evacuated 方法判断桶数据是已迁到新桶还是仍存留在老桶,倘若仍在老桶,需要取老桶进行遍历.
- 在取老桶前,会先判断 map 的扩容流程是否是增量扩容,倘若是的话,说明老桶数组的长度是新桶数组的一半,需要将桶长度值 m 除以 2.
(5)取 key hash 值的高 8 位值 top. 倘若该值 < 5,会累加 5,以避开 0 ~ 4 的取值. 因为这几个值会用于枚举,具有一些特殊的含义.
(6)开启两层 for 循环进行遍历流程,外层基于桶链表,依次遍历首个桶和后续的每个溢出桶,内层依次遍历一个桶内的 key-value 对.
-
内存遍历时,首先查询高 8 位的 tophash 值,看是否和 key 的 top 值匹配.
-
倘若不匹配且当前位置 tophash 值为 0,说明桶的后续位置都未放入过元素,当前 key 在 map 中不存在,可以直接打破循环,返回零值.
-
倘若找到了相等的 key,则通过地址偏移的方式取到 value 并返回.
其中 dataOffset 为一个桶中 tophash 数组所占用的空间大小.
写流程
map 写流程主要分为以下几步:
(1)根据 key 取 hash 值;
(2)根据 hash 值对桶数组取模,确定所在的桶;
(3)倘若 map 处于扩容,则迁移命中的桶,帮助推进渐进式扩容;
(4)沿着桶链表依次遍历各个桶内的 key-value 对;
(5)倘若命中相同的 key,则对 value 中进行更新;
(6)倘若 key 不存在,则插入 key-value 对;
(7)倘若发现 map 达成扩容条件,则会开启扩容模式,并重新返回第(2)步.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0))
h.flags ^= hashWriting
if h.buckets == nil {
h.buckets = newobject(t.bucket)
}
again:
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash)
var inserti *uint8
var insertk unsafe.Pointer
var elem unsafe.Pointer
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if !t.key.equal(key, k) {
continue
}
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}
if inserti == nil {
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.key, insertk, key)
*inserti = top
h.count++
done:
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
return
}
(1)写操作时,倘若 map 未初始化,直接 panic;
(2)倘若其他 goroutine 在进行写或删操作,抛出并发写 fatal error;
(3)通过 maptype.hasher() 方法求得 key 对应的 hash 值;
(4)通过异或位运算,将 map.flags 的第 3 个 bit 位置为 1,添加写标记;
(5)倘若 map 的桶数组 buckets 未空,则对其进行初始化;
(6)找到当前 key 对应的桶索引 bucket;
(7)倘若发现当前 map 正处于扩容过程,则帮助其渐进扩容,具体内容在第 9 节中再作展开;
(8)从 map 的桶数组 buckets 出发,结合桶索引和桶容量大小,进行地址偏移,获得对应桶 b;
(9)取得 key 的高 8 位 tophash:
(10)提前声明好的三个指针,用于指向存放 key-value 的空槽:
inserti:tophash 拟插入位置;
insertk:key 拟插入位置 ;
elem:val 拟插入位置;
(11)开启两层 for 循环,外层沿着桶链表依次遍历,内层依次遍历桶内的 key-value 对:
(12)倘若 key 的 tophash 和当前位置 tophash 不同,则会尝试将 inserti、insertk elem 调整指向首个空位,用于后续的插入操作.
-
倘若发现当前位置 tophash 标识为 emtpyRest(0),则说明当前桶链表后续位置都未空,无需继续遍历,直接 break 遍历流程即可.
-
倘若桶中某个位置的 tophash 标识为 emptyOne(1),说明当前位置未放入元素,倘若为 emptyRest(0),说明包括当前位置在内,此后的位置都为空.
(13)倘若找到了相等的 key,则执行更新操作,并且直接跳转到方法的 done 标志位处,进行收尾处理
(14)倘若没找到相等的 key,会在执行插入操作前,判断 map 是否需要开启扩容模式. 这部分内容在第 9 节中作展开.
倘若需要扩容,会在开启扩容模式后,跳转回 again 标志位,重新开始桶的定位以及遍历流程.
(15)倘若遍历完桶链表,都没有为当前待插入的 key-value 对找到空位,则会创建一个新的溢出桶,挂载在桶链表的尾部,并将 inserti、insertk、elem 指向溢出桶的首个空位:
创建溢出桶时:
-
I 倘若 hmap.extra 中还有剩余可用的溢出桶,则直接获取 hmap.extra.nextOverflow,并将 nextOverflow 调整指向下一个空闲可用的溢出桶;
-
II 倘若 hmap 已经没有空闲溢出桶了,则创建一个新的溢出桶.
-
III hmap 的溢出桶数量 hmap.noverflow 累加 1;
-
IV 将新获得的溢出桶添加到原桶链表的尾部;
-
V 返回溢出桶.
(16)将 tophash、key、value 插入到取得空位中,并且将 map 的 key-value 对计数器 count 值加 1;
(17)收尾环节,再次校验是否有其他协程并发写,倘若有,则抛 fatal error. 将 hmap.flags 中的写标记抹去,然后退出方法.
删除流程
map 删楚 kv 对流程主要分为以下几步:
(1)根据 key 取 hash 值;
(2)根据 hash 值对桶数组取模,确定所在的桶;
(3)倘若 map 处于扩容,则迁移命中的桶,帮助推进渐进式扩容;
(4)沿着桶链表依次遍历各个桶内的 key-value 对;
(5)倘若命中相同的 key,删除对应的 key-value 对;并将当前位置的 tophash 置为 emptyOne,表示为空;
(6)倘若当前位置为末位,或者下一个位置的 tophash 为 emptyRest,则沿当前位置向前遍历,将毗邻的 emptyOne 统一更新为 emptyRest.
map 删操作最终会走进 runtime/map.go 的 mapdelete 方法中,下面开始阅读源码:
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
if h == nil || h.count == 0 {
return
}
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0))
h.flags ^= hashWriting
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break search
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.key.equal(key, k2) {
continue
}
// Only clear key if there are pointers in it.
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
memclrNoHeapPointers(e, t.elem.size)
}
b.tophash[i] = emptyOne
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
for {
b.tophash[i] = emptyRest
if i == 0 {
if b == bOrig {
break
}
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else {
i--
}
if b.tophash[i] != emptyOne {
break
}
}
notLast:
h.count--
if h.count == 0 {
h.hash0 = fastrand()
}
break search
}
}
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWritin
(1)倘若 map 未初始化或者内部 key-value 对数量为 0,删除时不会报错,直接返回;
(2)倘若存在其他 goroutine 在进行写或删操作,抛出并发写的 fatal error;
(3)通过 maptype.hasher() 方法求得 key 对应的 hash 值;
(4)通过异或位运算,将 map.flags 的第 3 个 bit 位置为 1,添加写标记;
(5)找到当前 key 对应的桶索引 bucket;
(6)倘若发现当前 map 正处于扩容过程,则帮助其渐进扩容
(7)从 map 的桶数组 buckets 出发,结合桶索引和桶容量大小,进行地址偏移,获得对应桶 b,并赋值给 bOrg
(8)取得 key 的高 8 位 tophash:
(9)开启两层 for 循环,外层沿着桶链表依次遍历,内层依次遍历桶内的 key-value 对
(10)遍历时,倘若发现当前位置 tophash 值为 emptyRest,则直接结束遍历流程:
(11)倘若 key 不相等,则继续遍历:
(12)倘若 key 相等,则删除对应的 key-value 对,并且将当前位置的 tophash 置为 emptyOne:
(13)倘若当前位置不位于最后一个桶的最后一个位置,或者当前位置的后置位 tophash 不为 emptyRest,则无需向前遍历更新 tophash 标识,直接跳转到 notLast 位置即可;
(14)向前遍历,将沿途的空位( tophash 为 emptyOne )的 tophash 都更新为 emptySet.
(15)倘若成功从 map 中删除了一组 key-value 对,则将 hmap 的计数器 count 值减 1. 倘若 map 中的元素全都被删除完了,会为 map 更换一个新的随机因子 hash0.
(16)收尾环节,再次校验是否有其他协程并发写,倘若有,则抛 fatal error. 将 hmap.flags 中的写标记抹去,然后退出方法.
遍历流程
Map 的遍历流程首先会走进 runtime/map.go 的 mapiterinit() 方法当中,初始化用于遍历的迭代器 hiter;接着会调用 runtime/map.go 的 mapiternext() 方法开启遍历流程.
迭代器数据结构
type hiter struct {
key unsafe.Pointer
elem unsafe.Pointer
t *maptype
h *hmap
buckets unsafe.Pointer
bptr *bmap
overflow *[]*bmap
oldoverflow *[]*bmap
startBucket uintptr
offset uint8
wrapped bool
B uint8
i uint8
bucket uintptr
checkBucket uintptr
}
hiter 是遍历 map 时用于存放临时数据的迭代器:
(1)key:指向遍历得到 key 的指针;
(2)value:指向遍历得到 value 的指针;
(3)t:map 类型,包含了 key、value 类型大小等信息;
(4)h:map 的指针;
(5)buckets:map 的桶数组;
(6)bptr:当前遍历到的桶;
(7)overflow:新老桶数组对应的溢出桶;
(8)startBucket:遍历起始位置的桶索引;
(9)offset:遍历起始位置的 key-value 对索引;
(10)wrapped:遍历是否穿越桶数组尾端回到头部了;
(11)B:桶数组的长度指数;
(12)i:当前遍历到的 key-value 对在桶中的索引;
(13)bucket:当前遍历到的桶;
(14)checkBucket:因为扩容流程的存在,需要额外检查的桶.
map 遍历流程开始时,首先会走进 runtime/map.go 的 mapiterinit() 方法当中,此时会对创建 map 迭代器 hiter,并且通过取随机数的方式,决定遍历的起始桶号,以及起始 key-value 对索引号.
1)遍历时发现其他 goroutine 在并发写,直接抛出 fatal error:
(2)开启最外圈的循环,依次遍历桶数组中的每个桶链表,通过 next 和 goto next 关键字实现循环代码块;
(3)倘若已经遍历完所有的桶,重新回到起始桶为止,则直接结束方法;
(4)倘若 map 处于扩容流程,取桶时兼容新老桶数组的逻辑. 倘若桶处于旧桶数组且未完成迁移,需要将 checkBucket 置为当前的桶号;
5)遍历的桶号加 1,倘若来到桶数组末尾,则将桶号置为 0. 将 key-value 对的遍历索引 i 置为 0.
(6)依次遍历各个桶中每个 key-value 对:
(7)倘若遍历到的桶属于旧桶数组未迁移完成的桶,需要按照其在新桶中的顺序完成遍历. 比如,增量扩容流程中,旧桶中的 key-value 对最终应该被分散迁移到新桶数组的 x、y 两个区域,则此时遍历时,哪怕 key-value 对仍存留在旧桶中未完成迁移,遍历时也应该严格按照其在新桶数组中的顺序来执行.
(8)执行 mapaccessK 方法,基于读流程方法获取 key-value 对,通过迭代 hiter 的 key、value 指针进行接收,用于对用户的遍历操作进行响应:
扩容流程
扩容类型
(1)增量扩容
表现:扩容后,桶数组的长度增长为原长度的 2 倍;
目的:降低每个桶中 key-value 对的数量,优化 map 操作的时间复杂度.
(2)等量扩容
表现:扩容后,桶数组的长度和之前保持一致;但是溢出桶的数量会下降.
目的:提高桶主体结构的数据填充率,减少溢出桶数量,避免发生内存泄漏
何时扩容
(1)只有 map 的写流程可能开启扩容模式;
(2)写 map 新插入 key-value 对之前,会发起是否需要扩容的逻辑判断:
(3)根据 hmap 的 oldbuckets 是否空,可以判断 map 此前是否已开启扩容模式:
(4)倘若此前未进入扩容模式,且 map 中 key-value 对的数量超过 8 个,且大于桶数组长度的 6.5 倍,则进入增量扩容:
(5)倘若溢出桶的数量大于 2^B 个(即桶数组的长度;B 大于 15 时取15),则进入等量扩容:
如何开启扩容模式
开启扩容模式的方法位于 runtime/map.go 的 hashGrow 方法中:
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// commit the grow (atomic wrt gc)
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
(1)倘若是增量扩容,bigger 值取 1;倘若是等量扩容,bigger 值取 0,并将 hmap.flags 的第 4 个 bit 位置为 1,标识当前处于等量扩容流程 (2)将原桶数组赋值给 oldBuckets,并创建新的桶数组和一批新的溢出桶.
此处会通过变量 bigger,实现不同扩容模式下,新桶数组长度的区别处理. (3)更新 hmap 的桶数组长度指数 B,flag 标识,并将新、老桶数组赋值给 hmap.oldBuckets 和 hmap.buckets;扩容迁移进度 hmap.nevacuate 标记为 0;新桶数组的溢出桶数量 hmap.noverflow 置为 0.
(4)将原本存量可用的溢出桶赋给 hmap.extra.oldoverflow;倘若存在下一个可用的溢出桶,赋给 hmap.extra.nextOverflow.
扩容迁移规则
(1)在等量扩容中,新桶数组长度与原桶数组相同;
(2)key-value 对在新桶数组和老桶数组的中的索引号保持一致;
(3)在增量扩容中,新桶数组长度为原桶数组的两倍;
(4)把新桶数组中桶号对应于老桶数组的区域称为 x 区域,新扩展的区域称为 y 区域.
(5)实际上,一个 key 属于哪个桶,取决于其 hash 值对桶数组长度取模得到的结果,因此依赖于其低位的 hash 值结果.;
(6)在增量扩容流程中,新桶数组的长度会扩展一位,假定 key 原本从属的桶号为 i,则在新桶数组中从属的桶号只可能是 i (x 区域)或者 i + 老桶数组长度(y 区域);
(7)当 key 低位 hash 值向左扩展一位的 bit 位为 0,则应该迁往 x 区域的 i 位置;倘若该 bit 位为 1,应该迁往 y 区域对应的 i + 老桶数组长度的位置.
渐进式扩容
map 采用的是渐进扩容的方式,避免因为一次性的全量数据迁移引发性能抖动.
当每次触发写、删操作时,会为处于扩容流程中的 map 完成两组桶的数据迁移:
(1)一组桶是当前写、删操作所命中的桶;
(2)另一组桶是,当前未迁移的桶中,索引最小的那个桶.
数据迁移的逻辑位于 runtime/map.go 的 evacuate 方法当中:
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 入参中,oldbucket 为当前要迁移的桶在旧桶数组中的索引
// 获取到待迁移桶的内存地址 b
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 获取到旧桶数组的容量 newbit
newbit := h.noldbuckets()
// evacuated 方法判断出桶 b 是否已经迁移过了,未迁移过,才进入此 if 分支进行迁移处理
if !evacuated(b) {
// 通过一个二元数组 xy 指向当前桶可能迁移到的目的桶
// x = xy[0],代表新桶数组中索引和旧桶数组一致的桶
// y = xy[1],代表新桶数组中,索引为原索引加上旧桶容量的桶,只在增量扩容中会使用到
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
// 只有进入增量扩容的分支,才需要对 y 进行初始化
if !h.sameSizeGrow() {
// Only calculate y pointers if we're growing bigger.
// Otherwise GC can see bad pointers.
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 外层 for 循环,遍历桶 b 和对应的溢出桶
for ; b != nil; b = b.overflow(t) {
// k,e 分别记录遍历桶时,当前的 key 和 value 的指针
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
// 遍历桶内的 key-value 对
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
if !h.sameSizeGrow() {
// Compute hash to make our evacuation decision (whether we need
// to send this key/elem to bucket x or bucket y).
hash := t.hasher(k2, uintptr(h.hash0))
if hash&newbit != 0 {
useY = 1
}
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // evacuation destination
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// Unlink the overflow buckets & clear key/elem to help GC.
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
func (h *hmap) noldbuckets() uintptr {
oldB := h.B
if !h.sameSizeGrow() {
oldB--
}
return bucketShift(oldB)
(1)从老桶数组中获取到待迁移的桶 b;
(2)获取到老桶数组的长度 newbit;
(3)倘若当前桶已经完成了迁移,则无需处理;
(4)创建一个二元数组 xy,分别承载 x 区域和 y 区域(含义定义见 9.4 小节)中的新桶位置,用于接受来自老桶数组的迁移数组;只有在增量扩容的流程中,才存在 y 区域,因此才需要对 xy 中的 y 进行定义;
(5)开启两层 for 循环,外层遍历桶链表,内层遍历每个桶中的 key-value 对:
(6)取每个位置的 tophash 值进行判断,倘若当前是个空位,则将当前位置 tophash 值置为 evacuatedEmpty,开始遍历下一个位置:
(7)基于 9.4 的规则,寻找到迁移的目的桶;
其中目的桶的类型定义如下:
type evacDst struct {
b *bmap // current destination bucket
i int // key/elem index into b
k unsafe.Pointer // pointer to current key storage
e unsafe.Pointer // pointer to current elem storage
}
I evacDst.b:目的地的所在桶;
II evacDst.i:即将入桶的 key-value 对在桶中的索引;
III evacDst.k:入桶 key 的存储指针;
IV evacDst.e:入桶 value 的存储指针.
(8)将 key-value 对迁移到目的桶中,并且更新目的桶结构内几个指针的指向:
(9)倘若当前迁移的桶是旧桶数组未迁移的桶中索引最小的一个,则 hmap.nevacuate 累加 1.
倘若已经迁移完所有的旧桶,则会确保 hmap.flags 中,等量扩容的标识位被置为 0.
map 遍历为什么是无序的?
使用 range 多次遍历 map 时输出的 key 和 value 的顺序可能不同。这是 Go 语言的设计者们有意为之,旨在提示开发者们,Go 底层实现并不保证 map 遍历顺序稳定,请大家不要依赖 range 遍历结果顺序
主要原因有 2 点:
- Map 在遍历时,并不是从固定的 0 号 bucket 开始遍历的,每次遍历,都会从一个随机值序号的 bucket,再从其中随机的 cell开始遍历
- Map 遍历时,是按序遍历 bucket,同时按需遍历 bucket 中和其 overflow bucket 中的 cell。但是 map 在扩容后,会发生 key 的搬迁,这造成原来落在一个 bucket 中的 key,搬迁后,有可能会落到其他 bucket 中了,从这个角度看,遍历 map 的结果就不可能是按照原来的顺序了
Map 本身是无序的,且遍历时顺序还会被随机化,如果想顺序遍历 map,需要对 map key 先排序,再按照 key 的顺序遍历 map。
func TestMapRange(t *testing.T) {
m := map[int]string{1: "a", 2: "b", 3: "c"}
t.Log("first range:")
for i, v := range m {
t.Logf("m[%v]=%v ", i, v)
}
t.Log("\nsecond range:")
for i, v := range m {
t.Logf("m[%v]=%v ", i, v)
}
// 实现有序遍历
var sl []int
// 把 key 单独取出放到切片
for k := range m {
sl = append(sl, k)
}
// 排序切片
sort.Ints(sl)
// 以切片中的 key 顺序遍历 map 就是有序的了
for _, k := range sl {
t.Log(k, m[k])
}
}
map 为什么是非线程安全的?
Map 默认是并发不安全的,同时对 map 进行并发读写时,程序会 panic,原因如下:
Go 官方在经过了长时间的讨论后,认为 Go map 更应适配典型使用场景(不需要从多个 goroutine 中进行安全访问),而不是为了小部分情况(并发访问),导致大部分程序付出加锁代价(性能),决定了不支持。
场景: 2 个协程同时读和写,以下程序会出现致命错误:fatal error: concurrent map writes
package main
import (
"fmt"
"time"
)
func main() {
s := make(map[int]int)
for i := 0; i < 100; i++ {
go func(i int) {
s[i] = i
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Printf("map第%d个元素值是%d\n", i, s[i])
}(i)
}
time.Sleep(1 * time.Second)
}
如果想实现 map 线程安全,有两种方式:
方式一:使用读写锁 map
+ sync.RWMutex
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var lock sync.RWMutex
s := make(map[int]int)
for i := 0; i < 100; i++ {
go func(i int) {
lock.Lock()
s[i] = i
lock.Unlock()
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
lock.RLock()
fmt.Printf("map第%d个元素值是%d\n", i, s[i])
lock.RUnlock()
}(i)
}
time.Sleep(1 * time.Second)
}
方式二:使用 Go 提供的 sync.Map
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var m sync.Map
for i := 0; i < 100; i++ {
go func(i int) {
m.Store(i, i)
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
v, ok := m.Load(i)
fmt.Printf("Load: %v, %v\n", v, ok)
}(i)
}
time.Sleep(1 * time.Second)
}
Map 怎么知道自己处于竞争状态?是 Go 编码实现的还是底层硬件实现的?
-
代码实现的,在查找、赋值、遍历、删除的过程中**都会检测写标志 flags,一旦发现写标志置位 (等于 1),抛出 fatal error,无法使用 recover 进行恢复。
-
赋值和删除函数载检测完标志是复位状态 (等于 0)之后,先将写标志位置位,才会进行之后的操作。
Map 的 panic 能被 recover 掉吗?了解 panic 和 recover 的机制?
抛出 fatal error,无法使用 recover 进行恢复
func main() {
defer errorHandler()
m := map[string]int{}
go func() {
for {
m["x"] = 1
}
}()
for {
_ = m["x"]
}
}
func errorHandler() {
if r := recover(); r != nil {
fmt.Println(r)
}
}//不能
Map 由于不是线程安全的,所以在遇到并发读写的时候会抛出 concurrent map read and map write 异常,从而使程序直接退出。
func mapaccess1_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer {
...
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
...
}
这里的 throw 和上面一样,最终会调用到 exit 执行退出。
Go 中两个 map 对象如何比较
使用 reflect. DeepEqual 这个函数进行比较。使用 reflect. DeepEqual 有一点注意:由于使用了反射,所以有性能的损失。如果你多做一些测试,那么你会发现 reflect. DeepEqual 会比 == 慢 100 倍以上。
Map 的优缺点以及改进?
优点:
-
Map 类似其他语言中的哈希表或字典,以 key-value 形式存储数据
-
Key 必须是支持==或!=比较运算的类型,不可以是函数、map 或 slice
-
Map 通过 key 查找 value 比线性搜索快很多。
-
Map 使用 make ()创建,支持:=这种简写方式
-
超出容量时会自动扩容,
-
当键值对不存在时自动添加,使用 delete ()删除某键值对
缺点:
并发中的 map 不是安全的
Sync. Map 怎么使用
package main
import (
"fmt"
"sync"
)
func main() {
var scene sync.Map
// 将键值对保存到sync.Map
scene.Store("1", 1)
scene.Store("2", 2)
scene.Store("3", 3)
// 从sync.Map中根据键取值
fmt.Println(scene.Load("1"))
// 根据键删除对应的键值对
scene.Delete("1")
// 遍历所有sync.Map中的键值对
scene.Range(func(k, v interface{}) bool {
fmt.Println("iterate:", k, v)
return true
})
}
Sync. Map 底层实现原理
Sync. Map
type Map struct {
mu Mutex
read atomic.Value
dirty map[any]*entry
misses int
}
sync.Map 主类中包含以下核心字段:
-
read:无锁化的只读 map,实际类型为 readOnly,2.3 小节会进一步介绍;
-
dirty:加锁处理的读写 map;
-
misses:记录访问 read 的失效次数,累计达到阈值时,会进行 read map/dirty map 的更新轮换;
-
mu:一把互斥锁,实现 dirty 和 misses 的并发管理.
可见,sync.Map 的特点是冗余了两份 map:read map 和 dirty map,后续的所介绍的交互流程也和这两个 map 息息相关,基本可以归结为两条主线
-
主线一:首先基于无锁操作访问 read map;倘若 read map 不存在该 key,则加锁并使用 dirty map 兜底;
-
主线二:read map 和 dirty map 之间会交替轮换更新
Entry
type entry struct {
p unsafe.Pointer
}
kv 对中的 value,统一采用 unsafe.Pointer 的形式进行存储,通过 entry.p 的指针进行链接.
entry.p 的指向分为三种情况:
-
I 存活态:正常指向元素;
-
II 软删除态:指向 nil;
-
III 硬删除态:指向固定的全局变量 expunged.
var expunged = unsafe.Pointer(new(any))
存活态很好理解,即 key-entry 对仍未删除;
nil 态表示软删除,read map 和 dirty map 底层的 map 结构仍存在 key-entry 对,但在逻辑上该 key-entry 对已经被删除,因此无法被用户查询到;
* expunged 态表示硬删除,dirty map 中已不存在该 key-entry 对.
readOnly (无锁化的只读 map)
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}
sync.Map 中的只读 map:read 内部包含两个成员属性:
-
m:真正意义上的 read map,实现从 key 到 entry 的映射;
-
amended:标识 read map 中的 key-entry 对是否存在缺失,需要通过 dirty map 兜底.
读流程
sync.Map.Load()
func (m *Map) Load(key any) (value any, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
-
查看 read map 中是否存在 key-entry 对,若存在,则直接读取 entry 返回;
-
倘若第一轮 read map 查询 miss,且 read map 不全,则需要加锁 double check;
-
第二轮 read map 查询仍 miss(加锁后),且 read map 不全,则查询 dirty map 兜底;
-
查询操作涉及到与 dirty map 的交互,misses 加一;
-
解锁,返回查得的结果.
entry.load()
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}
-
sync.Map 中,kv 对的 value 是基于 entry 指针封装的形式;
-
从 map 取得 entry 后,最终需要调用 entry.load 方法读取指针指向的内容;
-
倘若 entry 的指针状态为 nil 或者 expunged,说明 key-entry 对已被删除,则返回 nil;
-
倘若 entry 未被删除,则读取指针内容,并且转为 any 的形式进行返回.
sync.Map.missLocked()
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
-
在读流程中,倘若未命中 read map,且由于 read map 内容存在缺失需要和 dirty map 交互时,会走进 missLocked 流程;
-
在 missLocked 流程中,首先 misses 计数器累加 1;
-
倘若 miss 次数小于 dirty map 中存在的 key-entry 对数量,直接返回即可;
-
倘若 miss 次数大于等于 dirty map 中存在的 key-entry 对数量,则使用 dirty map 覆盖 read map,并将 read map 的 amended flag 置为 false;
-
新的 dirty map 置为 nil,misses 计数器清零.
写流程
sync.Map 写流程
sync.Map.Store()
func (m *Map) Store(key, value any) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) storeLocked(i *any) {
atomic.StorePointer(&e.p, unsafe.Pointe
}
(1)倘若 read map 存在拟写入的 key,且 entry 不为 expunged 状态,说明这次操作属于更新而非插入,直接基于 CAS 操作进行 entry 值的更新,并直接返回(存活态或者软删除,直接覆盖更新);
(2)倘若未命中(1)的分支,则需要加锁 double check;
(3)倘若第二轮检查中发现 read map 或者 dirty map 中存在 key-entry 对,则直接将 entry 更新为新值即可(存活态或者软删除,直接覆盖更新);
(4)在第(3)步中,如果发现 read map 中该 key-entry 为 expunged 态,需要在 dirty map 先补齐 key-entry 对,再更新 entry 值(从硬删除中恢复,然后覆盖更新);
(5)倘若 read map 和 dirty map 均不存在,则在 dirty map 中插入新 key-entry 对,并且保证 read map 的 amended flag 为 true.(插入)
(6)第(5)步的分支中,倘若发现 dirty map 未初始化,需要前置执行 dirtyLocked 流程;
(7)解锁返回.
下面补充介绍 Store() 方法中涉及到的几个子方法.
entry.tryStore()
func (m *Map) Store(key, value any) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
// ...
}
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
-
• 在写流程中,倘若发现 read map 中已存在对应的 key-entry 对,则会对调用 tryStore 方法尝试进行更新;
-
• 倘若 entry 为 expunged 态,说明已被硬删除,dirty 中缺失该项数据,因此 tryStore 执行失败,回归主干流程;
-
• 倘若 entry 非 expunged 态,则直接执行 CAS 操作完成值的更新即可.
entry.unexpungeLocked()
func (m *Map) Store(key, value any) {
// ...
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
}
// ...
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
-
• 在写流程加锁 double check 的过程中,倘若发现 read map 中存在对应的 key-entry 对,会执行该方法;
-
• 倘若 key-entry 为硬删除 expunged 态,该方法会基于 CAS 操作将其更新为软删除 nil 态,然后进一步在 dirty map 中补齐该 key-entry 对,实现从硬删除到软删除的恢复.
entry.storeLocked()
func (m *Map) Store(key, value any) {
// ...
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// ...
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
}
// ...
}
func (e *entry) storeLocked(i *any) {
atomic.StorePointer(&e.p, unsafe.Pointer)
}
写流程中,倘若 read map 或者 dirty map 存在对应 key-entry,最终会通过原子操作,将新值的指针存储到 entry.p 当中.
sync.Map.dirtyLocked()
dirtyLock 方法
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
-
• 在写流程中,倘若需要将 key-entry 插入到兜底的 dirty map 中,并且此时 dirty map 为空(从未写入过数据或者刚发生过 missLocked),会进入 dirtyLocked 流程;
-
• 此时会遍历一轮 read map ,将未删除的 key-entry 对拷贝到 dirty map 当中;
-
• 在遍历时,还会将 read map 中软删除 nil 态的 entry 更新为硬删除 expunged 态,因为在此流程中,不会将其拷贝到 dirty map.
删流程
Delete流程
sync.Map.Delete()
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
(1)倘若 read map 中存在 key,则直接基于 cas 操作将其删除;
(2)倘若read map 不存在 key,且 read map 有缺失(amended flag 为 true),则加锁 dou check;
(3)倘若加锁 double check 时,read map 仍不存在 key 且 read map 有缺失,则从 dirty map 中取元素,并且将 key-entry 对从 dirty map 中物理删除;
(4)走入步骤(3),删操作需要和 dirty map 交互,需要走进 3.3 小节介绍的 missLocked 流程;
(5)解锁;
(6)倘若从 read map 或 dirty map 中获取到了 key 对应的 entry,则走入 entry.delete() 方法逻辑删除 entry;
(7)倘若 read map 和 dirty map 中均不存在 key,返回 false 标识删除失败.
entry.delete()
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}
-
• 该方法是 entry 的逻辑删除方法;
-
• 倘若 entry 此前已被删除,则直接返回 false 标识删除失败;
-
• 倘若 entry 当前仍存在,则通过 CAS 将 entry.p 指向 nil,标识其已进入软删除状态.
遍历流程
遍历流程
func (m *Map) Range(f func(key, value any) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
-
(1)在遍历过程中,倘若发现 read map 数据不全(amended flag 为 true),会额外加一次锁,并使用 dirty map 覆盖 read map;
-
(2)遍历 read map(通过步骤(1)保证 read map 有全量数据),执行用户传入的回调函数,倘若某次回调时返回值为 false,则会终止全流程.
总结
entry 的 expunged 态
思考问题:
为什么需要使用 expunged 态来区分软硬删除呢?仅用 nil 一种状态来标识删除不可以吗?
回答:
首先需要明确,无论是软删除(nil)还是硬删除(expunged),都表示在逻辑意义上 key-entry 对已经从 sync.Map 中删除,nil 和 expunged 的区别在于:
• 软删除态(nil):read map 和 dirty map 在物理上仍保有该 key-entry 对,因此倘若此时需要对该 entry 执行写操作,可以直接 CAS 操作;
• 硬删除态(expunged):dirty map 中已经没有该 key-entry 对,倘若执行写操作,必须加锁(dirty map 必须含有全量 key-entry 对数据).
复用 nil 态软删除的数据
设计 expunged 和 nil 两种状态的原因,就是为了优化在 dirtyLocked 前,针对同一个 key 先删后写的场景. 通过 expunged 态额外标识出 dirty map 中是否仍具有指向该 entry 的能力,这样能够实现对一部分 nil 态 key-entry 对的解放,能够基于 CAS 完成这部分内容写入操作而无需加锁.
read map 和 dirty map 的数据流转
sync.Map 由两个 map 构成:
-
• read map:访问时全程无锁;
-
• dirty map:是兜底的读写 map,访问时需要加锁.
之所以这样处理,是希望能根据对读、删、更新、写操作频次的探测,来实时动态地调整操作方式,希望在读、更新、删频次较高时,更多地采用 CAS 的方式无锁化地完成操作;在写操作频次较高时,则直接了当地采用加锁操作完成.
因此, sync.Map 本质上采取了一种以空间换时间 + 动态调整策略的设计思路,下面对两个 map 间的数据流转过程进行详细介绍:
两个 map
read map& dirty map
-
• 总体思想,希望能多用 read map,少用 dirty map,因为操作前者无锁,后者需要加锁;
-
• 除了 expunged 态的 entry 之外,read map 的内容为 dirty map 的子集;
dirty map → read map
dirty map 覆写 read map
- • 记录读/删流程中,通过 misses 记录访问 read map miss 由 dirty 兜底处理的次数,当 miss 次数达到阈值,则进入 missLocked 流程,进行新老 read/dirty 替换流程;此时将老 dirty 作为新 read,新 dirty map 则暂时为空,直到 dirtyLocked 流程完成对 dirty 的初始化;
read map → dirty map
遍历 read map 填充 dirty map
-
• 发生 dirtyLocked 的前置条件:I dirty 暂时为空(此前没有写操作或者近期进行过 missLocked 流程);II 接下来一次写操作访问 read 时 miss,需要由 dirty 兜底;
-
• 在 dirtyLocked 流程中,需要对 read 内的元素进行状态更新,因此需要遍历,是一个线性时间复杂度的过程,可能存在性能抖动;
-
• dirtyLocked 遍历中,会将 read 中未被删除的元素(非 nil 非 expunged)拷贝到 dirty 中;会将 read 中所有此前被删的元素统一置为 expunged 态.
Go map 和 sync. Map 谁的性能好,为什么?
Go 语言的 sync. Map
支持并发读写,采取了 “空间换时间” 的机制,冗余了两个数据结构,分别是:read 和 dirty
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
对比原始 map:
和原始 map+RWLock 的实现并发的方式相比,减少了加锁对性能的影响。它做了一些优化:可以无锁访问 read map,而且会优先操作 read map,倘若只操作 read map 就可以满足要求,那就不用去操作 write map (dirty),所以在某些特定场景中它发生锁竞争的频率会远远小于 map+RWLock 的实现方式
优点:
适合读多写少的场景
缺点:
写多的场景,会导致 read map 缓存失效,需要加锁,冲突变多,性能急剧下降