【linux线程(三)】生产者消费者模型详解(多版本)

简介: 【linux线程(三)】生产者消费者模型详解(多版本)

1. 前言

学习进程和线程也很久了,它们具体能解决

什么问题?有什么实际的运用?

本章重点:

本篇文章着重讲解基于多线程下的
生产者消费者模型的概念以及实现.
不仅如此,文章还会拓展基于使用信号量
实现的环形队列版的生产者消费者模型


2. 初识生产者消费者模型

先回答第一个问题:

什么是生产者消费者模型?

生产者消费者模型的本质就是通过一个阻塞队列来将生产者和消费者解耦合的模型.可以将这个模型比作一个超时,生产者生产了数据(商品)后,不会直接拿给消费者使用,而是将数据(商品)先加入到超市,让超市帮我们卖数据(商品).同理,对于消费者来说它并不会直接去生产者的厂家直接拿数据(商品),而是去超市中购买数据(商品).这样生产者和消费者两个对象就解耦合了

再回答第二个问题:

这个模型有什么好处?

想象一下这个场景,假如没有超市的存在,消费者去买物品的伤害要直接到生产厂家去买,并且消费者还不知道生产厂家现在是否有物品,很容易跑空.对于生产者来说,他也不知道消费者需要哪些物品,也就不好生产,容易生产出来的物品没人问津,加入超市这个角色就可以解决上面的问题.综上所述,生产者消费者模型不仅仅将二者解耦了,并且还提高了效率,不会让两方出现跑空的情况(超市没有商品了,消费者最清楚,超市有商品时,生产者最清楚)

不仅如此,生产者消费者模型还支持多个线程一起进入,阻塞队列属于临界资源,所以同一时刻,不管是生产者还是消费者,都只允许一个线程进入到阻塞队列进行push或pop操作.除此之外,我们还需要两个条件变量push_cond和pop_cond,当阻塞队列已满时,push_cond条件变量会让生产者线程等待消费者线程在队列中取走数据,而当阻塞队列为空时,pop_cond条件变量会让消费者线程等待生产者往队列中加入数据,一来一回,也就是push操作会唤醒pop_cond条件变量,而pop操作会唤醒push_cond条件变量


3. 阻塞队列版本的代码实现

在的代码中一定会涉及到加解锁的问题,所以可以根据RAII思想,先设计出一个专门用于加解锁的类:

lockguard.hpp文件:

#pragma once
#include<iostream>
#include<pthread.h>
using namespace std;
class Mutex //封装pthread库的锁
{
public:
    Mutex(pthread_mutex_t* pmtx)
        :_pmtx(pmtx)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmtx);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmtx);
    }
    ~Mutex()
    {
        delete _pmtx;
        _pmtx = nullptr;
    }   
private:
    pthread_mutex_t* _pmtx;
};
//利用RAII思想,用对象的生命周期控制资源
class LockGuard//封装了锁后,出了作用域会自动调用析构函数,解锁!
{
public:
    LockGuard(pthread_mutex_t* mtx)
        :_mtx(mtx)
    {
        _mtx.lock();
    }
    ~LockGuard()
    {
        _mtx.unlock();
    }
private:
    Mutex _mtx;
};

下一步,编写基于阻塞队列的模型,

在实现它之前需要先注意一些点:

下面的define语句可有可无,主要是为了图方便.第二点,检测临界资源是否就绪时要使用while而不是if,因为有时可能会错误的唤醒一个进程,如果是if语句,一旦错误唤醒,就会直接进入临界区,不合理,使用while的话,就算是错误唤醒了一个线程,只要资源不就绪也不能往后走.在下面的代码中并没有使用上上面封装的锁,这是为了方便同学们即使不封装锁也能很好的查看代码,最后,代码的解释都放在注释当中了,如果你还有问题,欢迎你来私信我

block_queue.hpp文件:

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include"LockGuard.cpp"
using namespace std;
#define INIT_MTX(mtx) pthread_mutex_init(&mtx,nullptr)
#define INIT_COND(cond) pthread_cond_init(&cond,nullptr)
#define DESTORY_MTX(mtx) pthread_mutex_destroy(&mtx)
#define DESTORY_COND(cond) pthread_cond_destroy(&cond)
template<class T>
class BlockQueue
{
public:
    BlockQueue(size_t capacity = 5)
        :_capacity(capacity)
    {
        INIT_MTX(_mtx);
        INIT_COND(_condisempty);
        INIT_COND(_condisfull);
    }
    ~BlockQueue()
    {
        DESTORY_MTX(_mtx);
        DESTORY_COND(_condisempty);
        DESTORY_COND(_condisfull);
    }
    void push(const T& in)
    {
        //1. 加锁
        pthread_mutex_lock(&_mtx);
        //2. 检测临界资源是否满足访问条件,不满足就利用条件变量让它等待
        while(isfull())
        {
            cout<<"队列已满,push等待中..."<<endl;
            //我是抱着锁去等待的,所以即使消费者去pop了,没有锁也只能阻塞等待,就造成死循环了!
            //pthread_cond_wait的第二个参数是一把锁,当成功调用wait后,传入的锁会自动释放!
            //被唤醒时,还是在加锁和解锁之间,wait函数会自动帮线程获取锁
            // 1. wait函数可能会调用失败
            // 2. wait函数可能存在伪唤醒的情况
            // 所以不能使用if语句来判断,而是用while,醒来后再判断一下是不是真的唤醒
            pthread_cond_wait(&_condisfull,&_mtx);//若队列满了,就去isfull变量中等待
            cout<<"等待成功,继续执行..."<<endl;
        }
        //3. 访问临界资源
        _bq.push(in);
        //5. 生产完后唤醒消费者(满足一定条件再唤醒)
        if(_bq.size() >= _capacity/2)
            pthread_cond_signal(&_condisempty);
        //4. 解锁
        pthread_mutex_unlock(&_mtx);
    }
    void pop(T* out)
    {
        //1. 加锁
        pthread_mutex_lock(&_mtx);
        //2. 检测临界资源是否满足访问条件,不满足就利用条件变量让它等待
        while(isempty())
        {
            cout<<"队列为空,pop等待中..."<<endl;
            pthread_cond_wait(&_condisempty,&_mtx);
            cout<<"等待成功,继续执行..."<<endl;
        }
        //3. 访问临界资源
        *out = _bq.front();
        _bq.pop();
        //4. 解锁
        pthread_mutex_unlock(&_mtx);
        //5. 消费完后唤醒生产者
        pthread_cond_signal(&_condisfull);
    }
    bool isempty()
    {
        return _bq.size() == 0;
    }
    bool isfull()
    {
        return _bq.size() ==_capacity;
    }
private:
    queue<T> _bq;
    size_t _capacity;  //阻塞队列容量上限
    pthread_mutex_t _mtx;  //通过互斥锁保证队列push/pop时,不会被pop/push(正在生产时就来消费)
    //push或pop时,怎么知道队列是不是满或空呢?通过条件变量来同步信息
    pthread_cond_t _condisempty; //用它来表示阻塞队列是否为空的条件
    pthread_cond_t _condisfull; //用它来表示阻塞队列是否为满的条件
};

对于这段代码的测试代码较为繁杂,我

把代码的码云链接放出来,有兴趣的同学

下来写完可以去做一些测试:

我的码云链接


4. 初识posix信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

请思考以下问题:

在基于阻塞队列的生产者消费者模型中,生产者关心的是阻塞队列是否满了,而消费者关心的是阻塞队列是否有数据,既然它们关心的资源不同,但进入到阻塞队列就会直接加锁,是不是有一点不合理?有没有一种方法可以只让生产者与生产者之间以及消费者与消费者之间存在锁竞争?而生产者和消费者之间不存在锁竞争?答案是使用信号量!

什么是信号量?

信号量的本质是一个计数器,在使用资源前需要预定资源,也就是让信号量减一.类似于去电影院看电影需要提前买票预定座位,并且在访问完资源释放信号量,也就是让信号量加一

根据以上的信息可以简化模型:

生产者关心队列是否为满,刚开始队列是空,那么生产者的信号量就是n(队列的长度),而消费者关心队列是否有资源,刚开始队列为空,那么消费者的信号量就是0.并且在生产者push一个数据到队列后,生产者的信号量需要减一,而消费者的信号量会加一,同理,消费者从队列拿走一个数据后,生产者的信号量会加一,而消费者的信号量会减一

信号量的使用方法分为以下几步:

  • 初始化信号量
  • 销毁信号量

  • 等待信号量(信号量减一)

  • 发布信号量(信号量加一)

等待和发布信号量的操作又被称为PV操作


5. 基于信号量的环形队列模型

可以优化最初的生产者消费者模型:

使用一个环形队列,只有生产和消费指向环形队列中的同一个位置(队列为满或空)时,才会出现生产者和消费者互斥或同步的问题,而当生产和消费指向环形队列中的不同位置时,只需控制信号量即可

  • 生产者关心空间资源,使用信号量spacesem,起始值为N
  • 消费者关心数据资源,使用信号量datasem,起始值为0
  • 期望:生产者不能让消费者套圈(为空时要先让生产者先运行)
  • 期望:消费者不能超过生产者(为满时要让消费者先运行)
  • 需要两把锁,一把给生产者之间当互斥锁,另外一把给消费者用

6. 环形队列模型的代码编写

还是和之前一样,可以选择将操作信号量的函数专门写为一个类,当然也可以用原生的

sem.hpp文件中

#pragma once
#include<semaphore.h>
using namespace std;
class Sem
{
public:
    Sem(int value)
    {
        sem_init(&_sem,0,value);
    }
    ~Sem()
    {
        sem_destroy(&_sem);
    }
    void p() //PV操作分别代表信号量减一和信号量加一
    {
        sem_wait(&_sem);
    }
    void v()
    {
        sem_post(&_sem);
    }
private:
    sem_t _sem;
};

在RingQueue.hpp文件中:

#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<unistd.h>
#include<semaphore.h>
using namespace std;
#include"Sem.hpp"
const int g_num = 5;
template<class T>
class RingQueue
{
public:
    RingQueue(int num = g_num)
        :_num(num)
        ,_rq(num)
        ,_pushindex(0)
        ,_popindex(0)
        ,_space_sem(num)//最开始空间资源是满的
        ,_data_sem(0)//最开始数据资源是空的
    {
        pthread_mutex_init(&pushlock,nullptr);
        pthread_mutex_init(&poplock,nullptr);
    }
    ~RingQueue()
    {
        pthread_mutex_destroy(&pushlock);
        pthread_mutex_destroy(&poplock);
    }
    void push(const T& in)//生产者进行,关心空间资源,生产者们的临界资源是pushindex下标
    {
        _space_sem.p();
        pthread_mutex_lock(&pushlock);//对生产者们进行加锁
        _rq[_pushindex++] = in;
        _pushindex %= _num;
        pthread_mutex_unlock(&pushlock);
        _data_sem.v();
    }
    void pop(T* out)//消费者进行,关心数据资源,消费者们的临界资源也是popindex下标
    {
        //一般先申请信号量,再进行加锁
        _data_sem.p();
        pthread_mutex_lock(&poplock);//对消费者们进行加锁
        //竞争成功的生产者线程只有一个
        *out = _rq[_popindex++];
        _popindex %= _num;
        pthread_mutex_unlock(&poplock);
        _space_sem.v();
    }
private:
    vector<T> _rq;//环形队列可用数组表示
    size_t _num;//总共的元素个数
    size_t _pushindex = 0;//push的下标
    size_t _popindex = 0;//pop的下标
    Sem _space_sem;
    Sem _data_sem;
    pthread_mutex_t pushlock;//生产者的锁
    pthread_mutex_t poplock;//消费者的锁
};

关于代码的解释都在注释中,若你有疑问,欢迎私信我~


7. 总结以及拓展

生产者消费者模型是我们学习线程中的一个很重要的模型,它的思想有助于帮我们解决后续的难题,并且校招时,有可能会让手撕一个生产者消费者模型,所以同学要学扎实了


🔎 下期预告:线程池详解 🔍


相关文章
|
9月前
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
7月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
281 67
|
6月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
305 16
|
9月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
178 17
|
2月前
|
Linux 应用服务中间件 Shell
二、Linux文本处理与文件操作核心命令
熟悉了Linux的基本“行走”后,就该拿起真正的“工具”干活了。用grep这个“放大镜”在文件里搜索内容,用find这个“探测器”在系统中寻找文件,再用tar把东西打包带走。最关键的是要学会使用管道符|,它像一条流水线,能把这些命令串联起来,让简单工具组合出强大的功能,比如 ps -ef | grep 'nginx' 就能快速找出nginx进程。
387 1
二、Linux文本处理与文件操作核心命令
|
2月前
|
Linux
linux命令—stat
`stat` 是 Linux 系统中用于查看文件或文件系统详细状态信息的命令。相比 `ls -l`,它提供更全面的信息,包括文件大小、权限、所有者、时间戳(最后访问、修改、状态变更时间)、inode 号、设备信息等。其常用选项包括 `-f` 查看文件系统状态、`-t` 以简洁格式输出、`-L` 跟踪符号链接,以及 `-c` 或 `--format` 自定义输出格式。通过这些选项,用户可以灵活获取所需信息,适用于系统调试、权限检查、磁盘管理等场景。
285 137
|
2月前
|
安全 Ubuntu Unix
一、初识 Linux 与基本命令
玩转Linux命令行,就像探索一座新城市。首先要熟悉它的“地图”,也就是/根目录下/etc(放配置)、/home(住家)这些核心区域。然后掌握几个“生存口令”:用ls看周围,cd去别处,mkdir建新房,cp/mv搬东西,再用cat或tail看文件内容。最后,别忘了随时按Tab键,它能帮你自动补全命令和路径,是提高效率的第一神器。
650 57
|
5月前
|
JSON 自然语言处理 Linux
linux命令—tree
tree是一款强大的Linux命令行工具,用于以树状结构递归展示目录和文件,直观呈现层级关系。支持多种功能,如过滤、排序、权限显示及格式化输出等。安装方法因系统而异常用场景包括:基础用法(显示当前或指定目录结构)、核心参数应用(如层级控制-L、隐藏文件显示-a、完整路径输出-f)以及进阶操作(如磁盘空间分析--du、结合grep过滤内容、生成JSON格式列表-J等)。此外,还可生成网站目录结构图并导出为HTML文件。注意事项:使用Tab键补全路径避免错误;超大目录建议限制遍历层数;脚本中推荐禁用统计信息以优化性能。更多详情可查阅手册mantree。
484 143
linux命令—tree
|
1月前
|
存储 安全 Linux
Linux卡在emergency mode怎么办?xfs_repair 命令轻松解决
Linux虚拟机遇紧急模式?别慌!多因磁盘挂载失败。本文教你通过日志定位问题,用`xfs_repair`等工具修复文件系统,三步快速恢复。掌握查日志、修磁盘、验重启,轻松应对紧急模式,保障系统稳定运行。
351 2
|
2月前
|
缓存 监控 Linux
Linux内存问题排查命令详解
Linux服务器卡顿?可能是内存问题。掌握free、vmstat、sar三大命令,快速排查内存使用情况。free查看实时内存,vmstat诊断系统整体性能瓶颈,sar实现长期监控,三者结合,高效定位并解决内存问题。
233 0
Linux内存问题排查命令详解