【Linux】多线程02 --- 线程的同步互斥问题及生产消费模型

简介: 【Linux】多线程02 --- 线程的同步互斥问题及生产消费模型

线程同步互斥问题是指多线程程序中,如何保证共享资源的正确访问和线程间的协作。

因为线程互斥是实现线程同步的基础和前提,我们先讲解线程互斥问题。


一、线程互斥

1. 为什么要有共享资源临界保护?


在多线程中,假设我们有一个黄牛抢票的代码,其中有一份共享资源tickets,如果多个线程都在抢票也就是对这个全局变量tickets做–操作,如果我们没有对共享资源做保护(同一时间只能一个线程对资源进行访问)的话,就会存在并发访问的问题,进而导致数据不一致问题!这种情况下,票数最后会出现负数的情况。


ea8716a78ec14c3ab5390045dff55314.png


那为什么会出现并发访问导致数据不一致问题呢?–


了解上面的问题需要知道线程调度的特性,实际线程在被调度时他的上下文会被加载到CPU的寄存器中,而线程在被切换的时候,线程又会带着自己的上下文被切换下去,此时要进行线程的上下文保存,以便于下次该线程被切换上来的时候能够进行上下文数据的恢复。


除此之外,像tickets- -这样的操作,对应的汇编指令其实至少有三条,1.读取数据 2.修改数据 3.写回数据,而线程函数我们知道会在每个线程的私有栈都存在一份,在上面的例子中多个线程执行同一份线程函数,所以这个线程函数就绝对会处于被重入的状态,也就绝对会被多个线程执行!今天我们假设只有一个CPU(CPU就是核心,处理器芯片会集成多个核心)在调度当前进程中的线程,那么线程是CPU调度的基本单位,所以也就会出现一个线程可能执行一半的时候被切换下去了,并且该线程的上下文被保存起来,然后CPU又去调度进程中的另一个线程。


7b232b7806504c619f7f8b9cb0e67f49.png


当多个线程同时进入到分支判断语句,然后去阻塞等待的情况,假设tickets已经变成了1,然后其余的线程此时都被调度上来了,他们都开始执行tickets- -,- -之后不满足循环条件线程才会退出,那么如果我们创建出了4个线程,就会有3个线程在票数已经为0的情况下继续减减,所以就会出现票数为负数的情况


要解决以上的问题,我们提出的解决方案就是:加锁


在学习锁之间先搞清两个概念:


临界资源是指一次仅允许一个进程或线程使用的共享资源,如文件、变量等。

临界区是指每个进程或线程中访问临界资源的那段代码,需要保证互斥和同步的执行。


临界资源和临界区的区别是:


临界资源是一种系统资源,需要不同进程或线程互斥访问,而临界区则是每个进程或线程中访问临界资源的一段代码,是属于对应进程或线程的。

临界资源是一种抽象的概念,表示需要保护的共享数据或设备,而临界区是一种具体的实现,表示访问临界资源的具体操作和逻辑。

临界资源是一种静态的属性,表示某种资源是否可以被多个进程或线程同时使用,而临界区是一种动态的状态,表示某个进程或线程是否正在使用某种临界资源。



03868b85ccf74fec8c354b53c62eb9d3.png



2.理解加锁


2.1 认识锁,使用锁


如果我们想让多个执行流串行的访问临界资源,而不是并发或并行的访问临界资源,这样的线程调度方案就是互斥式的访问临界资源!(串行就是指只要一个线程开始执行这个任务,那么他就不能中断,必须得等这个线程执行完这个任务,你才能切换其他线程执行其他的任务)


加锁后线程的操作是原子性的,怎么理解?


当线程在执行一个对资源访问的操作时,要么做了这个操作,要么没有做这个操作,只要两种状态,不会出现做了一半这样的状态,我们称这样的操作是原子性的。(就比如你妈让你写作业,你要么给我把作业写完了再出去玩,要么就一个字也别写给我滚出家门,就这两种状态,不会出现你写了一半,然后你妈让你出去玩的这种情况,这样也是原子性)


我们下面讲解互斥锁


首先锁实际就是一种数据类型,这个锁就像我们平常定义出来的变量或是对象一样,只不过这个锁的类型是系统给我们封装好的一种类型,进行重定义后为pthread_mutex_t。变量或对象在生命的时候也是可以初始化的,变量初始化后,就是变量的定义,而不是声明了。变量和对象也都有自己的销毁方案,内置类型的变量销毁时,操作系统会自动回收其资源,而自定义对象销毁时,操作系统会调用其析构函数进行资源的回收。

锁同样也是如此,锁也有自己的初始化和销毁方案,如果你定义的是一把局部锁,就需要用pthread_mutex_init()和pthread_mutex_destroy()来进行初始化和销毁,如果你定义的是一把全局锁或静态所,则不需要用init初始化和destroy销毁,直接用PTHREAD_MUTEX_INITIALIZER进行初始化即可,他有自己的初始化和销毁方案,我们无须关心静态或全局锁如何销毁。


定义好锁之后,我们就可以对某一段代码进行加锁和解锁,加锁与解锁意味着,这段代码不是一般的代码,只有申请到锁,持有锁的线程才能访问这段代码,加锁和解锁之间的代码可以称为临界区,因为想要访问这段空间必须有锁才可以访问。

那我们该如何对共享资源进行加锁和解锁呢?

手册这样写的:


d8caa39eaddd488aa3335f3cc500a65a.png


加锁的使用方法一般包括以下几个步骤:


创建并初始化一个加锁原语对象,使用相应的API来分配内存并设置属性,如pthread_mutex_init用于创建并初始化一个互斥锁对象。

在访问共享资源或临界区域前,对加锁原语对象进行加锁操作,使用相应的API来获取锁的所有权,如pthread_mutex_lock用于以阻塞方式获取一个互斥锁。

在访问共享资源或临界区域后,对加锁原语对象进行解锁操作,使用相应的API来释放锁的所有权,如pthread_mutex_unlock用于释放一个互斥锁。

在不需要使用共享资源或临界区域时,销毁加锁原语对象,使用相应的API来释放内存并清理资源,如pthread_mutex_destroy用于销毁一个互斥锁对象。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
static int g_count = 0;
DEFINE_MUTEX(static_lock); // 静态互斥锁
static void *thread_fun_1(void *data)
{
    mutex_lock(&static_lock); // 加锁
    g_count++;
    printf("%s g_count: %d\n", __func__, g_count);
    mutex_unlock(&static_lock); // 解锁
}
static void *thread_fun_2(void *data)
{
    mutex_lock(&static_lock); // 加锁
    g_count++;
    printf("%s g_count: %d\n", __func__, g_count);
    mutex_unlock(&static_lock); // 解锁
}
int main(int argc, char const *argv[])
{
    pthread_t pid[2];
    pthread_create(&pid[0], NULL, thread_fun_1, NULL);
    pthread_create(&pid[1], NULL, thread_fun_2, NULL);
    pthread_join(pid[0], NULL);
    pthread_join(pid[1], NULL);
    return 0;
}
}

这个示例中,两个线程都使用同一个互斥锁static_lock来保护 g_count 的访问。这样,当一个线程获得了锁,另一个线程就必须等待,直到锁被释放。这样就可以保证g_count的操作是串行的,而不会发生数据竞争。


如果忘记解锁,即一个线程在获取一个锁后,没有正确地释放锁,而导致其他线程无法获取该锁。为了避免这种情况,可以使用以下方法:


使用RAII技术,即将加锁和解锁操作封装在一个类中,在构造函数中加锁,在析构函数中解锁,这样可以利用对象的生命周期来自动管理锁的状态。

使用异常处理机制,即在加锁和解锁操作之间使用·try-catch语句来捕获可能抛出的异常,并在catch块中进行解锁操作,这样可以避免异常导致的忘记解锁。


理解局部和全局锁的两种加锁方案


除了代码使用局部锁的实现方案外,我们还可以使用静态锁或全局锁,局部的静态锁还是需要将锁的地址传给线程函数,否则线程函数无法使用锁,因为锁是局部的嘛!如果是全局锁,那就不需要将其地址传给线程函数了,因为线程函数可以直接看到这把锁,所以直接使用即可。


2.2 理解锁的本质


ea643f32f4b24bf58cc3dd46f23f6758.png



我们知道,共享资源在被多线程访问时,是不安全的,所以我们需要加锁来保护共享资源。但是我们回过头来想一想,锁本身是不是共享资源呢?所有的线程都需要申请锁和释放锁,那不就是在共同的访问锁这个资源嘛?所以锁本身不就是共享资源吗?那多个线程在访问锁这个共享资源的时候,锁本身是不是需要被保护呢?当然需要!其他的共享资源可以通过加锁来进行保护,那锁怎么办呢?

实际上,加锁和解锁的过程是原子的!也就是说只要你申请了锁,并且竞争能力恰好足够,那么你就一定能够拿到这个锁,否则你就不会拿到这个锁,不会说在申请锁申请一半的时候,线程被切换下去了,其他线程去申请锁了,不会出现这种中间态的情况!既然加锁和解锁的过程是原子的,那其实访问锁就是安全的!


那如果一个线程申请锁要是没成功呢?或者说暂时申请不到锁呢?执行流又会怎么样呢?


我们通过ps -axj命令可以查看其实这个线程 会变成Sl+状态,也就是阻塞状态,而不是R运行状态!


e508336a4e1746fd99a0cd3aa1c8afb2.png


所以如果申请不到锁,执行流就会阻塞。


因为你线程申请锁的时候,锁被别的线程拿走了,那你自然就无法申请到锁,操作系统会将这样的线程暂时处于休眠状态。只有当持有锁的线程释放锁的时候,操作系统会执行POSIX库的代码,重新唤醒休眠的线程,让这个线程去竞争锁,如果竞争到,那就持有锁继续向后运行,如果竞争不到,那就继续休眠。


先看看死锁


如果线程函数内部申请了两次互斥锁,这实际就会出问题了,我们可以看到代码不会继续运行了。


那为什么会出问题呢?实际是因为,当前线程已经申请到锁了,但是他又去申请锁了,而这个锁其实他自己正持有着呢,但是他又不知道自己持有锁,因为我们主观让线程执行了两次申请锁的语句,是我们让他这么干的,他自己拿着锁,然后他现在又要去申请锁,但锁实际已经被持有了,那么当前线程必然就会申请锁失败,也就是处于休眠状态,什么时候他才会唤醒呢?当然是锁被释放的时候!当锁被释放时,操作系统才会唤醒当前线程,但是锁会释放吗?当然是不会啦!因为你自己把锁拿着,你还等其他线程释放锁,人家其他线程又没有锁,你自己还运行不到pthread_mutex_unlock这段代码,也就是说你自己又不释放锁,你还让没有这个锁的线程去释放锁,这不就是自己把自己给搞阻塞了吗?这其实就是产生死锁了,线程永远都无法等待锁成功释放,那么这个线程将永远处于阻塞状态,无法运行,同样其他线程道理也如此!

后面会详细讲解死锁问题


实际上关于锁的使用总结下来也就一句话,谁持有锁谁才能进入临界区,你没有锁那就只能在临界区外面乖乖的阻塞等待,等待锁被释放,然后你去竞争这把锁,竞争到就拿着锁进入临界区执行代码,竞争不到就老样子,继续乖乖的在临界区外面阻塞等待!



那么!对于其他未持有锁的线程而言,实际有意义的锁的状态,无非就两种!一种是申请锁前,一种是释放锁后!申请锁前,锁还没有被申请到,那么对于其他未持有锁的线程来说,当然是有意义的。释放锁后,锁此时处于未被申请到的状态,那未持有锁的线程当然有可能竞争到这把锁,所以这也是一种有意义的状态!

而我们站在未持有锁的线程角度来看的话,当前持有锁的线程不就是原子的吗?他们看到的锁只有在未申请前和持有锁线程释放锁之后这两种有意义的状态,那这就是原子的,不会出现中间态的情况。

所以,在未来使用锁的时候,一定要保证临界区的代码粒度非常小,因为加锁之后,线程会串行执行,如果粒度非常大,那么执行这段临界区所耗费的时间就越多,整体代码运行的效率自然就会降下来,因为其余非临界区是并发或并行执行,而临界区是串行,所以整体效率会由于临界区的执行效率受较大影响,那么在平常加锁和解锁时,我们就要保证临界区的粒度较小,为此能够让程序整体的运行效率依旧保持较高的状态!


下面看看硬件层面该如何理解加锁?


我们谈到过单纯的i++和++i的语句都不是原子的,因为这样的语句实际还要至少对应三条汇编语句,从内存中读取数据,在寄存器中修改数据,最后再将修改后的数据写回内存,所以++i和i++这样的语句一定不是原子的,因为他在执行的时候是有中间态的,可能在执行一半的时候由于某些原因被切换下去,这样就会停下来。这种非原子性的操作就会导致数据不一致性的问题,也就是前面我们常谈的共享资源访问不安全的问题!随之而来的解决方案就是我们所说的加锁,对共享资源进行互斥式的访问,以保证其安全性。

而加锁和解锁的过程实际也是访问共享资源锁的过程,那么加锁和解锁是如何保证其访问锁的原子性呢?答案是通过一条汇编语句来实现。

为了实现互斥锁的加锁过程,大多数CPU架构都提供了swap和exchange指令,该指令的作用是把寄存器和内存单元的数据进行交换,因为只有一条汇编指令,保证了其原子性。并且即便是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时,另一个处理器的交换指令只能等待总线周期就绪后才能访问。


实际上除我们语言所说的一条汇编语句交换数据,而保证的原子性外,在操作系统内还有另一种硬件层面上的实现原子性的简单做法。因为线程在执行过程中,有可能出现线程执行一半被切换了,那么线程完成任务就不是原子的了,所以我们能不能让线程在执行的时候,压根就不能被切换,只要你线程上了CPU的贼船就不能下去,必须得等你完全执行完代码之后才可以被切换下去。

至于线程在执行一半的时候被切换走,原因有很多,可能是时间片到了,来了更高优先级的线程,线程由于访问某些外设或自己的原因等等,需要进行阻塞等待,这些情况下,都有可能在线程执行一半的时候被切换下去!

所以在系统层面,我们只要禁止一切中断,对线程的中断不做任何响应,禁止中断的总线做出任何响应,关闭外部中断以达到线程不被切换下去的效果,从而实现访问共享资源的原子性。

当然这样的方案比较偏底层,算是一个比较重量级的方案,在硬件层面实现这样的方案的话,成本还是挺高的,除非线程要完成的工作优先级特别高且必须是原子性的,我们才会这么做,否则一半情况下,不会采用这样的方案来实现原子性。


fef82529eb964eb79db24619bfd76712.png

在谈论加锁过程的汇编代码之前,我们先来谈几个共识性的话题,CPU内寄存器只有一套,被所有的执行流共享,并且CPU内寄存器的内容是每个执行流都私有的,称为运行时的上下文。可以看到加锁的汇编语句就是将0放到al寄存器内部,然后就是执行只有一条的汇编语句xchgb,将al寄存器的内容和物理内存单元进行数据交换,此时al寄存器内容就会变为1,物理内存中的mutex互斥量的值变为0,将物理内存中mutex的1和al寄存器内0进行交换,我们可以形象化的表示为线程A把锁拿走了,在拿走锁之后,线程A有没有可能被切换走呢?当然有可能,但线程A在切换的时候,他是带着自己的上下文数据被切换走的。

此时线程B被重新调度上来后,他也会先将0加载到自己上下文中的al寄存器内部,然后再执行xchgb汇编语句,但此时物理内存的mutex是0,代表锁已经被申请了,所以交换以后,al寄存器内部的值依旧是0,继续判断之后会进入else分支语句,该线程就会由于等待锁被持有锁的线程释放而处于挂起等待的状态。

所以,只要线程A申请锁成功了,即使线程A的运行被中断了,我们也不担心,因为交换寄存器和内存的汇编语句只有一条,这能保证加锁过程,也就是申请锁过程的原子性。并且在线程A被切走时,线程A是持有锁被切走的,那么即使其他线程此时被调度上来,他们也一定无法申请到锁,那就必须进行阻塞等待!只有重新调度线程A,将线程A的上下文加载到寄存器内部,此时al内容就会变为1,则返回return 0代表申请锁成功,线程A就可以持有锁式的访问临界区。


上面说的加锁过程是原子的,交换寄存器和mutex内容仅由一条汇编语句来完成,而mutex是我们所说的共享资源,所以一条汇编语句保证了mutex操作的原子性。

而解锁的过程也非常简单,直接将1mov到mutex里面就完成了释放锁的过程,然后唤醒阻塞等待锁的线程,让他们现在去竞争锁,因为锁已经被释放了,所以同样的,释放锁的汇编语句也只有一条,这也能保证释放锁过程的原子性!


3.RAII风格的封装锁


我们可以先定义一个互斥量的类,类中实现构造函数将锁的地址进行初始化,然后定义出加锁和解锁的两个接口,这样就可以定义出来一个内部能够进行加锁和解锁的类。

然后我们再加一层封装,实现出RAII( Resource Acquisition Is Initialization)风格的加锁,即为构造函数处进行加锁,析构函数处进行解锁!

至于锁的初始化和销毁方案,是类外面的事情,使用时需要自己先初始化好一把锁,确定初始化和销毁的方案,然后利用Mutex.hpp这个小组件来进行加锁和解锁的过程!

利用对象的生命周期是随代码块儿的特性,当对象离开代码块儿的时候,会自动调用析构函数销毁锁,这样对我们就方便了很多!

#pragma once
#include <iostream>
#include <pthread.h>
class Mutex // 自己不维护锁,有外部传入
{
public:
    Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
    {}
    void lock()
    {
    pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
      pthread_mutex_unlock(_pmutex);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *_pmutex;
};
class LockGuard // 自己不维护锁,有外部传入
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)//指针构造Mutex类对象
    {
        _mutex.lock(); //构造函数里加锁了
    }
    ~LockGuard()
    {
        _mutex.unlock();//解锁
    }
private:
    Mutex _mutex;
};


实际上std::lock_guard就是C++11标准库提供的一个RAII风格的互斥锁包装器,它在构造时接收一个互斥锁并尝试加锁,在析构时释放互斥锁。


4.死锁


死锁是指一个进程中的各个线程,都持有着锁,但同时又去申请其他线程的锁,而每个线程持有的锁都是占有不会释放的,所以大家都会等着,等对方先释放锁,但是呢,大家又都不释放锁,全都占有着锁,所以大家就会处于一种永久等待的状态,也就是永久性的阻塞状态,所有执行流都不会被运行,这样的问题就是死锁!

之前抢票的代码中,多个线程使用的是同一把锁,未来有些场景一定是要使用多把锁的,在多把锁的情况下,如果某些线程持有锁不释放,还要去申请其他线程正持有的锁,而每个线程都是这样的状态,那就是死锁问题。


产生死锁的四个必要条件


互斥条件:一个资源每次只能被一个执行流使用,互斥其实就是加锁之后线程的串行执行。

请求与保持条件:一个执行流由于请求资源而阻塞时,对自己已经获得的资源保持不放。说白了就是我自己的东西不释放,我还要你的东西,你不给我就一直等,等到你给我为止。

不剥夺条件:一个线程在未使用完自己获得的资源之前,是不能够强行剥夺其他线程的资源的。说白了就是你先在还有资源呢,你想要别人的自由你就得等,不能强行剥夺!当你使用完自己的资源后,你可以去等待申请别人的资源。总之就是不能强行剥夺其他线程的资源,想要就必须阻塞等待别人释放资源才可以。

循环等待条件:若干个执行流之间,形成一种头尾相接的互相等待对方资源的关系。我们也称这样的现象为环路等待。


如何避免死锁? 核心思想:破坏死锁的4个必要条件的任意一个!


2e85c6e57bf149328603822b7caef876.png


二、线程同步


1. 问题引入


我们可以举一个例子来理解条件变量是如何实现线程同步的。

假设现在学校开了一间学霸vip自习室,学校规定这间自习室一次只能进去一个人上自习,自习室门口挂着一把钥匙,谁来的早先拿到这把钥匙,就可以打开门进入自习室学习,并且进入自习室之后,把门一反锁,其他人谁都不能进来。然后你第二天准备去学习了,卷的不行,直接凌晨三点就跑过来,拿着钥匙进入自习室上自习了,然后卷了3小时之后,你想出来上个厕所,一打开门发现外面站的一堆人,都在叽叽喳喳的讨论谁先来的,怎么来的这么早?这么卷?然后你怕自己等会儿把钥匙放到墙上之后,上完厕所回来之后有人拿着钥匙进入了自习室,你就又卷不了了,所以你把钥匙揣兜里,拿着钥匙去上厕所了,其他人当然进入不了自习室,因为你拿着钥匙去上厕所了。等你回来的时候,你又打开门,又来里面上了3小时自习,你感觉自己饿的不行了,在不吃饭就饿死在里面了,所以你打开门,准备出去吃饭了,然后突然你自己感觉负罪感直接拉满,我凌晨3点好不容易抢到自习室,现在离开是不太亏了,所以你又打开自习室回去上自习去了,别人当然竞争不过你呀!因为钥匙一直都在你兜里,你出来之后把钥匙放到墙上,你发现有点负罪感,你又拿起来钥匙回去上自习,因为你离钥匙最近,所以你的竞争能力最强。结果你来自习室上了1分钟自习又出来了,然后又负罪的不行,又回去了,周而复始的这么干,结果别人连自习室长啥样都没见到。

像这样由于长时间无法得到锁的线程,没办法进入临界区访问临界资源,我们称这样的线程处于饥饿状态!


所以学校推出了新政策,所有刚刚从自习室出来的人,都必须回到队列的尾部重新排队等待进入自习室,这样的话,其他人也就可以拿到钥匙进入自习室了。

所以,在保证数据安全的前提下,让线程能够按照某种特定的顺序来访问临界资源,从而有效避免其他线程的饥饿问题,这就叫做线程同步!


2. 条件变量


为了能够让多线程协同工作,就需要实现多线程的同步关系,为了维护同步关系,就需要引入条件变量。那条件变量是一个什么东西呢?他其实和互斥锁一样,都是一个数据类型定义出来的对象。初始化和销毁方案和互斥锁一模一样。唯一不同的是,条件变量在使用时有两个高频使用的接口,一个是pthread_cond_wait,该函数的作用是将等待某一个具体锁的线程放入条件变量的等待队列中进行等待,另一个是pthread_cond_signal,该函数的作用是唤醒条件变量中等待队列的第一个等待线程,另一个用的不怎么高频,但也偶尔会用一下的接口就是pthread_cond_broadcast,该函数将条件变量中的所有等待线程都会唤醒,让所有线程重新回归竞争锁的状态。而不是像signal那样,唤醒cond队列中任意一个阻塞等待锁的线程。

2774756b68734e84bd14df52ffdc6bd1.png



条件变量的功能是阻塞线程,直到某个特定条件为真为止。条件变量始终与互斥锁一起使用,对条件的测试是在互斥锁(互斥)的保护下进行的。如果条件为假,线程通常会基于条件变量阻塞,并以原子方式释放等待条件变化的互斥锁。


条件变量的类型和初始化


在Linux中,条件变量用pthread_cond_t类型的变量表示,此类型定义在<pthread.h>头文件中。例如:

#include <pthread.h>
pthread_cond_t cond; // 创建一个条件变量


要想使用cond条件变量,还需要进行初始化操作。初始化条件变量的方式有两种,一种是直接将PTHREAD_COND_INITIALIZER赋值给条件变量,例如:


pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 静态初始化


还可以借助pthread_cond_init()函数初始化条件变量,语法格式如下:

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); // 动态初始化
// 成功返回0,失败返回错误码

参数cond用于指明要初始化的条件变量;参数attr用于自定义条件变量的属性,通常我们将它赋值为NULL,表示以系统默认的属性完成初始化操作。

当attr参数为NULL时,以上两种初始化方式完全等价。


条件变量的使用

等待和唤醒

使用条件变量可以分为两部分:等待线程和激活线程。

等待线程

有两种等待方式,无条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait()。接口为:

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex,const struct timespec* abstime);
// 成功返回0,失败返回错误码

cond参数表示已初始化好的条件变量;mutex参数表示与条件变量配合使用的互斥锁;abstime参数表示阻塞线程的时间。


注意,abstime参数指的是绝对时间,例如您打算阻塞线程5秒钟,那么首先要得到当前系统的时间,然后再加上5秒,最终得到的时间才是传递的实参值。


无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()(或pthread_cond_timedwait())的竞争条件(Race Condition)。mutex互斥锁必须是普通锁或者适应锁,并在调用pthread_cond_wait()前必须由本线程加锁,而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。


pthread_cond_wait()函数的返回并不意味着条件的值一定发生了变化,必须重新检查条件的值。阻塞在条件变量上的线程被唤醒以后,直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。如:

pthread_mutex_lock(&mutex); // 加锁
while (condition_is_false) // 循环检查条件
    pthread_cond_wait(&cond, &mutex); // 等待条件成立
pthread_mutex_unlock(&mutex); // 解锁

阻塞在同一个条件变量上的不同线程被释放的次序是不一定的。


注意:pthread_cond_wait()函数是退出点,如果在调用这个函数时,已有一个挂起的退出请求,且线程允许退出,这个线程将被终止并开始执行善后处理函数,而这时和条件变量相关的互斥锁仍将处在锁定状态。


激活线程 – 广播


唤醒条件有两种形式,pthread_cond_signal()唤醒一个等待该条件的线程,存在多个等待线程时按入队顺序唤醒其中一个;而pthread_cond_broadcast()则唤醒所有等待线程。

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
// 成功返回0,失败返回错误码

cond参数表示初始化好的条件变量。


必须在互斥锁的保护下使用相应的条件变量。否则对条件变量的解锁有可能发生在锁定条件变量之前,从而造成死锁。


唤醒阻塞在条件变量上的所有线程的顺序由调度策略决定,如果线程的调度策略是SCHED_OTHER类型的,系统将根据线程的优先级唤醒线程。


如果没有线程被阻塞在条件变量上,那么调用pthread_cond_signal()将没有作用。


由于pthread_cond_broadcast函数唤醒所有阻塞在某个条件变量上的线程,这些线程被唤醒后将再次竞争相应的互斥锁,所以必须小心使用pthread_cond_broadcast函数。


销毁


对于初始化好的条件变量,我们可以调用pthread_cond_destroy()函数销毁它。只有在没有线程在该条件变量上等待的时候才能销毁这个条件变量,否则返回EBUSY。因为Linux实现的条件变量没有分配什么资源,所以注销动作只包括检查是否有等待线程。


int pthread_cond_destroy(pthread_cond_t *cond); // 成功返回0,失败返回错误码

下面是一个使用条件变量实现生产者-消费者模型(后面讲解)的示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define MAX 10 // 缓冲区大小
// 缓冲区结构体
typedef struct {
    int buffer[MAX]; // 存放数据
    int len; // 当前数据长度
    pthread_mutex_t mutex; // 互斥锁
    pthread_cond_t not_full; // 条件变量:缓冲区未满
    pthread_cond_t not_empty; // 条件变量:缓冲区非空
} pool_t;
// 初始化缓冲区
void pool_init(pool_t *pool) {
    pool->len = 0; // 初始长度为0
    pthread_mutex_init(&pool->mutex, NULL); // 初始化互斥锁
    pthread_cond_init(&pool->not_full, NULL); // 初始化条件变量
    pthread_cond_init(&pool->not_empty, NULL);
}
// 销毁缓冲区
void pool_destroy(pool_t *pool) {
    pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
    pthread_cond_destroy(&pool->not_full); // 销毁条件变量
    pthread_cond_destroy(&pool->not_empty);
}
// 生产者线程函数
void *producer(void *arg) {
    pool_t *pool = (pool_t *)arg; // 获取缓冲区指针
    int i;
    for (i = 0; i < 20; i++) { // 循环生产20个数据
        pthread_mutex_lock(&pool->mutex); // 加锁
        while (pool->len == MAX) { // 如果缓冲区已满,等待条件变量
            printf("producer is waiting...\n");
            pthread_cond_wait(&pool->not_full, &pool->mutex);
        }
        pool->buffer[pool->len] = i; // 向缓冲区中添加数据
        pool->len++; // 长度加一
        printf("producer produces %d\n", i);
        pthread_cond_signal(&pool->not_empty); // 发送条件变量信号,通知消费者
        pthread_mutex_unlock(&pool->mutex); // 解锁
        sleep(1); // 模拟生产时间
    }
}
// 消费者线程函数
void *consumer(void *arg) {
    pool_t *pool = (pool_t *)arg; // 获取缓冲区指针
    int data;
    while (1) { // 循环消费数据
        pthread_mutex_lock(&pool->mutex); // 加锁
        while (pool->len == 0) { // 如果缓冲区为空,等待条件变量
            printf("consumer is waiting...\n");
            pthread_cond_wait(&pool->not_empty, &pool->mutex);
        }
        data = pool->buffer[pool->len - 1]; // 从缓冲区中取出数据
        pool->len--; // 长度减一
        printf("consumer consumes %d\n", data);
        pthread_cond_signal(&pool->not_full); // 发送条件变量信号,通知生产者
        pthread_mutex_unlock(&pool->mutex); // 解锁
        sleep(2); // 模拟消费时间
    }
}
int main() {
    pool_t pool; // 创建一个缓冲区对象
    pool_init(&pool); // 初始化缓冲区
    pthread_t tid1, tid2; // 创建两个线程ID
    pthread_create(&tid1, NULL, producer, &pool); // 创建生产者线程,传入缓冲区指针作为参数
    pthread_create(&tid2, NULL, consumer, &pool); // 创建消费者线程,传入缓冲区指针作为参数
    pthread_join(tid1, NULL); // 等待生产者线程结束
    pthread_join(tid2, NULL); // 等待消费者线程结束
    pool_destroy(&pool); // 销毁缓冲区
    return 0;
}


3.生产消费模型的概念理解 — 321原则


先来理解关于串行、并发、并行的概念:



2e4e7f0e15064ac78e3ad97c381a435b.png



实际我们的计算机在工作时,是一定要进行并发的,因为并发能很好解决用户同时想要运行多个程序的需求,也就是我们所说的多任务处理,但同时也需要进行并行。就比如上面图中举得例子,每个大核跑不同的程序,但同时某一个大核在跑程序时,也可以时间片轮转的去执行另一个程序,所以并行和并发在计算中是同时存在的。

而并发一定要比并行效率高的前提是多任务情况,如果你站在多任务处理的角度去看待串行和并发,你一定可以理解为什么并发效率要更高,因为串行在线程被切换下去或者等锁被释放的时候,这段时间CPU什么都做不了,那这段时间就会被白白浪费掉,在多任务处理的情况下,效率一定就会下降。而对于并发来讲,如果某个线程被切换下去或者他在等待锁被释放的时候,是完全没有关系的,因为CPU会调度运行其他线程,所以被切换下去的线程在等待的时候,时间完全不会被浪费掉,而是会被CPU利用起来去跑其他的线程。


再看看生产消费模型的概念


实际生活中,我们作为消费者,一般都会去超市这样的地方去购买产品,而不是去生产者那里购买产品,因为供货商一般不零售产品,他们都会统一将大量的商品供货到超市,然后我们消费者从超市这样的交易场所中购买产品。

而当我们在购买产品的时候,生产者在做什么呢?生产者可能正在生产商品呢,或者正在放假呢,也可能正在干着别的事情,所以生产和消费的过程互相并不怎么影响,这就实现了生产者和消费者之间的解耦。

而超市充当着一个什么样的角色呢?比如当放假期间,消费爆棚的季节中,来超市购买东西的人就会非常的多,所以就容易出现供不应求的情况,但超市一般也会有对策,因为超市的仓库中都会预先屯一批货,所以在消费爆棚的时间段内,超市也不用担心没有货卖的情况。而当工作期间,大家由于忙着通过劳动来换取报酬,可能来消费的人就会比较少,商品流量也会比较低,那此时供货商如果还是给超市供大量的货呢?虽然超市可能最近确实卖不出去东西,但是超市还是可以把供货商的商品先存储到仓库中,以备在消费爆棚的季节时,能够应对大量消费的场景。所以超市其实就是充当一个缓冲区的角色,在计算机中充当的就是数据缓冲区的角色。

而计算机中哪些场景是强耦合的呢?其实函数调用就是强耦合的一个场景,例如当main调用func的时候,func在执行代码的时候,main在做什么呢?main什么都做不了,他只能等待func调用完毕返回之后,main才能继续向后执行代码,所以我们称main和func之间就是一种强耦合的关系,而上面所说的生产者和消费者并不是一种强耦合的关系。

350c7af56eef4a468ab6b005d09fd114.png


所以为什么要有超市? 核心理由:效率高 ! 忙闲不均 — 允许生产消费的步调不一致

超市其实就是典型的共享资源,因为生产者和消费者都要访问超市,所以对于超市这个共享资源,他在被访问的时候,也是需要被保护起来的,而保护其实就是通过加锁来实现互斥式的访问共享资源,从而保证安全性。

在只有一份超市共享资源的情况下,生产和生产,消费和消费,以及生产和消费都需要进行串行的访问共享资源。但为了提高效率我们搞出了同步这样的关系,因为有可能消费者一直霸占着锁,一直在那里消费,但实际超市已经没有物资了,此时消费者由于竞争能力过强,也会造成不合理的问题,因为消费者消费过多之后,应该轮到生产者来生产了,所以对于生产者和消费者之间仅仅只有互斥关系是不够的,还需要有同步关系。


我们可以从生产消费模型中可以提取出来一个321原则帮助记忆。即为3种关系,两个角色,1个交易场所。对应的其实是消费线程和消费线程的关系,消费线程和生产线程的关系,生产线程和生产线程的关系,交易场所就是阻塞队列blockqueue。而实现线程同步就需要一个条件变量,比如生产者生产完之后,超市给消费者打个电话,让消费者过来消费,消费完之后,超市在给生产者打个电话,让生产者来生产,这样就不会存在由于某一个线程竞争能力过强,一直生产或一直消费的情况产生,从而导致其他线程饥饿的问题。

58f0040eb7f44e0d8d9abda174c9f37c.png


实际上只要我们想写生产消费模型,本质上做的工作就是在维护”321“原则嘛!


生产消费模型都有哪些好处:


a.他实现了生产和消费的解耦,使他们之间并不互相影响。

b.支持生产和消费一段时间的忙闲不均的问题。因为缓冲区可以预留一部分数据,进行数据的缓冲。

c.由于生产和消费的互斥与同步关系,提升了生产消费模型的效率。


相关文章
|
4天前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之mysql通过flink cdc同步数据,有没有办法所有表共用一个dump线程
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
8 0
|
6天前
|
Linux C语言 调度
|
6天前
|
Unix Linux 调度
linux线程与进程的区别及线程的优势
linux线程与进程的区别及线程的优势
|
6天前
|
算法 安全 Linux
【探索Linux】P.20(多线程 | 线程互斥 | 互斥锁 | 死锁 | 资源饥饿)
【探索Linux】P.20(多线程 | 线程互斥 | 互斥锁 | 死锁 | 资源饥饿)
13 0
|
6天前
|
安全 Linux 调度
【linux线程(二)】线程互斥与线程同步
【linux线程(二)】线程互斥与线程同步
|
6天前
|
Linux 调度 C语言
【Linux C/C++ 线程同步 】Linux互斥锁和条件变量:互斥锁和条件变量在Linux线程同步中的编程实践
【Linux C/C++ 线程同步 】Linux互斥锁和条件变量:互斥锁和条件变量在Linux线程同步中的编程实践
48 0
|
6天前
|
Linux C语言
linux c 多线程 互斥锁、自旋锁、原子操作的分析与使用
生活中,我们常常会在12306或者其他购票软件上买票,特别是春节期间或者国庆长假的时候,总会出现抢票的现象,最后总会有人买不到票而埋怨这埋怨那,其实这还好,至少不会跑去现场或者网上去找客服理论,如果出现了付款,但是却没买到票的现象,那才是真的会出现很多问题,将这里的票引入到多线程中,票就被称为临界资源。
32 0
|
6天前
|
安全 Linux 调度
Linux C/C++ 开发(学习笔记四):多线程并发锁:互斥锁、自旋锁、原子操作、CAS
Linux C/C++ 开发(学习笔记四):多线程并发锁:互斥锁、自旋锁、原子操作、CAS
47 0