互斥锁的优化与应用

简介: 互斥锁的优化与应用

引言

  • 在上一章节中,我们初步实现了互斥锁机制,然而,我们实现的互斥锁却有着各种缺陷,本章节中,我们就来针对性的解决这些缺陷

问题一的优化

  • 针对上一章节的问题一,我们优化一下 “u_syscall.c” 中的代码,让任务 B 先尝试加锁在向下执行
E_RET MutexLock(MUTEX* mutex)
{
    do 
    {
        _SYS_CALL1(_NR_MutexLock, mutex);
    }
    while(!mutex->lock);
    return E_OK;
}
  • 注意:在 “u_syscall.h” 头文件中需要明确 MUTEX 类型定义(由于该类型定义还包含 LIST_NODE 和 QUEUE 类型,可以把内核中 “list.h” 头文件和 “queue.h” 头文件复制一份到 “user” 目录下的 “include” 文件夹中,不然 “user” 中的代码编译会报错)
// typedef void MUTEX;
typedef struct MUTEX
{
    LIST_NODE   node;       // 互斥锁链表节点
    U08         lock;       // 锁状态
    QUEUE       wait;       // 等待该互斥锁的任务队列(每个互斥锁都有一个等待队列)
} MUTEX;

问题二的优化

  • 重新设计一下互斥锁的 lock 元素,原先我们用 0 代表解锁状态,1 代表加锁状态,使用 U08 类型就可以表示两种状态,现在将其改为 U32 类型
typedef struct MUTEX
{
    LIST_NODE   node;       // 互斥锁链表节点
    // U08         lock;    // 锁状态
    U32         lock;       // 锁状态
    QUEUE       wait;       // 等待该互斥锁的任务队列(每个互斥锁都有一个等待队列)
} MUTEX;
  • 加锁时 lock 不再赋值为 1,而是将其赋值为当前任务对象的首地址赋值给 lock,同时我们还要避免同一个任务连续多次上锁导致把自己也挂起了,此时就再也无人可以解锁了
E_RET SYS_MutexLock(MUTEX* mutex)
{
    ...
    // 如果互斥锁处于空闲状态,则将其上锁,否则,将当前任务放到等待队列中并立即切换到下一个任务
    if(0 == mutex->lock)
    {
        printk(" %s Lock\n", current_task->name);
        // mutex->lock = 1;
        mutex->lock = (U32)current_task;
    }
    else
    {
        // 避免同一个任务连续加锁多次
        if((U32)current_task != mutex->lock)
        {
            printk(" %s enters the waiting queue\n", current_task->name);
            MutexSuspend(mutex);
        }
        return E_ERR;
  }
    ...
}
  • 解锁时,我们要做一下限制,谁加锁,谁才有解锁权限,其它任务不能解锁
E_RET SYS_MutexUnLock(MUTEX* mutex)
{
    ...
    // 加锁任务与解锁任务必须相同,不允许其它任务非法解锁
    if((U32)current_task != mutex->lock)
        return E_ERR;
    // 将互斥锁设置为空闲可用状态,并将互斥锁等待队列中的任务放回到就绪队列中
    mutex->lock = 0;
    printk(" %s Unlock\n", current_task->name);
    MutexResume(mutex); 
    ...
}
  • 销毁互斥锁我们也要做限制,互斥锁必须处在解锁状态才能销毁
E_RET SYS_MutexDestory(MUTEX* mutex)
{
    ...
    // 互斥锁解锁状态才能被销毁
    if(mutex->lock)
        return E_ERR;
    // 删除链表节点并释放内存
    ListDelNode(&mutex->node);
    Free(mutex);
    ...
}

生产者消费者基础代码模型

  • 互斥锁的一些漏洞我们也已经修复了,接下来,我们将在实际应用中检验互斥锁是否可用
  • 简单生产消费问题
  • 有 n 个生产者同时制造产品,并把产品存入仓库中
  • 有 m 个消费者同时需要从仓库中取出产品
  • 规则:一次只允许一个人进入仓库,生产者和消费者不能同时进入仓库
  • 将上述现实中的生产消费问题具现到计算机中,假设有 2 个生产者,2 个消费者,一个仓库
  • Producer A -生产者,生产 26 个大写字母,并放入仓库
  • Producer B -生产者,生产 26 个小写字母,并放入仓库
  • Consumer A -消费者,从仓库中取出大写字母并打印
  • Consumer B -消费者,从仓库中取出小写字母并打印
  • 继续具现
  • 使用链表模拟仓库(临界资源)
  • 任务 ProducerA 模拟生产者,向仓库(链表)中放入大写字母
  • 任务 ProducerB 模拟生产者,向仓库(链表)中放入小写字母
  • 任务 ConsumerA 模拟消费者,从仓库(链表)中获取大写字母并打印
  • 任务 ConsumerB 模拟消费者,从仓库(链表)中获取小写字母并打印
  • 生产者生产 MAX 个产品后结束生产
  • 想要保证仓库最多同时只有一个人进入,可以使用互斥锁保护仓库(临界资源)
  • 在开始写代码之前,我们先来看一下生产者消费者代码基础模型,很多生产者消费者代码实现都是从这个模型演化而来的
  • 先看 Producer 生产者,在 while 循环中,首先尝试获取互斥锁,在拿到互斥锁之后,就可以生产商品,并将商品入库,并将商品数量 gPutNum 加 1,同时 gPutNum 可以用来决定生产者要不要继续生产商品,在示例模型代码中,生产者最多生产 MAX 个商品就会停止生产
  • 在示例代码中,我们看到跳出 while 循环靠的是 running 变量,为什么不像下面这么写呢?
while(1)
{
    MutexLock(gMutex);
    if(gPutNum < MAX)
    {
        // 生产商品,放入仓库
        gPutNum++; // 商品计数
    }
    else
    {
        break;
    }
    MutexUnLock(gMutex);
}
  • 上面写法一看就有个巨大的漏洞,那就是 break 后就没人能够释放互斥锁了,所以必须重新定义一个变量作为退出循环的标志
  • 分析过生产者代码模型之后,我们来看一下消费者 Consumer 代码模型,消费者想要安全的从仓库中拿到商品,肯定也需要互斥锁的保护,有了互斥锁之后才能保证最多同时只有一个生产者或者消费者能够进入仓库,消费者的代码逻辑跟生产者几乎是一致的,所以就没必要再过多的分析了

互斥锁的应用一

  • 接着生产者消费者基础代码模型,我们继续来完整的实现一个实验代码
  • 由于示例代码写在 “app.c” 中,其中用到了内存分配和链表的接口函数,然而这些功能我们只在内核中实现,想要在用户程序中使用这些功能,有两种办法,一种是将相关接口通过系统调用的方式暴露出来,还有一种就是再用户程序直接复制一份相关代码即可
  • 由于系统调用方式我懒得实现,所以选择复制一份,实现起来简单
  • 动态内存分配代码全部在 “memory.c” 和 “memory.h” 中,链表相关代码在 “list.c” 和 “list.h” 中 所以把内核中这几个文件复制一份到 “user” 目录下对应的文件夹中即可,当然,复制到 “app” 文件夹下也行。复制完后记得修改对应目录下的 “BUILD.json” 配置文件,这个就不细说了,应该很熟悉了
  • 接下来就只剩下在 “app.c” 中实现实验代码了
  • 由于使用了动态内存分配以及链表,所以要提前做好初始化工作
void AppInit(void)
{
    MemInit((U08 *)&AppHeap, 4096);
    ListInit(&gStore); // 初始化仓库
    ...
}
  • 在调试过程中发现定义 U08 AppHeap[4096]; 数组时由于编译器优化,编译器好像并没有给数组分配实际空间,使用 volatile 关键字也不行,通过尝试发现数组定义完必须赋值一个非 0 值才行,就像这样 U08 AppHeap[4096] = {0xff}; 你要问我怎么发现的,那我只能说我定义一个很大的数组,结果编译生成的 “app.bin” 程序文件大小并不增大,这样子看之前定义的任务私有栈好像也有问题,好吧,这个就这样吧,不深究了,毕竟编译原理我也不是很懂
  • 首先把一些基础定义实现了
#define MAX 60              // 生产商品最大数量
U32 gPutNum = 0;            // 用于生产者计数
U32 gGetNum = MAX;          // 用于生产者计数
MUTEX* gMutex = NULL;       // 互斥锁
  • 接下来定义仓库
static LIST gStore; // 仓库链表
  • 仓库定义完成后我们在来定义商品,data 才是实际商品,node 用于链表操作
typedef struct PRODUCT
{
    LIST_NODE node;
    U08 data;
} PRODUCT;
  • 仓库和商品都定义好了,接下来我们就来实现两个功能,一个是向仓库中放入商品,另一个是从仓库中拿去商品,其中取商品时根据参数 type 来判断取出的商品种类, type=='A' 时拿取大写字母,type=='B' 时拿取小写字母
// 存
static void Store(U08 ch)
{
    PRODUCT* p = NULL;
    p = Malloc(sizeof(PRODUCT));
    if( NULL == p)
        return;
    p->data = ch;
    ListAddTail(&gStore, (LIST_NODE *)p);
}
// 取
static BOOL Fetch(char type, U08* ch)
{
    BOOL ret = FALSE;
    PRODUCT* p = NULL;
    LIST_NODE* pNode = NULL;
    LIST_FOR_EACH(&gStore, pNode)
    {
        p = (PRODUCT *)pNode;
        if('A' == type)
            ret = (p->data >= 'A') && (p->data <= 'Z');
        else if('B' == type)
            ret = (p->data >= 'a') && (p->data <= 'z');
        if(ret)
        {
            *ch = p->data;
            ListDelNode(pNode);
            Free(p);
            break;
        }
    }
    return ret;
}
  • 最后就是生产者消费者代码实现了,代码实现跟生产者消费者模型代码一致,仅多了实际的放入商品和取出商品的代码实现
void ProducerA(void) // 生产者 A
{
    U08 running = 0;
    U32 next = 0;
    gMutex = MutexCreat();
    while(1)
    {
        MutexLock(gMutex);
        if(running = (gPutNum < MAX))
        {
            // 生产商品,放入仓库
            U08 c = 'A' + next % 26;
            Store(c);
            SetCursorPos(next++, 8); // 打印到第 8 行
            print("%c", c);
            gPutNum++;
        }
        MutexUnLock(gMutex);
        if(running) 
            Delay(777777);
        else    
            break;
    }
}
void ProducerB(void)  // 生产者 B
{
    ...(省略,与生产者 A 的实现几乎一样)
}
void ConsumerA(void) // 消费者 A
{
    U08 running = 0;
    U32 next = 0;
    while(1)
    {
        MutexLock(gMutex);
        if(running = (gGetNum > 0))
        {
            // 商品出库,消耗库存
            U08 c = 0;
            if(Fetch('A', &c))
            {
                SetCursorPos(next++, 12); // 打印到第 12 行
                print("%c", c);
                gGetNum--; // 商品计数
            }
        }
        MutexUnLock(gMutex);
        if(running) 
            Delay(99999);
        else    
            break;
    }
}
void ConsumerB(void)  // 消费者 B
{
    ...(省略,与消费者 A 的实现几乎一样)
}
• 最后的最后,只要把生产者 A、B 和消费者 A、B 任务注册即可
void AppInit(void)
{
    ...
    AppRegister(ProducerA, PAStack, 256, "ProducerA", E_APP_PRI5);
    AppRegister(ProducerB, PBStack, 256, "ProducerB", E_APP_PRI5);
    AppRegister(ConsumerA, CAStack, 256, "ConsumerA", E_APP_PRI5);
    AppRegister(ConsumerB, CBStack, 256, "ConsumerB", E_APP_PRI5);
    ...
}
  • 本次实验完整代码见:app.c
  • 本次实验效果如下,其中,生产者 A、B 共生产 MAX=60 个商品,消费者 A、B 也一共取出 MAX=60 个商品

互斥锁的完善

  • 回顾一下前面实现的互斥锁机制,当互斥锁上锁后,再想获取该互斥锁的任务都会被挂起放到等待队列中,导致任务无法继续往下执行,如果我们确实需要程序跳过临界区继续向下执行怎么办
  • 目前我们实现的互斥锁相关功能函数肯定是无法满足需求的,于是我们准备在 “mutex.c” 文件中再实现一个功能函数 SYS_MutexTryLock(),其与 SYS_MutexLock() 不同之处在于 SYS_MutexLock() 会将任务挂起,从而导致程序无法继续向下执行(除非唤醒),而 SYS_MutexTryLock() 不会将任务挂起,程序依旧可以继续向下执行。SYS_MutexTryLock() 具体实现如下:
E_RET SYS_MutexTryLock(MUTEX* mutex)
{
    // 检查参数合法性
    if(NULL == mutex || !CheckMutex(mutex))
        return E_ERR;
    // 如果互斥锁处于空闲状态,则将其上锁,否则,返回加锁失败
    if(0 == mutex->lock)
    {
        mutex->lock = (U32)current_task;
        return E_OK;
    }
    return E_ERR;
}
  • 想要让用户程序调用到该内核函数,依旧要使用系统调用方式,用户 user 中也要增加接口实现
E_RET MutexTryLock(MUTEX* mutex)
{
    return _SYS_CALL1(_NR_MutexTryLock, mutex);
}
  • 关于系统调用的实现流程都说过好几遍了,在这里就不再叙述了
  • 相关改动代码见:mutex.cmutex.hsyscall.cu_syscall.cu_syscall.h
  • 接下来写个测试代码验证以下,仅改动 ProducerA() 和 ProducerB() 函数即可,将其中 MutexLock() 改为 MutexTryLock()
void ProducerA(void) // 生产者 A  
{
    U08 running = 0;
    U32 next = 0;
    gMutex = MutexCreat();
    while(1)
    {
        if(E_OK == MutexTryLock(gMutex))
        {
            if(running = (gPutNum < MAX))
            {
                // 生产商品,放入仓库
                U08 c = 'A' + next % 26;
                Store(c);
                SetCursorPos(next++, 8); // 打印到第 8 行
                print("%c", c);
                gPutNum++;
            }
            MutexUnLock(gMutex);
        }
        if(running) 
            Delay(777777);
        else    
            break;
    }
}
// ProducerB 省略
  • 完整测试代码见:app.c
  • 运行效果如下:

  • 你也可以在 MutexTryLock() == E_ERR 时添加打印,看看获取锁失败后任务会不会挂起,我已经实测过了,这里就懒得再写了,就这样吧
相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
目录
相关文章
|
6月前
多线程并发锁的方案—原子操作
多线程并发锁的方案—原子操作
|
6月前
|
Java 编译器
多线程(锁升级, 锁消除, 锁粗化)
多线程(锁升级, 锁消除, 锁粗化)
56 1
|
6月前
|
Java
什么锁比读写锁性能更高?
Java并发编程中,ReentrantReadWriteLock是高效的锁机制,但在高并发环境下,乐观锁(如CAS)和JDK 8引入的StampedLock可提供更高性能。StampedLock支持读锁、写锁和乐观读锁,其乐观读锁在读多写少的场景下能提升并发性能,通过tryOptimisticRead方法实现。当乐观读锁无效时,可无缝切换至悲观读锁。
|
11月前
|
API 调度 C语言
互斥锁,自旋锁,原子操作的原理,区别和实现
v互斥锁,自旋锁,原子操作的原理,区别和实现
121 0
|
6月前
多线程并发锁的方案—互斥锁
多线程并发锁的方案—互斥锁
|
6月前
多线程并发锁方案—自旋锁
多线程并发锁方案—自旋锁
|
6月前
|
编译器 C++
vs2017下原子操作和互斥锁性能比较
vs2017下原子操作和互斥锁性能比较
|
6月前
|
Java 编译器 程序员
synchronized 原理(锁升级、锁消除和锁粗化)
synchronized 原理(锁升级、锁消除和锁粗化)
|
12月前
|
算法
互斥锁原理
互斥锁原理
94 0
|
Java 编译器
Java多线程【锁优化与死锁】
Java多线程【锁优化与死锁】
Java多线程【锁优化与死锁】