《Linux从练气到飞升》No.29 生产者消费者模型

简介: 《Linux从练气到飞升》No.29 生产者消费者模型

前言

并发编程领域,生产者消费者模型是一个经典且重要的话题。它涉及到多线程之间的协作与通信,展现了在复杂系统中保持数据一致性和避免资源竞争的关键技术。通过深入探讨生产者消费者模型,我们可以了解如何利用同步和互斥的机制来实现线程之间的有效协作,从而提高程序的效率和可靠性。

在本篇博客中,我将带领读者逐步理解生产者消费者模型的设计思想、实现方法以及可能遇到的问题。无论是初学者还是有一定经验的开发人员,都可以通过本文深入了解生产者消费者模型,并掌握如何在实际项目中应用这一模型来优化程序结构和性能。

让我们一起探索生产者消费者模型的精妙之处,为并发编程的世界增添新的活力与智慧。

1 相关概念

  1. 321原则

3:3种关系,生产者之间(互斥)、消费者之间(互斥)、生产者和消费者之间(互斥+同步)

2:两种角色,生产者和消费者

1:一个交易场所

  1. 使用生产者消费者模型的原因

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

  1. 生产者消费者模型优点
  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

  1. 基于BlockingQueue的生产者消费者模型

多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2 基于BlockingQueue的生产者消费者代码实现

BlockQueue.cc

#pragma
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
#include<stdlib.h>
using namespace std;
class Task
{
public:
    int _x;
    int _y;
    Task(){}
    Task(int x,int y)
        :_x(x),_y(y)
    {}
    int run()
    {
        return _x+_y;
    }
    ~Task(){}
};
class BlockQueue
{
private:
    /* data */
    std::queue<Task> q;//设置一个队列
    int _cap;//容量
    pthread_mutex_t lock;//设置一把互斥锁
    pthread_cond_t c_cond;//满了通知消费者
    pthread_cond_t p_cond;//满了通知生产者
    void LockQueue()//加锁
    {
        pthread_mutex_lock(&lock);
    }
    void UnlockQueue()//解锁
    {
        pthread_mutex_unlock(&lock);
    }
    bool IsEmpty()//判断队列是否为空
    {
        return q.size()==0;
    }
    bool IsFull()//判断队列是否满
    {
        return q.size()==_cap;
    }
    void ProductWait()//生产者等待
    {
        pthread_cond_wait(&p_cond,&lock);
    }
    void ConsumerWait()//消费者等待
    {
        pthread_cond_wait(&c_cond,&lock);
    }
    void WakeUpProduct()//唤醒生产者
    {
        pthread_cond_signal(&p_cond);
    }
    void WakeUpConsumer()//唤醒消费者
    {
        pthread_cond_signal(&c_cond);
    }
public:
    BlockQueue(int cap)//初始化
        :_cap(cap)
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&c_cond,NULL);
        pthread_cond_init(&p_cond,NULL);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&c_cond);
        pthread_cond_destroy(&p_cond);
    }
    void put(Task in)
    {
        LockQueue();
        while(IsFull()){
            WakeUpConsumer();//唤醒消费者
            std::cout<<"queue full,notify consume , stop product"<<std::endl;
            ProductWait();//生产者线程等待
        }
        q.push(in);
        UnlockQueue();
    }
    void get(Task&out){
        LockQueue();
        while(IsEmpty()){
            WakeUpProduct();//唤醒生产者
            std::cout<<"queue Empty,notify product , stop consum"<<std::endl;
            ConsumerWait();//消费者线程等待
        }
        out=q.front();
        q.pop();
        UnlockQueue();
    }
};

main.cc

#include"BlockQueue.cc"
pthread_mutex_t p_lock;
pthread_mutex_t c_lock;
void* Product_Run(void* arg)
{
    BlockQueue* bq = (BlockQueue*)arg;
    srand((unsigned int)time(NULL));
    while(1)
    {
        pthread_mutex_lock(&p_lock);
        int x = rand()%10 + 1;
        int y = rand()%100 + 1;
        Task t(x,y);
        bq->put(t);
        pthread_mutex_unlock(&p_lock);
        cout<<"product data is "<<t.run()<<endl;
    }
}
void* Consumer_Run(void* arg)
{
    BlockQueue* bq = (BlockQueue*)arg;
    srand((unsigned int)time(NULL));
    while(1)
    {
        pthread_mutex_lock(&c_lock);
        Task t;
        bq->get(t);
        pthread_mutex_unlock(&c_lock);
        cout<<"consumer is "<<t._x<<"+"<<t._y<<"="<<t.run()<<endl;
        sleep(1);
    }
}
int main()
{
    BlockQueue* bq = new BlockQueue(10);
    pthread_t c,p;
    pthread_create(&c,NULL,Product_Run,(void*)bq);
    pthread_create(&p,NULL,Consumer_Run,(void*)bq);
    pthread_join(c,NULL);
    pthread_join(p,NULL);
    delete bq;
    return 0;
}

makefile

main:main.cc
  g++ -o $@ $^ -lpthread
.PTHONY:
clean:
  rm -f main

结果:

可以观察到生产者生产任务,消费者完成任务

相关文章
|
5月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
245 16
|
缓存 安全 Linux
Linux 五种IO模型
Linux 五种IO模型
|
网络协议 Linux 数据安全/隐私保护
在Linux中,TCP/IP 的七层模型有哪些?
在Linux中,TCP/IP 的七层模型有哪些?
|
Kubernetes Linux API
在Linux中,LVS-DR模型的特性是什么?
在Linux中,LVS-DR模型的特性是什么?
|
负载均衡 算法 Linux
在Linux中,LVS-NAT模型的特性是什么?
在Linux中,LVS-NAT模型的特性是什么?
|
27天前
|
Unix Linux 程序员
Linux文本搜索工具grep命令使用指南
以上就是对Linux环境下强大工具 `grep` 的基础到进阶功能介绍。它不仅能够执行简单文字查询任务还能够处理复杂文字处理任务,并且支持强大而灵活地正则表达规范来增加查询精度与效率。无论您是程序员、数据分析师还是系统管理员,在日常工作中熟练运用该命令都将极大提升您处理和分析数据效率。
103 16
|
18天前
|
Linux
linux命令—stat
`stat` 是 Linux 系统中用于查看文件或文件系统详细状态信息的命令。相比 `ls -l`,它提供更全面的信息,包括文件大小、权限、所有者、时间戳(最后访问、修改、状态变更时间)、inode 号、设备信息等。其常用选项包括 `-f` 查看文件系统状态、`-t` 以简洁格式输出、`-L` 跟踪符号链接,以及 `-c` 或 `--format` 自定义输出格式。通过这些选项,用户可以灵活获取所需信息,适用于系统调试、权限检查、磁盘管理等场景。
|
3月前
|
监控 Linux 网络安全
Linux命令大全:从入门到精通
日常使用的linux命令整理
668 13
|
4月前
|
Linux 网络安全 数据安全/隐私保护
使用Linux系统的mount命令挂载远程服务器的文件夹。
如此一来,你就完成了一次从你的Linux发车站到远程服务器文件夹的有趣旅行。在这个技术之旅中,你既探索了新地方,也学到了如何桥接不同系统之间的距离。
555 21
|
4月前
|
JSON 自然语言处理 Linux
linux命令—tree
tree是一款强大的Linux命令行工具,用于以树状结构递归展示目录和文件,直观呈现层级关系。支持多种功能,如过滤、排序、权限显示及格式化输出等。安装方法因系统而异常用场景包括:基础用法(显示当前或指定目录结构)、核心参数应用(如层级控制-L、隐藏文件显示-a、完整路径输出-f)以及进阶操作(如磁盘空间分析--du、结合grep过滤内容、生成JSON格式列表-J等)。此外,还可生成网站目录结构图并导出为HTML文件。注意事项:使用Tab键补全路径避免错误;超大目录建议限制遍历层数;脚本中推荐禁用统计信息以优化性能。更多详情可查阅手册mantree。
linux命令—tree