大白话讲讲 Go 语言的 sync.Map(二)

简介: 上一篇文章《大白话讲讲 Go 语言的 sync.Map(一)》讲到 entry 数据结构,原因是 Go 语言标准库的 map 不是线程安全的,通过加一层抽象回避这个问题……

上一篇文章 《大白话讲讲 Go 语言的 sync.Map(一)》 讲到 entry 数据结构,原因是 Go 语言标准库的 map 不是线程安全的,通过加一层抽象回避这个问题。

当一个 key 被删除的时候,比如李四销户了,以前要撕掉小账本,现在可以在大账本上写 expunged,

对,什么也不写也是 OK 的。也就是说,

entry.p 可能是真正的数据的地址,也可能是 nil,也可能是 expunged。

为什么无端端搞这个 expunged 干嘛?因为 sync.Map 实际上是有两个小账本,

一个叫 readOnly map(只读账本),一个叫 dirty map(可读、也可写账本):

type Map struct {
    mu sync.Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry
    misses int
}

type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

既然有账本一个变成两个,那肯定会有些时候出现两个 map 数据是不一致的情况。

readOnly 结构的 amended 字段,是一个标记,为 true 的时候代表 dirty map 包含了一些 key,这些 key 不会存在 readOnly map 中。

这个字段的作用,在于加速查找的过程。

假设 readOnly 账本上有 张三、李四、钱五,dirty 账本除了这三个人,后面又新增了 王六,查找逻辑就是这样的:

  1. 先在 readOnly 查找,王六不在
  2. 判断 amended ,发现两个账本数据是不一致的
  3. 再去 dirty 账本查找,终于找到王六

如果 2 的 amended 标记是两个账本数据一致,那就没有执行 3 的必要了。

我们可以看看源码是怎么实现的:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
  read, _ := m.read.Load().(readOnly)
  // 1. 先在 readOnly 查找,王六不在
  e, ok := read.m[key]
  // 2. 判断 amended ,发现两个账本数据是不一致的
  if !ok && read.amended {
    // 加锁的原因是,前面步骤 1 的读取有可能被另一个协程的 missLocked 更改了
    // 导致读出来的值不符合预期,所以加锁再读取一次,老套路了。
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    e, ok = read.m[key]
    if !ok && read.amended {
      // 3. 再去 dirty 账本查找,终于找到王六
      e, ok = m.dirty[key]
      // missLocked 拥有一个计数器,
      // 它的作用在于 readOnly 如果一直查不到,经常退化到 dirty,
      // 那就把 dirty 作为 readOnly ,直接取代它。
      m.missLocked()
    }
    m.mu.Unlock()
  }
  if !ok {
    return nil, false
  }
  return e.load() // 还记得大账本吗?这里是拿到最终的值,针对 entry.p == expunged 做了特殊处理。
}

func (m *Map) missLocked() {
  // 查不到就递增 misses 计数器
  m.misses++
  // 这个判断条件不是常数,而是 dirty map 的记录数。
  // 这个判断条件很奇妙,
  // 它使得 dirty 取代 readOnly 的时机,和 dirty 的数据量正相关了。
  // 也就是说,dirty map 越大,对两个 map 不一致的容忍度越大,
  // 不会有频繁的取代操作。
  if m.misses < len(m.dirty) {
    // 如果不是经常查不到,说明 readOnly 还是可以用的,退出。
    return
  }
  // 如果 readOnly 已经没有存在价值,那就把 dirty 取代 readOnly。
  // 此时,dirty 置空,并把 misses 计数器置 0。
  // read 和 dirty 的数据类型都是 map[interface{}]*entry,
  // 可以直接替换,无需类型转换,这个设计简直完美。
  m.read.Store(readOnly{m: m.dirty})
  m.dirty = nil
  m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
  p := atomic.LoadPointer(&e.p)
  // entry.p 可能是真正的数据的地址,也可能是 nil,也可能是 expunged
  if p == nil || p == expunged {
    // nil 或者是 expunged 都是不存在的,返回空
    return nil, false
  }
  // 如果是真正的数据地址,那就返回真正的数据(就是拿到大账本的某一页纸上的内容)
  return *(*interface{})(p), true
}

到这里已经讲完数据读取这部分的代码了,接着再讲数据是怎么写入的。

上一篇文章我留了一个思考题,

为什么小账本不能做到同时修改?限于篇幅,我不会展开。

我现在解答我们有了大账本,是如何做到同时修改的!

答案在这里:

// tryStore 顾名思义,就是不断尝试的意思。
// 你可以看到有一个无条件的死循环,只有某些条件满足的时候才会退出
// 计算机术语:自旋(自己一直在旋转)
func (e *entry) tryStore(i *interface{}) bool {
  for {
    p := atomic.LoadPointer(&e.p)
    // readOnly map 存储的是 entry 结构,p 就是所谓的大账本,
    // p 指向大账本上某一页纸上的内容,
    // 当账本查不到的时候,返回查不到。
    if p == expunged {
      return false
    }
    // 当账本可以查到的时候,使用 CAS 把旧的值,替换为新的值。
    // 可以查到并替换成功,返回成功,函数退出
    // 查不到或者替换失败,自旋,重试,直到成功为止
    if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
      return true
    }
  }
}

问题来了, CAS(Compare and Swap,比较并交换)是什么东西?我们看这个加法函数:

func add(delta int32) {
  for {
    // 把原先的值取出来
    oldValue := atomic.LoadInt32(&addr)
    // 读取后,如果没有其他人对它修改(Compare)
    // 那就用 oldValue+delta 新值,替换掉原来的值(Swap)
    // 成功程序退出,失败了就自旋重试(可能被其他人改了导致 Compare 不成功)
    if atomic.CompareAndSwapInt32(&addr, oldValue, oldValue+delta) {
      return
    }
  }
}

越来越有趣了,atomic.CompareAndSwapInt32 到底是个啥子哟?

它的具体实现在 src/runtime/internal/atomic/asm_amd64.s 里(不同 CPU 架构,使用的文件不同,这里以最常见的 amd64 为例):

// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//  if (*val == old) {
//    *val = new;
//    return 1;
//  } else
//    return 0;
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
  MOVQ  ptr+0(FP), BX
  MOVL  old+8(FP), AX
  MOVL  new+12(FP), CX
  LOCK
  CMPXCHGL  CX, 0(BX)
  SETEQ  ret+16(FP)
  RET

FP(Frame pointer: arguments and locals):

函数的输入参数,格式 symbol+offset(FP),symbol 没有实际意义,只为了增强代码可读性,但没有 symbol 程序无法编译。

ptr+0(FP) 代表第一个参数,取出复制给 BX 寄存器。

由于 ptr 是一个指针,在 64 位的处理器中,一个指针的占 8 个字节,

所以第二个参数 old+8(FP),偏移量 offset 等于 8,

而第三个参数 new+12(FP),偏移量再加 4 的原因是 int32 占据 4 个字节。

LOCK 指令前缀会设置处理器的 LOCK# 信号,锁定总线,阻止其他处理器接管总线访问内存,

设置 LOCK# 信号能保证某个处理器对共享内存的独占使用。

CMPXCHGL CX, 0(BX) 是比较并交换的指令,将 AX 和 CX 比较,相同将 BX 指向的内容 放入 CX,

CMPXCHGL 暗中使用了 AX 寄存器。

兜了一大圈,终于明白大账本的数据是怎样被更新的了。

看看数据是怎么写入之前,我们要知道数据是怎么被删除的:

// 删除的逻辑是比较简单的。
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
  read, _ := m.read.Load().(readOnly)
  e, ok := read.m[key]
  // key 不存在的时候并且 readOnly map 和 dirty map 不一致时,
  // 把 dirty map 对应的记录删了。
  if !ok && read.amended {
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    e, ok = read.m[key]
    if !ok && read.amended {
      // 数据不一致的时候,最终读出来的值以 dirty map 为主,
      // 即使 readOnly map 是 !ok 的,但 dirty map 可能是 ok 的,
      // 既然值可能是存在的,那就读取出来。
      e, ok = m.dirty[key]
      // 删除操作
      delete(m.dirty, key)
      // 递增数据不一致的计数器。
      // 太多不一致会把 dirty map 提升为 readOnly map,前面讲过了。
      m.missLocked()
    }
    m.mu.Unlock()
  }
  // key 存在的时候,把 key 置为 nil,注意这里不是 expunged,
  // 这也是我为什么要先讲 Delete 的原因。
  if ok {
    return e.delete()
  }
  return nil, false
}

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
  m.LoadAndDelete(key)
}

// delete 将对应的 key 置为 nil!而不是 expunged!
func (e *entry) delete() (value interface{}, ok bool) {
  for {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
      return nil, false
    }
    if atomic.CompareAndSwapPointer(&e.p, p, nil) {
      return *(*interface{})(p), true
    }
  }
}

OK,我们看数据写入的逻辑,它是整个源码中最难理解的,隐含的逻辑关系非常多:

// unexpungeLocked 将 expunged 的标记变成 nil。
func (e *entry) unexpungeLocked() (wasExpunged bool) {
  return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// storeLocked 将 entry.p 指向具体的值
func (e *entry) storeLocked(i *interface{}) {
  atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// tryExpungeLocked 尝试 entry.p == nil 的 entry 标记为删除(expunged)
func (e *entry) tryExpungeLocked() (isExpunged bool) {
  p := atomic.LoadPointer(&e.p)
  // for 循环的作用,可以保证 p != nil,
  // 保证写时复制过程中,p == nil 的情况不会被写到 dirty map 中。
  for p == nil {
    if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
      return true
    }
    p = atomic.LoadPointer(&e.p)
  }
  return p == expunged
}

// dirtyLocked 写时复制,两个 map 都找不到新增的 key 的时候调用的。
func (m *Map) dirtyLocked() {
  // dirty 被置为 nil 的情景还记得吗?
  // 
  // 当 readOnly map 一直读不到,需要退化到 dirty map 读取的时候,
  // dirty map 会被提升为 readOnly map,
  // 此时,dirty map 就会被置空。
  //
  // 但是,dirtyLocked 被调用之前,
  // 都是判断 read.amended 是否为 false
  // if !read.amended {...}
  // 个人认为,可以直接判断 if m.dirty == nil {...},
  // 代码可读性更强!下面三行代码也可以不要了。
  if m.dirty != nil {
    return
  }
  // 遍历 readOnly map,把里面的内容都复制到新创建的 dirty map 中。
  read, _ := m.read.Load().(readOnly)
  m.dirty = make(map[interface{}]*entry, len(read.m))
  for k, e := range read.m {
    // tryExpungeLocked 将 entry.p == nil 设置为 expunged,
    // 遍历之后,所有的 nil 都变成 expunged 了。
    // 返回 false 说明 p 是有值的,要拷贝到 dirty 里。
    // Delete 操作会把有值的状态,转移为 nil,
    // 并不会把 expunged 状态转移为 nil,
    // 由于 for 循环的存在,p 也不会等于 nil,
    // 也就是说,tryExpungeLocked 的 p == expunged 是可以信任的。
    if !e.tryExpungeLocked() {
      // 如果没有被删除,拷贝到 dirty map 中。
      m.dirty[k] = e
    }
  }
}

func (m *Map) Store(key, value interface{}) {
  // 如果 readOnly map 有对应的 key,
  // 通过 e.tryStore 直接写入(就是上面更新大账本的整个过程),
  // 注意,tryStore 会在 entry.p == expunged 的情况下失败。
  read, _ := m.read.Load().(readOnly)
  if e, ok := read.m[key]; ok && e.tryStore(&value) {
    return
  }
  // readOnly map 找不到,或者 key 被删除了,
  // 那就写到 dirty map 里面。
  m.mu.Lock()
  read, _ = m.read.Load().(readOnly)
  if e, ok := read.m[key]; ok {
    // unexpungeLocked 将 expunged 的标记变成 nil。
    // 当 entry.p == expunged,并且成功替换为 nil,
    // 返回 true。
    // 
    // 这个分支的意义在于,写时复制 dirtyLocked 的时候,
    // 数据从 readOnly map 搬迁到 dirty map 中,
    // 如果 p 是被删除的,dirty 是不会有这个 key 的,
    // 所以要把它也写进 dirty 中,保证数据的一致性。
    // 
    // 为什么好端端的 expunged,要改成 nil?
    // unexpungeLocked 是一个原子操作,成功的话,
    // 说明 p == expunged,
    // 说明写时复制已经完成。
    // 
    // 为什么要写时复制完成之后,才可以去改 dirty?
    // 我理解是这样的:
    // 如果不这样做,dirty 会被你修改成 Store 传进来的参数,
    // 写时复制又把它修改成 readOnly map 的值,
    // 所以更新 readOnly map 就好了。
    // 
    // 这一块的细节真的非常多,每一块地方都要小心处理好。
    if e.unexpungeLocked() {
      m.dirty[key] = e
    }
    // 写入值。
    e.storeLocked(&value)
  } else if e, ok := m.dirty[key]; ok {
    // 如果 dirty map 存在就直接更新进去,这个很好理解,
    // 因为 readOnly map 找不到会来 dirty 查。
    e.storeLocked(&value)
  } else {
    // 两个 map 都找不到的时候,说明这是一个新的 key。
    // 
    // 1. 如果 dirty 之前被提升为 readOnly,那就导一份没有被删除的 key 进来。
    // 
    // 这个判断条件,我理解等价于 if m.dirty == nil {...}
    if !read.amended {
      // 初始化 m.dirty,并把值写进去(写时复制)
      m.dirtyLocked()
      // amended 设置为不一致。
      // amended 表示 dirty 是否包含了 readOnly 没有的记录,
      // 很明显,read.m[key] 是 !ok 的,
      // 下面把值存到 dirty map 里面了。
      m.read.Store(readOnly{m: read.m, amended: true})
    }
    // 2. 这里,把值存到 dirty map 中。
    m.dirty[key] = newEntry(value)
  }
  m.mu.Unlock()
}

精妙绝伦!整个写入的逻辑就讲完了,最后看看遍历吧,非常简单:

func (m *Map) Range(f func(key, value interface{}) bool) {
  read, _ := m.read.Load().(readOnly)
  // 如果不一致,就把 dirty 提升为 readOnly,
  // 同时 dirty 置空,
  // 因为 dirty map 也包含了 readOnly map 没有的 key。
  if read.amended {
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if read.amended {
      read = readOnly{m: m.dirty}
      m.read.Store(read)
      m.dirty = nil
      m.misses = 0
    }
    m.mu.Unlock()
  }
  // 遍历 readOnly map 的数据,执行回调函数。
  for k, e := range read.m {
    v, ok := e.load()
    if !ok {
      continue
    }
    if !f(k, v) {
      break
    }
  }
}

好了,到这里整个 sync.Map 就讲完了,剩下的代码也没多少了,套路差不多,我们总结一下:

  1. 在读多写少的场景下,sync.Map 的性能非常高,因为访问 readOnly map 是无锁的;
  2. Load:先查找 readOnly map,找不到会去找 dirty map,如果经常没命中,dirty map 会被提升为 readOnly map,提升的时机跟 dirty 的大小相关,dirty 越大,容忍不命中的次数就越多,也就越难提升;
  3. Delete:当 readOnly map 的 key 不存在的时候,会去删除 dirty map 中的 key;如果 readOnly map 的 key 存在,entry.p 置为 nil;
  4. Store :
    a. readOnly map 的 key 存在时,entry.p != expunged 时直接更新,entry.p == expunged 就改成 nil,此时数据也同步写入 dirty map;
    b. readOnly map 的 key 不存在时,dirty map 有就更新进去,两个都没有,触发写时复制机制:搬迁 readOnly map 的没有被删除的 key 到 dirty map 中,新值写入 dirty map,并设置 amended 标记为 true。
  5. sync.Map 的缺陷在于读少写多的时候,dirty map 会被一直更新,misses 次数增加,dirty 置空后,数据又重新从 readOnly map 同步回去,使得 sync.Map 忙于数据搬迁工作,影响性能。

这篇文章近 5000 字(第一篇差不多 2000 字),从构思、成文到校对,真的需要花费不少时间,希望对你有帮助!


文章来源于本人博客,发布于 2021-05-05,原文链接:https://imlht.com/archives/258/

目录
相关文章
|
5天前
|
存储 JSON 监控
Viper,一个Go语言配置管理神器!
Viper 是一个功能强大的 Go 语言配置管理库,支持从多种来源读取配置,包括文件、环境变量、远程配置中心等。本文详细介绍了 Viper 的核心特性和使用方法,包括从本地 YAML 文件和 Consul 远程配置中心读取配置的示例。Viper 的多来源配置、动态配置和轻松集成特性使其成为管理复杂应用配置的理想选择。
23 2
|
4天前
|
Go 索引
go语言中的循环语句
【11月更文挑战第4天】
13 2
|
4天前
|
Go C++
go语言中的条件语句
【11月更文挑战第4天】
15 2
|
6天前
|
监控 Go API
Go语言在微服务架构中的应用实践
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出,成为构建微服务的理想选择。本文将探讨Go语言在微服务架构中的应用实践,包括Go语言的特性如何适应微服务架构的需求,以及在实际开发中如何利用Go语言的特性来提高服务的性能和可维护性。我们将通过一个具体的案例分析,展示Go语言在微服务开发中的优势,并讨论在实际应用中可能遇到的挑战和解决方案。
|
4天前
|
Go
go语言中的 跳转语句
【11月更文挑战第4天】
12 4
|
4天前
|
JSON 安全 Go
Go语言中使用JWT鉴权、Token刷新完整示例,拿去直接用!
本文介绍了如何在 Go 语言中使用 Gin 框架实现 JWT 用户认证和安全保护。JWT(JSON Web Token)是一种轻量、高效的认证与授权解决方案,特别适合微服务架构。文章详细讲解了 JWT 的基本概念、结构以及如何在 Gin 中生成、解析和刷新 JWT。通过示例代码,展示了如何在实际项目中应用 JWT,确保用户身份验证和数据安全。完整代码可在 GitHub 仓库中查看。
14 1
|
5天前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
9天前
|
JavaScript Java Go
探索Go语言在微服务架构中的优势
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出。本文将深入探讨Go语言在构建微服务时的性能优势,包括其在内存管理、网络编程、并发模型以及工具链支持方面的特点。通过对比其他流行语言,我们将揭示Go语言如何成为微服务架构中的一股清流。
|
9天前
|
Ubuntu 编译器 Linux
go语言中SQLite3驱动安装
【11月更文挑战第2天】
31 7
|
9天前
|
关系型数据库 Go 网络安全
go语言中PostgreSQL驱动安装
【11月更文挑战第2天】
38 5