《Linux从练气到飞升》No.29 生产者消费者模型

简介: 《Linux从练气到飞升》No.29 生产者消费者模型

前言

并发编程领域,生产者消费者模型是一个经典且重要的话题。它涉及到多线程之间的协作与通信,展现了在复杂系统中保持数据一致性和避免资源竞争的关键技术。通过深入探讨生产者消费者模型,我们可以了解如何利用同步和互斥的机制来实现线程之间的有效协作,从而提高程序的效率和可靠性。

在本篇博客中,我将带领读者逐步理解生产者消费者模型的设计思想、实现方法以及可能遇到的问题。无论是初学者还是有一定经验的开发人员,都可以通过本文深入了解生产者消费者模型,并掌握如何在实际项目中应用这一模型来优化程序结构和性能。

让我们一起探索生产者消费者模型的精妙之处,为并发编程的世界增添新的活力与智慧。

1 相关概念

  1. 321原则

3:3种关系,生产者之间(互斥)、消费者之间(互斥)、生产者和消费者之间(互斥+同步)

2:两种角色,生产者和消费者

1:一个交易场所

  1. 使用生产者消费者模型的原因

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

  1. 生产者消费者模型优点
  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

  1. 基于BlockingQueue的生产者消费者模型

多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2 基于BlockingQueue的生产者消费者代码实现

BlockQueue.cc

#pragma
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
#include<stdlib.h>
using namespace std;
class Task
{
public:
    int _x;
    int _y;
    Task(){}
    Task(int x,int y)
        :_x(x),_y(y)
    {}
    int run()
    {
        return _x+_y;
    }
    ~Task(){}
};
class BlockQueue
{
private:
    /* data */
    std::queue<Task> q;//设置一个队列
    int _cap;//容量
    pthread_mutex_t lock;//设置一把互斥锁
    pthread_cond_t c_cond;//满了通知消费者
    pthread_cond_t p_cond;//满了通知生产者
    void LockQueue()//加锁
    {
        pthread_mutex_lock(&lock);
    }
    void UnlockQueue()//解锁
    {
        pthread_mutex_unlock(&lock);
    }
    bool IsEmpty()//判断队列是否为空
    {
        return q.size()==0;
    }
    bool IsFull()//判断队列是否满
    {
        return q.size()==_cap;
    }
    void ProductWait()//生产者等待
    {
        pthread_cond_wait(&p_cond,&lock);
    }
    void ConsumerWait()//消费者等待
    {
        pthread_cond_wait(&c_cond,&lock);
    }
    void WakeUpProduct()//唤醒生产者
    {
        pthread_cond_signal(&p_cond);
    }
    void WakeUpConsumer()//唤醒消费者
    {
        pthread_cond_signal(&c_cond);
    }
public:
    BlockQueue(int cap)//初始化
        :_cap(cap)
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&c_cond,NULL);
        pthread_cond_init(&p_cond,NULL);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&c_cond);
        pthread_cond_destroy(&p_cond);
    }
    void put(Task in)
    {
        LockQueue();
        while(IsFull()){
            WakeUpConsumer();//唤醒消费者
            std::cout<<"queue full,notify consume , stop product"<<std::endl;
            ProductWait();//生产者线程等待
        }
        q.push(in);
        UnlockQueue();
    }
    void get(Task&out){
        LockQueue();
        while(IsEmpty()){
            WakeUpProduct();//唤醒生产者
            std::cout<<"queue Empty,notify product , stop consum"<<std::endl;
            ConsumerWait();//消费者线程等待
        }
        out=q.front();
        q.pop();
        UnlockQueue();
    }
};

main.cc

#include"BlockQueue.cc"
pthread_mutex_t p_lock;
pthread_mutex_t c_lock;
void* Product_Run(void* arg)
{
    BlockQueue* bq = (BlockQueue*)arg;
    srand((unsigned int)time(NULL));
    while(1)
    {
        pthread_mutex_lock(&p_lock);
        int x = rand()%10 + 1;
        int y = rand()%100 + 1;
        Task t(x,y);
        bq->put(t);
        pthread_mutex_unlock(&p_lock);
        cout<<"product data is "<<t.run()<<endl;
    }
}
void* Consumer_Run(void* arg)
{
    BlockQueue* bq = (BlockQueue*)arg;
    srand((unsigned int)time(NULL));
    while(1)
    {
        pthread_mutex_lock(&c_lock);
        Task t;
        bq->get(t);
        pthread_mutex_unlock(&c_lock);
        cout<<"consumer is "<<t._x<<"+"<<t._y<<"="<<t.run()<<endl;
        sleep(1);
    }
}
int main()
{
    BlockQueue* bq = new BlockQueue(10);
    pthread_t c,p;
    pthread_create(&c,NULL,Product_Run,(void*)bq);
    pthread_create(&p,NULL,Consumer_Run,(void*)bq);
    pthread_join(c,NULL);
    pthread_join(p,NULL);
    delete bq;
    return 0;
}

makefile

main:main.cc
  g++ -o $@ $^ -lpthread
.PTHONY:
clean:
  rm -f main

结果:

可以观察到生产者生产任务,消费者完成任务

相关文章
|
5天前
|
Linux
【Linux】生产者消费者模型——环形队列RingQueue(信号量)
【Linux】生产者消费者模型——环形队列RingQueue(信号量)
7 0
|
5天前
|
存储 Linux 容器
【Linux】生产者消费者模型——阻塞队列BlockQueue
【Linux】生产者消费者模型——阻塞队列BlockQueue
9 0
|
12天前
|
Linux 网络安全 虚拟化
Ngnix04系统环境准备-上面软件是免费版的,下面是收费版的,他更快的原因使用了epoll模型,查看当前Linux系统版本, uname -a,VMWARE建议使用NAT,PC端电脑必须使用网线连接
Ngnix04系统环境准备-上面软件是免费版的,下面是收费版的,他更快的原因使用了epoll模型,查看当前Linux系统版本, uname -a,VMWARE建议使用NAT,PC端电脑必须使用网线连接
|
6天前
|
运维 监控 网络协议
Linux抓包命令tcpdump使用技巧大全
【7月更文挑战第10天】
28 5
Linux抓包命令tcpdump使用技巧大全
|
3天前
|
存储 安全 Linux
Linux命令sync详解
`sync`命令在Linux中用于将内存缓冲区的数据强制写入磁盘,保证数据持久性和一致性。它在关机、重启或重要文件操作前后使用,以防数据丢失。工作原理是强制将内存中的数据同步到磁盘,特点是阻塞式执行且通常无需参数。常见用法包括安全关机、数据备份和配置文件修改后确保更改生效。应注意,过度使用可能影响性能,应适时使用`fsck`检查文件系统一致性。
|
3天前
|
安全 数据管理 Shell
Linux命令su详解
`su`命令在Linux中用于切换用户身份,常用于权限管理。它允许用户无须注销当前会话就切换到另一个用户,尤其是root。`su`有多种选项,如`-`或`--login`加载目标用户环境,`-c`执行指定命令后返回。使用时需注意权限安全,建议用`sudo`以减少风险。通过限制`/etc/pam.d/su`可加强访问控制。`su`在系统维护和数据管理中扮演角色,但不直接处理数据。
|
3天前
|
存储 运维 安全
Linux命令stat:深入了解文件与文件系统状态
`stat`命令在Linux中用于显示文件和文件系统的详细状态,包括权限、大小、时间戳等。它通过读取inode获取信息,特点是显示全面、易用且支持多种参数,如`-c`自定义格式,`-f`查看文件系统状态,`-L`处理符号链接。例如,`stat example.txt`显示文件详情,`stat -c &quot;%n 的大小是 %s 字节&quot; example.txt`输出文件大小。理解`stat`有助于系统管理和故障排查。
|
3天前
|
关系型数据库 MySQL Linux
Linux命令systemctl详解
`systemctl`是Linux系统用于管理systemd服务的核心命令,它与systemd守护进程交互,实现启动、停止、重启服务及查看服务状态等功能。主要参数包括`start`、`stop`、`restart`、`status`、`enable`和`disable`等。例如,启动Apache服务使用`systemctl start httpd.service`,查看服务状态用`systemctl status &lt;service&gt;`。使用时需注意权限,服务名通常以`.service`结尾,但命令中可省略。最佳实践包括利用tab键补全、定期查看服务状态和合理配置服务自启。
|
3天前
|
安全 Linux 数据安全/隐私保护
Linux命令strings详解
`strings`是Linux工具,用于从二进制文件中提取可打印字符串,常用于文件分析、安全审计和逆向工程。它可以识别至少4个连续可打印字符的序列,并支持多种参数,如`-n`调整最小长度,`-f`显示文件名。示例用法包括`strings /bin/ls`和`strings -n 6 /usr/bin/uptime | grep GLIBC`。注意敏感信息泄露,结合其他命令可增强分析能力。
|
3天前
|
存储 监控 Linux
stdbuf命令在Linux中的深度解析
`stdbuf`是Linux工具,用于控制命令的stdin、stdout和stderr的缓冲模式。它可以设置为无缓冲、行缓冲或块缓冲,以优化数据处理和实时性。例如,`stdbuf -o0 cmd`禁用cmd的输出缓冲,`-oL`则按行缓冲。在需要实时监控或高效处理大量数据时,选择合适的缓冲模式至关重要。注意,过度使用无缓冲可能影响性能,并非所有系统都支持`stdbuf`。