golang 锁原理剖析,你值得收藏

简介: golang 锁原理剖析,你值得收藏

640.png

golang 锁原理剖析



锁是一种同步机制,用于在多任务环境中限制资源的访问,以满足互斥需求。


go源码sync包中经常用于同步操作的方式:

  • 原子操作
  • 互斥锁
  • 读写锁
  • waitgroup


我们着重来分析下互斥锁和读写锁.


1. 互斥锁:


下面是互斥锁的数据结构:


// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
 state int32   // 互斥锁上锁状态枚举值如下所示
 sema  uint32  // 信号量,向处于Gwaitting的G发送信号
}
const (
 mutexLocked = 1 << iota // 值为1,表示在state中由低向高第1位,意义:锁是否可用,0可用,1不可用,锁定中
 mutexWoken   // 值为2,表示在state中由低向高第2位,意义:mutex是否被唤醒
 mutexStarving // 当前的互斥锁进入饥饿状态;
 mutexWaiterShift = iota  //值为2,表示state中统计阻塞在此mutex上goroutine的数目需要位移的偏移量
 starvationThresholdNs = 1e6

state和sema两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。


互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放.

640.png


在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked 表示互斥锁的锁定状态;
  • mutexWoken 表示从正常模式被从唤醒;
  • mutexStarving 当前的互斥锁进入饥饿状态;
  • waitersCount 当前互斥锁上等待的 Goroutine 个数;


sync.Mutex 有两种模式,正常模式和饥饿模式。


在正常模式下,锁的等待者会按照先进先出的顺序获取锁。


但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。


饥饿模式是在 Go 语言 1.9 版本引入的优化的,引入的目的是保证互斥锁的公平性(Fairness)。


在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。


如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。


相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。


互斥锁的加锁是靠 sync.Mutex.Lock 方法完成的, 当锁的状态是 0 时,将 mutexLocked 位置成 1:


// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
 // Fast path: grab unlocked mutex.
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 // Slow path (outlined so that the fast path can be inlined)
 m.lockSlow()
}


如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,


这个方法是一个非常大 for 循环,它获取锁的过程:

  • 判断当前 Goroutine 能否进入自旋;
  • 通过自旋等待互斥锁的释放;
  • 计算互斥锁的最新状态;
  • 更新互斥锁的状态并获取锁;


那么互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放,是通过它的lockSlow方法, 由于自旋是一种多线程同步机制,所以呢当前的进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真。通常在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用得当会对性能带来很大的增益,但是往往使用的不得当就会拖慢整个程序.


所以 Goroutine 进入自旋的条件非常苛刻:

  • 互斥锁只有在普通模式才能进入自旋;
  • runtime.sync_runtime_canSpin 需要返回 true:a. 需要运行在多 CPU 的机器上;b. 当前的Goroutine 为了获取该锁进入自旋的次数小于四次;c. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;


一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间.

处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。


通过几个不同的条件分别会更新 state 字段中存储的不同信息,mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

new := old
 if old&mutexStarving == 0 {
  new |= mutexLocked
 }
 if old&(mutexLocked|mutexStarving) != 0 {
  new += 1 << mutexWaiterShift
 }
 if starving && old&mutexLocked != 0 {
  new |= mutexStarving
 }
 if awoke {
  new &^= mutexWoken
 }


计算了新的互斥锁状态之后,就会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新该状态:


if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
    break // 通过 CAS 函数获取了锁
   }
   ...
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
   if old&mutexStarving != 0 {
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
    if !starving || old>>mutexWaiterShift == 1 {
     delta -= mutexStarving
    }
    atomic.AddInt32(&m.state, delta)
    break
   }
   awoke = true
   iter = 0
  } else {
   old = m.state
  }
 }
}


如果我们没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex 使用信号量保证资源不会被两个 Goroutine 获取。


runtime.sync_runtime_SemacquireMutex 会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 方法的剩余代码也会继续执行。


在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环.

在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出.


互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:


  • 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
  • 如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 方法开始慢速解锁:
func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }
 // Fast path: drop lock bit.
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  // Outlined slow path to allow inlining the fast path.
  // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
  m.unlockSlow(new)
 }
}


sync.Mutex.unlockSlow 方法首先会校验锁状态的合法性, 如果当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex 中止当前程序。


在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁.


func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
 if new&mutexStarving == 0 {
  old := new
  for {
   // If there are no waiters or a goroutine has already
   // been woken or grabbed the lock, no need to wake anyone.
   // In starvation mode ownership is directly handed off from unlocking
   // goroutine to the next waiter. We are not part of this chain,
   // since we did not observe mutexStarving when we unlocked the mutex above.
   // So get off the way.
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }
   // Grab the right to wake someone.
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   old = m.state
  }
 } else {
  // Starving mode: handoff mutex ownership to the next waiter, and yield
  // our time slice so that the next waiter can start to run immediately.
  // Note: mutexLocked is not set, the waiter will set it after wakeup.
  // But mutex is still considered locked if mutexStarving is set,
  // so new coming goroutines won't acquire it.
  runtime_Semrelease(&m.sema, true, 1)
 }
}


在正常模式下,这段代码会分别处理以下两种情况处理:

  • 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者;
  • 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease唤醒等待者并移交锁的所有权;


在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;


互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,就会直接通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 并且在普通模式下工作,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过runtime.sync_runtime_SemacquireMutex函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;


互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当互斥锁已经被解锁时,那么调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过sync.runtime_Semrelease 唤醒对应的 Goroutine.


2. 读写锁


读写互斥锁sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。


sync.RWMutex 中总共包含5 个字段:


type RWMutex struct {
 w           Mutex  // 复用互斥锁提供的能力
 writerSem   uint32 // 写等待读
 readerSem   uint32 // 读等待写
 readerCount int32  // 存储了当前正在执行的读操作的数量
 readerWait  int32  // 当写操作被阻塞时等待的读操作个数
}


我们从写锁开始分析:

当我们想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:


func (rw *RWMutex) Lock() {
 if race.Enabled {
  _ = rw.w.state
  race.Disable()
 }
 // First, resolve competition with other writers.
 rw.w.Lock()
 // Announce to readers there is a pending writer.
 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
 // Wait for active readers.
 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
  runtime_SemacquireMutex(&rw.writerSem, false, 0)
 }
 if race.Enabled {
  race.Enable()
  race.Acquire(unsafe.Pointer(&rw.readerSem))
  race.Acquire(unsafe.Pointer(&rw.writerSem))
 }
}
  • 这里调用结构体持有的 sync.Mutex 的 sync.Mutex.Lock 方法阻塞后续的写操作;

因为互斥锁已经被获取,其他 Goroutine 在获取写锁时就会进入自旋或者休眠;

  • 调用 sync/atomic.AddInt32 方法阻塞后续的读操作:


如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0),该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒。


写锁的释放会调用 sync.RWMutex.Unlock 方法:


func (rw *RWMutex) Unlock() {
 if race.Enabled {
  _ = rw.w.state
  race.Release(unsafe.Pointer(&rw.readerSem))
  race.Disable()
 }
 // Announce to readers there is no active writer.
 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
 if r >= rwmutexMaxReaders {
  race.Enable()
  throw("sync: Unlock of unlocked RWMutex")
 }
 // Unblock blocked readers, if any.
 for i := 0; i < int(r); i++ {
  runtime_Semrelease(&rw.readerSem, false, 0)
 }
 // Allow other writers to proceed.
 rw.w.Unlock()
 if race.Enabled {
  race.Enable()
 }
}


解锁与加锁的过程正好相反,写锁的释放分为以下几个步骤:

  1. 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
  2. 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine:
  3. 调用 sync.Mutex.Unlock 方法释放写锁;


获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。


接着是读锁:

读锁的加锁方法 sync.RWMutex.RLock 就比较简单了,该方法会通过 sync/atomic.AddInt32 将 readerCount 加一:


func (rw *RWMutex) RLock() {
 if race.Enabled {
  _ = rw.w.state
  race.Disable()
 }
 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
  // A writer is pending, wait for it.
  runtime_SemacquireMutex(&rw.readerSem, false, 0)
 }
 if race.Enabled {
  race.Enable()
  race.Acquire(unsafe.Pointer(&rw.readerSem))
 }
}


如果RLock该方法返回负数,其他 Goroutine 获得了写锁,当前 Goroutine 就会调用runtime.sync_runtime_SemacquireMutex 陷入休眠等待锁的释放;如果RLock该方法的结果为非负数,没有 Goroutine 获得写锁,当前方法就会成功返回.


当 Goroutine 想要释放读锁时,会调用如下所示的RUnlock方法:


func (rw *RWMutex) RUnlock() {
 if race.Enabled {
  _ = rw.w.state
  race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
  race.Disable()
 }
 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
  // Outlined slow-path to allow the fast-path to be inlined
  rw.rUnlockSlow(r)
 }
 if race.Enabled {
  race.Enable()
 }
}


该方法会先减少正在读资源的readerCount 整数,根据 sync/atomic.AddInt32 的返回值不同会分别进行处理:


  • 如果返回值大于等于零,表示读锁直接解锁成功.
  • 如果返回值小于零 ,表示有一个正在执行的写操作,在这时会调用rUnlockSlow方法.


func (rw *RWMutex) rUnlockSlow(r int32) {
 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
  race.Enable()
  throw("sync: RUnlock of unlocked RWMutex")
 }
 // A writer is pending.
 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
  // The last reader unblocks the writer.
  runtime_Semrelease(&rw.writerSem, false, 1)
 }
}


rUnlockSlow该方法会减少获取锁的写操作等待的读操作数readerWait并在所有读操作都被释放之后触发写操作的信号量,writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。


其实读写互斥锁(sync.RWMutex),虽然提供的功能非常复杂,不过因为它是在互斥锁( sync.Mutex)的基础上,所以整体的实现上会简单很多。


因此呢:

  • 调用 sync.RWMutex.Lock 尝试获取写锁时;


每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 就会获得写锁, 将 readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作.


  • 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;


读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。

相关文章
|
5月前
|
Java Go
Golang底层原理剖析之垃圾回收GC(二)
Golang底层原理剖析之垃圾回收GC(二)
107 0
|
5月前
|
存储 SQL 安全
Golang底层原理剖析之上下文Context
Golang底层原理剖析之上下文Context
129 0
|
5月前
|
安全 Go
Golang深入浅出之-互斥锁(sync.Mutex)与读写锁(sync.RWMutex)
【4月更文挑战第23天】Go语言并发编程中,`sync.Mutex`和`sync.RWMutex`是保证线程安全的关键。互斥锁确保单个goroutine访问资源,而读写锁允许多个读者并发访问。常见问题包括忘记解锁、重复解锁以及混淆锁类型。使用`defer`可确保解锁,读写锁不支持直接升级或降级,需释放后再获取。根据读写模式选择合适锁以避免性能下降和竞态条件。理解并正确使用锁是编写并发安全程序的基础。
97 3
|
1月前
|
安全 Go
Golang语言goroutine协程并发安全及锁机制
这篇文章是关于Go语言中多协程操作同一数据问题、互斥锁Mutex和读写互斥锁RWMutex的详细介绍及使用案例,涵盖了如何使用这些同步原语来解决并发访问共享资源时的数据安全问题。
69 4
|
2月前
|
SQL 安全 Java
golang为什么不支持可重入锁?
本文对比分析了Java与Go语言中锁机制的不同。在Java中,无论是`synchronized`关键字还是`ReentrantLock`都支持可重入特性,通过维护一个计数器来跟踪锁的嵌套级别,确保同一线程可以多次获取同一把锁而不会造成死锁。然而,Go语言的`sync.Mutex`并不支持这一特性,其设计理念认为可重入锁往往指向代码设计问题,鼓励开发者重构代码以避免此类需求。文章进一步解释了这种设计理念背后的原因,并提供了替代方案示例。总体而言,Go语言试图从设计层面避免潜在的代码问题,尽管这可能会增加一定的开发复杂性。
golang为什么不支持可重入锁?
|
2月前
|
算法 NoSQL 关系型数据库
熔断原理与实现Golang版
熔断原理与实现Golang版
|
2月前
|
存储 关系型数据库 Go
SOLID原理:用Golang的例子来解释
SOLID原理:用Golang的例子来解释
|
2月前
|
存储 人工智能 Go
golang 反射基本原理及用法
golang 反射基本原理及用法
25 0
|
5月前
|
负载均衡 监控 Go
Golang深入浅出之-Go语言中的服务网格(Service Mesh)原理与应用
【5月更文挑战第5天】服务网格是处理服务间通信的基础设施层,常由数据平面(代理,如Envoy)和控制平面(管理配置)组成。本文讨论了服务发现、负载均衡和追踪等常见问题及其解决方案,并展示了使用Go语言实现Envoy sidecar配置的例子,强调Go语言在构建服务网格中的优势。服务网格能提升微服务的管理和可观测性,正确应对问题能构建更健壮的分布式系统。
437 1
|
5月前
|
JSON 监控 安全
Golang深入浅出之-Go语言中的反射(reflect):原理与实战应用
【5月更文挑战第1天】Go语言的反射允许运行时检查和修改结构,主要通过`reflect`包的`Type`和`Value`实现。然而,滥用反射可能导致代码复杂和性能下降。要安全使用,应注意避免过度使用,始终进行类型检查,并尊重封装。反射的应用包括动态接口实现、JSON序列化和元编程。理解反射原理并谨慎使用是关键,应尽量保持代码静态类型。
83 2