深度解密协程锁、信号量以及线程锁的实现原理

简介: 深度解密协程锁、信号量以及线程锁的实现原理

楔子



最近在交流群里面看到有人想让我讲关于信号量的内容,那么就来讲一讲吧。

关于什么是信号量,相信大家都知道,那么本文便从源码的角度来看看信号量是怎么实现的。不过在说信号量之前,必须先剖析一下锁,理解了锁才能更好地理解信号量。

那什么是锁呢?如果程序中某个部分在并发操作时会出现意想不到的结果(比如操作一个共享的数据结构),那么该部分就需要通过锁保护起来,而被锁保护起来的部分叫做临界区

线程在进入临界区之前必须先获取锁,然后才能操作共享资源。而锁一旦被获取,那么其它线程再尝试获取锁,就会陷入阻塞,直到锁被释放。

b944845e8f00ffebcaed6798855dba7b.png

通过锁,我们能确保同一时刻只能有一个线程操作共享资源,从而很好地解决资源竞争问题。这里的锁指的是互斥锁,也被称为排它锁。

而在 Python 里面,锁可以通过 asyncio 和 threading 模块来创建,这两个模块都提供了锁,一个是协程锁,一个是线程锁,当然也包括信号量。

import asyncio
import threading
lock1 = asyncio.Lock()
lock2 = threading.Lock()

当我们对类 Lock 实例化,便可以得到锁,然后锁有两个常用方法。

  • acquire():获取锁;
  • release():释放锁;


API 非常简单,我们先来看看协程里面的锁,以及信号量。


协程锁和信号量



之前在介绍 asyncio 的 Future 和 Task 时说过,Future 对象可以看作是一个容器,它保存了在未来某个时刻才会出现的结果。

如果 Future 对象里面还没有结果集,那么它就处于未完成状态,否则处于已完成状态。

import asyncio
future = asyncio.Future()
# 是否完成
print(future.done())
"""
False
"""
# 因为 future 此时还没有结果集,所以是未完成状态(PENDING)
# 设置结果集
future.set_result("S 老师不希望你们为了她而两败俱伤")
# 由于设置了结果集,所以变成已完成状态(FINISHED)
print(future.done())
"""
True
"""
# 获取结果
print(future.result())
"""
S 老师不希望你们为了她而两败俱伤
"""

问题来了,如何在 future 完成时立刻拿到结果呢?总不能一直调用 done 方法轮询吧。

很简单,我们可以对 future 使用 await 表达式,如果 future 内部还没有结果集,那么 await 会处于阻塞状态,否则不会阻塞,并且还会将值取出来。

import asyncio
async def delay(future, seconds):
    await asyncio.sleep(seconds)
    print("给 future 设置结果集")
    future.set_result(666)
async def main():
    # 创建一个 future
    future = asyncio.Future()
    loop = asyncio.get_running_loop()
    # 创建一个任务,扔到事件循环
    loop.create_task(delay(future, 3))
    print("await future 会陷入阻塞,因为它内部还没有结果集")
    # 该表达式会返回 666,因为给 future 设置的结果是 666
    await future
    print(f"3 秒后结束阻塞,因为 delay 协程内部给 future 设置了结果集")
asyncio.run(main())
"""
await future 会陷入阻塞,因为它内部还没有结果集
给 future 设置结果集
3 秒后结束阻塞,因为 delay 协程内部给 future 设置了结果集
"""

而协程在进入事件循环时会自动创建一个 future,并将协程和 future 组合起来得到任务,而 await 一个任务等价于 await future。当协程没有执行完毕时会处于阻塞,而协程执行完毕时会将返回值设置在 future 中,然后 await 表达式会拿到里面的结果。

在实际编码中,我们一般很少手动创建 Future 对象(future),但 Future 和 asyncio 的实现密切相关,其中就包括了锁。

当协程在获取锁时,如果发现锁已被获取,那么如何陷入阻塞呢?当锁被释放时,它又如何解除阻塞呢?答案就是通过 future。

假设协程 1 和协程 2 都要获取锁,它们都会调用锁的 acquire 方法。其中协程 1 先获取到,那么协程 2 就会创建一个 future 并 await。由于 future 内部还没有结果集,因此协程 2 会处于阻塞。当协程 1 释放锁时,会给协程 2 创建的 future 设置一个结果,从而让协程 2 解除阻塞、获取到锁。

我们手动实现一下锁。

import asyncio
from collections import deque
class Lock:
    def __init__(self):
        # 保存创建的 future
        self._waiters = deque()
        # 锁是否已被获取
        self._locked = False
    async def acquire(self):
        # 如果锁没有被获取,那么获取锁
        if not self._locked:
            self._locked = True
            return True
        # 否则说明锁已被获取,创建一个 future
        future = asyncio.Future()
        # 将它放在双端队列里面
        self._waiters.append(future)
        # 此时获取锁的协程就会陷入阻塞,等待其它协程唤醒
        await future
        # 如果解除阻塞,意味着该协程获取到锁了
        self._locked = True
        return True
    def release(self):
        # 释放锁,如果发现锁没被获取,说明对锁进行了二次释放
        if not self._locked:
            raise RuntimeError("锁没有被获取")
        # 将锁的状态改成 False,表示锁被释放了
        self._locked = False
        if len(self._waiters) == 0:
            return
        # 从双端队列 deque 的左侧弹出 future
        # 这个 future 就是某个协程在获取不到锁时创建的
        # 并通过 await future 让自身陷入阻塞状态,等待被唤醒
        future = self._waiters.popleft()
        # 拿到 future 之后,执行 future.set_result(),也就是设置结果集
        # 那么对应的协程就会解除阻塞,从而获取锁
        future.set_result(True)
        # 注意:因为 future 是从右边添加的,所以要从 deque 的左侧弹出
        # 因为先获取锁的协程要优先解除阻塞
    async def __aenter__(self):
        await self.acquire()
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.release()

整个过程非常简单,就是在获取不到锁时,创建一个 Future 对象并 await,此时就会陷入阻塞。当然获取锁的协程可能有很多,它们创建的 future 会保存在一个双端队列里面。

而拿到锁的协程,在操作完临界区并释放锁时,会从双端队列的左侧弹出一个 future,并为其设置结果集。那么创建该 future 的协程就会解除阻塞,从而获取到锁。

因此这就是 asyncio 锁的实现方式,一点都不神秘。当然 asyncio 内部还做了一些异常检测,以及检测 future 是否已取消等等,我们这里省略了。有兴趣可以看一看 asyncio 内部锁的实现细节,整体逻辑和我们这里基本一致,并且我们这里手动实现的锁在大部分场景下和 asyncio 的锁都是等效的。

然后补充一点,你在使用 asyncio 锁的时候,一定不要以全局变量的形式创建。

import asyncio
lock = asyncio.Lock()
async def a():
    async with lock:
        print("协程 a 成功获取了锁, 并进入临界区执行操作")
        await asyncio.sleep(2)
    print("协程 a 释放了锁")
async def b():
    async with lock:
        print("协程 b 成功获取了锁, 并进入临界区执行操作")
        await asyncio.sleep(2)
    print("协程 b 释放了锁")
async def main():
    await asyncio.gather(a(), b())
asyncio.run(main())

如果这样做,很快会看到崩溃的发生,并报告多个事件循环的错误:

RuntimeError: ..... attached to a different loop

这是 asyncio 库的一个令人困惑的地方,而且这种现象也不是锁特有的,asyncio 中的大多数对象在创建时都会提供一个可选的 loop 参数,允许你指定要运行的事件循环。

当未提供此参数时,asyncio 尝试获取当前正在运行的事件循环,如果没有,则创建一个新的事件循环。在上例中,创建锁的同时会创建一个事件循环,因为创建锁时还没有事件循环。然后 asyncio.run(main()) 会创建第二个事件循环,试图使用锁时,这两个独立的事件循环就会混合在一起导致崩溃。

这种行为比较棘手,因此在 Python 3.10 中会移除 loop 参数,这种令人困惑的行为也会消失。但在 3.10 之前,在使用全局 asyncio 变量时需要认真考虑这些情况。



说完了锁,再来说说信号量。锁负责保证同一时刻只能有一个协程去操作临界区,而信号量在创建时会接收一个初始值 value,可以保证同一时刻最多有 value 个协程去操作临界区。

因此可以把锁看成是初始值 value 等于 1 的信号量,它在源码中的实现和锁基本是类似的,我们也手动实现一下。

import asyncio
from collections import deque
class Semaphore:
    def __init__(self, value=1):
        self._waiters = deque()
        # 可以把 self._value 看成是令牌的数量
        # 每当一个协程进入临界区,令牌数减 1,离开临界区,令牌数加 1
        # 如果 self._value 小于等于 0,说明令牌用光了,此时就不允许进入临界区
        self._value = value
    @property
    def locked(self):
        return self._value <= 0
    async def acquire(self):
        # 如果 self._value > 0,说明可以进入临界区
        if not self.locked:
            self._value -= 1  # self._value 要减 1
            return True
        # 如果 self._value <= 0,说明此时不能进去临界区,必须等待某个协程从临界区出来
        # 那么和锁一样,也是创建一个 future 并放在双端队列里面
        future = asyncio.Future()
        self._waiters.append(future)
        # 此时获取信号量的协程会陷入阻塞
        await future
        # 解除阻塞,意味着该协程获取到信号量了
        self._value -= 1
        return True
    def release(self):
        # 释放信号量,说白了就是将 self._value 加 1
        self._value += 1
        if len(self._waiters) == 0:
            return
        future = self._waiters.popleft()
        future.set_result(True)
    async def __aenter__(self):
        await self.acquire()
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.release()

信号量和锁的实现方式是一样的,锁可以看成是 value 为 1 的信号量。当协程进入临界区,value 的值会减少 1,离开临界区 value 的值会增加 1。如果 value 为 0,那么后续协程就不允许进入临界区了,必须等到某个协程从临界区出来。

说到这,再来补充一个有界信号量,因为信号量有一个问题。

import asyncio
from asyncio import Semaphore
import time
async def bar(sem: Semaphore):
    async with sem:
        await asyncio.sleep(3)
async def main():
    # 每次允许两个协程进入临界区
    sem = Semaphore(2)
    # 创建 4 个任务
    task = [asyncio.create_task(bar(sem)) for _ in range(4)]
    # 直接对 sem 执行 release
    sem.release()
    sem.release()
    await asyncio.gather(*task)
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"总耗时: {end - start}")
"""
总耗时: 3.003426834
"""

创建了 4 个任务,每次只允许两个协程进入临界区,因此总耗时应该是 6 秒才对。但问题是我们创建完信号量之后,调用了两次 release 方法,将内部的 value 值增加了 2,此时信号量就变成了同时允许 4 个协程进入临界区。

因此和锁不一样,锁一旦被释放,就不能再二次释放。而信号量被释放,其实就是将内部的 value 加 1,并且不会对内部的 value 进行检测。

import asyncio
from asyncio import Semaphore
async def main():
    sem = Semaphore(2)
    print(f"before value: {sem._value}")
    for _ in range(100):
        sem.release()
    print(f"after value: {sem._value}")
asyncio.run(main())
"""
before value: 2
after value: 102
"""

不过这个问题基本很少发生,当然也可以使用 async with 语句,这样获取和释放一定是成对出现的。

而有界信号量在信号量的基础上做了一层检测,如果在 release 的时候发现 value 已经达到了初始值,那么会报错。

d67de412d373e20ffbfefecf49152e31.png

有界信号量会将初始值 value 单独保存起来,如果释放时发现 value 大于等于初始值,那么报错。但是注意:有界信号量依旧可以多次 release,不过我们基本不会这么干,因为获取和释放应该是成对出现的。

以上我们就说完了协程里面的锁和信号量,再来看看线程提供的。


线程锁和信号量



线程锁可以通过 threading 模块创建。

import threading
lock = threading.Lock()

注意:Lock 并不是一个类,而是一个函数,看一下源代码。

Lock = _allocate_lock
# threading.Lock() 其实就是 _thread.allocate_lock()
_allocate_lock = _thread.allocate_lock

调用 _thread.allocate_lock() 时会在内部创建锁,而锁是由 _thread 模块实现的。

import threading
import _thread
lock = threading.Lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
lock = _thread.allocate_lock()
print(type(lock))
"""
<class '_thread.lock'>
"""

所以线程锁其实是一个 _thread.lock 对象。

补充一下,Python 有很多的模块是由 C 实现的,因为它们和性能密切相关,编译之后会内嵌在解释器里面。举个例子:

import random, _random
import re, _sre
import ssl, _ssl
import io, _io
import bisect, _bisect
import heapq, _heapq
import asyncio, _asyncio
import threading, _thread

这些 C 实现的模块,名字前面一般会带有一个下滑线,它们内嵌在解释器里面,你在 Lib 目录下是找不到的。但我们不需要直接使用这些模块,解释器会提供相应的 Python 模块对其进行封装。

我们只需要导入 Python 模块即可,在内部会调用具体的 C 实现,以 io 模块为例。

58eb656a983490cfe506327d49d694d9.png

这些类都是 _io 实现的,而 io 只是做了一层封装,因此在实际编码时会使用 C 实现的 _io 模块里的逻辑。

再比如内置函数 open,它其实就是 io.open,而 io 里面的 open 是从 _io 导入进来的。

import io
import _io
print(open is io.open is _io.open)  # True

好了,说了这么多只是想表示线程锁的具体实现不在 threading 里面,而是在 _thread 里面。_thread 是一个 C 实现的模块,我们需要到解释器里面才能看到具体实现。

在 Modules/_threadmodule.c 中,有一个结构体实例 Locktype,它便是 _thread.lock 这个类的底层实现。

272dbc6f500094288594f5127b67ab21.png

_thread.lock 实例化后会得到锁,锁在底层对应的是 lockobject 结构体。

// _threadmodule.c
typedef struct {
    PyObject_HEAD
    PyThread_type_lock lock_lock;
    PyObject *in_weakreflist;
    char locked;
} lockobject;
// pythread.h
typedef void *PyThread_type_lock;

解释一下这个结构体。

PyObject_HEAD

每个对象都具备的头部信息,它包含了对象的引用计数和类型。

lock_lock

PyThread_type_lock 是 void * 的类型别名,所以 lock_lock 是一个 void * 类型的指针,该指针指向了真正的锁,这个锁是底层操作系统提供的。

和协程锁不同,由于操作系统感知不到协程,因此协程锁是基于 Future 对象实现的。但线程锁则是基于操作系统实现的,当 Python 代码创建锁、获取锁、解锁时,会通过 lock_lock 指针将这些操作转发到具体的锁实现上。

in_weakreflist

用于创建弱引用,关于什么是弱引用,我在之前的文章中介绍过。

locked

用于标记锁状态,把它当成 Python 的布尔值即可,值为 1 表示锁已被获取(已锁定),0 表示未被获取(未锁定)。

这几个字段应该很好理解,然后我们来看一下锁的具体方法,那么方法都定义在哪呢?我们说过,实例对象有哪些行为,取决于类型对象定义了哪些操作。

因此锁的操作都定义在 Locktype 里面,由内部的 tp_methods 字段负责维护。

db6b15c41c6a0bf121f411170a2c4f18.png

该字段被赋值为 lock_methods,所以锁的方法都在 lock_methods 数组中。

b9b7b3f56567b436523efac44cd4eca6.png

以上就是锁能够使用的方法,我们来验证一下。

import threading
lock = threading.Lock()
# acquire_lock 和 acquire 基本是等价的
# release_lock 和 release 也基本是等价的
# 不过我们一般都会使用 acquire 和 lock
lock.acquire_lock()  # 获取锁
lock.release_lock()  # 释放锁
lock.acquire()  # 获取锁
lock.release()  # 释放锁
# 同理 locked_lock 和 locked 也是等价的
# 表示锁是否被获取(已锁定),不过我们一般使用 locked
print(lock.locked_lock())
print(lock.locked())
lock.acquire()
print(lock.locked_lock())
print(lock.locked())
lock.release()
"""
False
False
True
True
"""
# 还提供了上下文管理,等价于 lock.acquire + lock.release
with lock:
    pass

好了,接下来我们看看 acquire 方法,也就是锁是怎么获取的。

static PyObject *
lock_PyThread_acquire_lock(
    lockobject *self, 
    PyObject *args, 
    PyObject *kwds
){
    _PyTime_t timeout;  // 超时时间
    // 一个枚举,表示锁状态,有三个可选值
    // PY_LOCK_FAILURE:表示因为锁已被持有,而获取失败
    // PY_LOCK_ACQUIRED:表示锁可用,并成功获取锁
    // PY_LOCK_INTR:表示获取锁的操作被中断,比如抵达超时时间
    PyLockStatus r;
    
    // 参数解析,该方法接收一个 timeout 参数
    if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
        return NULL;
    
    // 获取锁,并指定一个超时时间,不传则表示没有超时时间
    // 那么在获取不到锁时,会无限等待
    r = acquire_timed(self->lock_lock, timeout);
    // 如果返回的状态为 PY_LOCK_INTR,说明达到超时时间
    // 因此获取锁的操作被中断,并且会抛出异常
    if (r == PY_LOCK_INTR) {
        return NULL;
    }
    // 如果返回的状态为 PY_LOCK_ACQUIRED,表示锁获取成功
    // 将锁的 locked 字段设置为 1,表示锁已被获取
    if (r == PY_LOCK_ACQUIRED)
        self->locked = 1;
    // 如果以上两种状态都不是,那么说明获取失败了
    // 将 r == PY_LOCK_ACQUIRED 转成布尔值返回
    // 获取成功返回 True,获取失败返回 False
    return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}

整个过程仍然很简单,因此我们看到协程锁和线程锁的实现是类似的,它们都有一个 locked 字段用于表示锁是否已被获取。

只不过协程锁是基于 Future 对象实现的,当 await future 陷入阻塞时,表示锁已被其它协程获取。当解除阻塞时,代表锁被释放了,自己获取到锁。

而线程锁是基于操作系统实现的,它本质上是对操作系统提供的锁做了一个封装。Python 线程在获取锁时,底层会获取操作系统的锁。

而操作系统的锁是怎么获取的呢?在源码中使用的是 acquire_time 函数,它接收一个指针和一个超时时间。该指针便是 lockobject 的 lock_lock 字段,类型是 void *,它指向了操作系统提供的锁实现。

649c3ab6b22c18d3bf54c98675789f51.png

acquire_time 函数做了一些参数处理后,又调用了 PyThread_acquire_lock_timed  函数,显然获取锁的逻辑位于该函数里面。

PyThread_acquire_lock_timed 函数在不同平台有着不同的实现,因为不同操作系统的锁实现是不是一样的,所以源码中使用 void *。

1098c086b3149d1abcd267a1c5535740.png

我们以 Windows 系统为例:

36ee6f2e22044e02d5ebf86fbddec3ad.png

虽然不同系统的函数实现不一样,但参数是一致的。

  • aLock:void * 指针,指向操作系统提供的锁;
  • microseconds:等待锁的时间,以微妙为单位。如果值是负数,表示无限等待,直到获取锁;
  • intr_flag:如果设置为 1,那么当等待过程中出现了信号中断时,函数会提前返回。

函数的核心实现如下:

81e738478b296ea02b28e11b4b22165e.png

又调用了 EnterNonRecursiveMutex 函数,该函数是真正获取锁的逻辑,参数 aLock 指向了操作系统的互斥锁。前面说过,不同系统有着不同的锁实现,所以具体使用时需要转换。在 Windows 系统上,它被转成了 PNRMUTEX。

typedef struct _NRMUTEX
{   
    // 对操作系统互斥锁的封装
    PyMUTEX_T cs;
    // 对条件变量的封装,用于线程间的同步
    // 允许线程在条件不满足时等待,条件满足时由其它线程通知等待的线程
    // 条件变量一般和互斥锁一起使用,避免竞争条件和死锁
    PyCOND_T cv;
    // 标记互斥锁是否已被获取,1 表示已被获取,0 表示未被获取
    int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;

所以 lockobject 的 lock_lock 指针指向的其实依旧不是 OS 互斥锁,而是一个结构体实例,结构体内部的字段 cs 封装的才是 OS 互斥锁。

lockobject 是线程锁,也就是 Python 代码中使用的锁的底层实现,而 NRMUTEX 则是封装了操作系统提供的互斥锁。注意这里面的两个 locked,它们都用于标记锁是否已被获取。

最后来看看 EnterNonRecursiveMutex 函数的具体逻辑。

DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex, 
                       DWORD milliseconds)
{
    
    DWORD result = WAIT_OBJECT_0;
    // 对 OS 互斥锁进行锁定,用于保护共享数据,如果锁定失败直接返回
    if (PyMUTEX_LOCK(&mutex->cs))
        return WAIT_FAILED;
    // 如果锁定成功,那么将 locked 字段设置为 1,表示互斥锁被获取
    // 但如果发现 locked 已经为 1 了,则说明已有别的线程将 locked 修改为 1
    // 那么当前线程就要等待,直到 locked 不为 1(锁被释放)
    if (milliseconds == INFINITE) {
        // 无限等待
        while (mutex->locked) {
            if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
                result = WAIT_FAILED;
                break;
            }
        }
    } else if (milliseconds != 0) {
        // 有时间限制的等待
        ULONGLONG now, target = GetTickCount64() + milliseconds;
        while (mutex->locked) {
            if (PyCOND_TIMEDWAIT(
                &mutex->cv, &mutex->cs, 
                (long long)milliseconds*1000) < 0) 
            {
                result = WAIT_FAILED;
                break;
            }
            now = GetTickCount64();
            if (target <= now)
                break;
            milliseconds = (DWORD)(target-now);
        }
    }
    // 在被唤醒之后,说明当前线程获取互斥锁成功,于是将 locked 改成 1
    if (!mutex->locked) {
        mutex->locked = 1;
        result = WAIT_OBJECT_0;
    } else if (result == WAIT_OBJECT_0)
        result = WAIT_TIMEOUT;
    // 这里必须将操作系统的锁释放掉,因为对于外界的线程而言,
    // 锁是否被获取(锁定),取决于 locked 字段是否为 1
    PyMUTEX_UNLOCK(&mutex->cs); 
    return result;
}

代码逻辑有一些让人疑惑的地方,下面解释一下。Python 里面调用 lock.acquire() 方法时,表示要获取线程锁。但获取线程锁之前,要先获取 OS 互斥锁,如果获取不到,那么压根不允许进入临界区。

但解释器在互斥锁的基础上又封装了一层,如果获取到了互斥锁,还要将 locked 字段修改为 1。因为从代码逻辑上讲,无论是线程锁还是互斥锁,只有当它们内部的 locked 字段为 1 时,才算是获取了锁。

所以将互斥锁的 locked 字段修改为 1 之后,后续还要将线程锁的 locked 字段修改为 1,这样才算是获取了线程锁。

到这里估计可能有人会产生一个疑问,为啥函数在一开始要获取系统的互斥锁,最后又释放掉,这岂不是多此一举?

if (PyMUTEX_LOCK(&mutex->cs))
        return WAIT_FAILED;
    //...
    PyMUTEX_UNLOCK(&mutex->cs);

直接检测 locked 字段是否等于 1 不就行了吗?其实原因有三个:

  • 保护共享状态:操作系统的互斥锁 mutex-> cs 用于保护共享状态 mutex -> locked 的读写,在多线程环境中,任何对共享状态的访问都要同步,以防止竞态条件;
  • 条件变量的同步:在使用条件变量 mutex -> cv 时,通常需要结合互斥锁使用,条件变量的等待和通知需要在互斥锁的保护下进行,以保证操作的原子性;
  • 避免忙等待:如果只使用 mutex -> locked 进行检查,可能会陷入忙等待,即不断地检查锁状态而占用 CPU 资源。使用互斥锁和条件变量可以让线程在等待时被挂起,从而更有效地利用 CPU;


所以解释器为 OS 互斥锁引入了一个自定义的锁状态 locked,OS 互斥锁提供了对 locked 的基本保护,因为多个线程都要修改它。而自定义的锁状态 locked 则用于实现同步逻辑,如果 locked 为 1,我们就认为锁被获取了,locked 为 0,锁就没有被获取。

协程锁和线程锁都是如此,所谓的获取锁、释放锁都是在修改 locked 字段的值。只不过在等待的时候,协程锁使用的是 Future 对象,而线程锁使用的是操作系统提供的互斥锁和条件变量。

所以上面代码中的 PyMUTEX_LOCK 通过之后,还要检测 locked 字段是否等于 1,代码片段如下。

while (mutex->locked) {
            if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
                result = WAIT_FAILED;
                break;
            }
        //...

如果 locked 是 1,说明互斥锁已经被获取了,当前线程要进行等待,直到 locked 字段的值为 0。当其它线程释放锁时,会将 locked 字段修改为 0,并通过条件变量唤醒当前线程。

该线程醒来后检测到 locked 为 0,就知道互斥锁已被释放,自己可以获取了,于是再将 locked 字段修改为 1。

说完了线程锁的获取,再来看看线程锁的释放,所谓释放,其实就是将 locked 字段修改为 0 而已。

a9d9702b7c3a194a025649288610a98c.png

释放互斥锁的逻辑最终会调用如下函数:

deaadbb81bbde0bd61d4a40c461108d3.png

修改 locked 是不安全的,需要加锁保护。所以 OS 互斥锁就是为了保护 locked 变量的修改,再配合条件变量实现阻塞等待以及自动唤醒,但从代码逻辑上讲,将 locked 字段设置为 0,才算是真正释放了锁。

这部分逻辑稍微有点绕,总之记住一个重点:所谓的锁,它的核心就是结构体的一个字段,这里是 locked。如果字段的值为 1,表示锁被获取了,字段的值为 0,表示锁没有被获取。

  • 而获取锁,本质上就是将 locked 字段修改为 1;
  • 而释放锁,本质上就是将 locked 字段修改为 0;

当锁没有被获取时,那么线程在获取锁和释放锁时的逻辑可以简化为如下:

7f8fb4728103f617e6f27f04ee9e1f43.png

但实际情况会有多个线程一起竞争锁,因此为了保护这个共享字段,以及实现阻塞等待和自动唤醒,解释器使用了操作系统的互斥锁和条件变量。


小结



以上我们就剖析了协程锁、信号量以及线程锁的实现原理,至于线程里面的信号量,它的原理和协程的信号量是一样的,只是实现方式不一样。

bd47c7fc86e99a9facc26a5e3fa7eec7.png

线程的信号量包含了一个初始值 value,但它在实现阻塞等待以及唤醒的时候用的是条件变量,而条件变量的实现依赖于锁。简单来说,获取信号量的时候,self._value 会减 1,释放信号量的时候,self._value 会加 1。

当 self._value 为 0 时,获取信号量会陷入阻塞,而当某个线程退出临界区释放信号量的时候,会通过条件变量的 notify 机制唤醒阻塞的线程。

关于条件变量,我们以后再分析,有点饿了。

另外进程也有锁和信号量,这里也先不讨论了,有点困了。

相关文章
|
12天前
|
消息中间件 并行计算 安全
进程、线程、协程
【10月更文挑战第16天】进程、线程和协程是计算机程序执行的三种基本形式。进程是操作系统资源分配和调度的基本单位,具有独立的内存空间,稳定性高但资源消耗大。线程是进程内的执行单元,共享内存,轻量级且并发性好,但同步复杂。协程是用户态的轻量级调度单位,适用于高并发和IO密集型任务,资源消耗最小,但不支持多核并行。
32 1
|
24天前
|
存储 消息中间件 人工智能
进程,线程,协程 - 你了解多少?
本故事采用简洁明了的对话方式,尽洪荒之力让你在轻松无负担的氛围中,稍微深入地理解进程、线程和协程的相关原理知识
39 2
进程,线程,协程 - 你了解多少?
|
13天前
|
消息中间件 并行计算 安全
进程、线程、协程
【10月更文挑战第15天】进程、线程和协程是操作系统中三种不同的执行单元。进程是资源分配和调度的基本单位,每个进程有独立的内存空间;线程是进程内的执行路径,共享进程资源,切换成本较低;协程则更轻量,由用户态调度,适合处理高并发和IO密集型任务。进程提供高隔离性和安全性,线程支持高并发,协程则在资源消耗和调度灵活性方面表现优异。
38 2
|
30天前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
32 1
|
19天前
|
Java 应用服务中间件 测试技术
Java21虚拟线程:我的锁去哪儿了?
【10月更文挑战第8天】
28 0
|
24天前
|
安全 调度 数据安全/隐私保护
iOS线程锁
iOS线程锁
24 0
|
28天前
|
Java API
【多线程】乐观/悲观锁、重量级/轻量级锁、挂起等待/自旋锁、公平/非公锁、可重入/不可重入锁、读写锁
【多线程】乐观/悲观锁、重量级/轻量级锁、挂起等待/自旋锁、公平/非公锁、可重入/不可重入锁、读写锁
27 0
|
28天前
|
安全 Java 程序员
【多线程-从零开始-肆】线程安全、加锁和死锁
【多线程-从零开始-肆】线程安全、加锁和死锁
36 0
|
28天前
|
安全 Linux
Linux线程(十一)线程互斥锁-条件变量详解
Linux线程(十一)线程互斥锁-条件变量详解
|
5月前
|
Go Python
使用python实现一个用户态协程
【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作
50 1
使用python实现一个用户态协程