【Linux】深入理解生产者消费者模型

简介: 【Linux】深入理解生产者消费者模型

生产者 - 消费者模型Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。

一、为何要使用生产者消费者模型


在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,同理如果消费者的速度大于生产者那么消费者就会经常处理等待状态,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

二、生产者消费者模型的理解

1、生产者消费者模型的特点


  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种数据结构组织起来)

为什么生产者和消费者之间为什么会存在互斥且同步关系?

  • 互斥关系:由于所有的生产者和消费者之间都是访问的同一段缓冲区(临界资源),为了避免出现数据不一致性,我们需要对访问缓冲区的线程进行加锁,于是无论是生产者还是消费者都要竞争同一把锁,所以是互斥关系。
  • 同步关系: 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。为了让生产者和消费者线程之间协同起来就需要有同步关系!

2、生产者消费者模型的优点

  • 解耦 :假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
  • 支持并发 :生产者如果直接调用消费者的某个方法,还有另一个弊端就是由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白浪费时间。

    使用了生产者-消费者模式之后,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。
  • 支持忙闲不均:缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

四、基于BlockQueue的生产者消费者模型


在多线程编程中阻塞队列(Block Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
  • 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出

联系: 管道的实现其实就是依据阻塞队列实现的!

1、C++实现阻塞队列

// 阻塞队列
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
// 容量的默认值
const size_t g_cap = 5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(size_t cap = g_cap)
        :_cap(cap)
    {
      // 对锁和条件变量进行初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consumerCond, nullptr);
        pthread_cond_init(&_producerCond, nullptr);
    }
    // 插入数据
    void push(const T& data)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 这里使用while能够防止被误唤醒(例如消费者使用的是broadcast)
        while (_q.size() == _cap)
        {
            // 不满足生产条件,需要进行等待
            pthread_cond_wait(&_producerCond, &_mutex);
        }
        // 插入数据
        _q.push(data);
        // 唤醒队列中的第一个消费者
        pthread_cond_signal(&_consumerCond);
        pthread_mutex_unlock(&_mutex);
    }
    // 删除数据,out是一个输出型数据表示删除的值,如果不关心删除的值可以传nullptr
    void pop(T* out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 这里使用while能够防止被误唤醒(例如生成者使用的是broadcast)
        while (_q.empty())
        {
          // 不满足消费条件,需要进行等待
            pthread_cond_wait(&_consumerCond, &_mutex);
        }
        if (out != nullptr)
        {
            *out = _q.front();
        }
        // 删除数据
        _q.pop();
        // 唤醒队列中的第一个生成者
        pthread_cond_signal(&_producerCond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
      // 对锁和条件变量进行销毁
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumerCond);
        pthread_cond_destroy(&_producerCond);
    }
private:
  // 队列
    std::queue<T> _q;
    // 容量
    size_t _cap;
    // 互斥锁
    pthread_mutex_t _mutex;
    // 生产者条件变量
    pthread_cond_t _consumerCond;
    // 消费者条件变量
    pthread_cond_t _producerCond;
};

2、一些注意事项

判断是否满足生产消费条件时不能用if,而应该用while

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行,有可能会导致产生的数据大于容量上限,或者队列为空还在消费。
  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。

为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。


为了测试这份代码,我们先让生产者生成的慢一些,让消费者消费的快一些,我们应该看到:生成者生成一个,消费者消费一个。

#include "BlockQueue.hpp"
#include <cstdlib>
#include <unistd.h>
using namespace std;
void* producer(void* args)
{
    BlockQueue<int>* pbq = static_cast<BlockQueue<int>*> (args);
    srand((unsigned int)time(nullptr));
    while (true)
    {
      // 让生产者生产的慢一些
        sleep(1);
        // 1.获取数据
        int data = rand() % 100;
        // 2.将数据交给阻塞队列
        pbq->push(data);
        cout << "A data is generated, it is :" << data << endl;
    }
    return nullptr;
}
void* consumer(void* args)
{
    BlockQueue<int>* pbq = static_cast<BlockQueue<int>*> (args);
    while (true)
    {
        //sleep(1);  // 让消费者消费的慢一些
        // 1.从阻塞队列中获取数据
        int out;
        pbq->pop(&out);
        // 2.处理数据
        cout << "A data is consumed, it is :" << out << endl;
    }
    return nullptr;
}
int main()
{
    pthread_t tp, tc;
    BlockQueue<int> bq;
    pthread_create(&tp, nullptr, producer, &bq);
    pthread_create(&tc, nullptr, consumer, &bq);
    pthread_join(tp, nullptr);
    pthread_join(tc, nullptr);
    return 0;
}

可以看到结果符合我们的预期:

接下来我们让生成者生成快一些,让消费者消费慢一些,我们应该看到生产者先把阻塞队列塞满,然后消费者消费一个,生产者生成一个。

(将上述代码的void* producer(void* args)函数中的sleep注释掉,将void* consumer(void* args)函数中的sleep注释去掉)

五、基于环形队列的生产者消费者模型

1、信号量的原理

  • 我们知道一把互斥锁只能对一份临界资源进行保护,当我们对加锁的资源使用时相当于将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
  • 实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流进行同时访问,此时并不会出现数据不一致等问题。
  • 信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。
  • 每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。
  • 信号量的工作机制类似于我们看电影买票,是一种资源的预订机制!不管线程对这份资源是用还是不用,这份资源一定是有的!如果申请信号量失败,则线程会被挂起等待,直到有资源可以使用才会自动被唤醒。
  • 如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。

信号量的PV操作:

P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。

V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。

此外,PV操作是原子操作,只有这样才能保证信号量的线程安全


2、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

使用下面的函数需要包含头文件 : #include <semaphore.h>,并链接库-lpthread

初始化信号量函数

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数说明:

  • sem:需要初始化的信号量。
  • pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
  • value:计数器的初始值。

返回值说明:

  • 初始化信号量成功返回0,失败返回-1。

销毁信号量

int sem_destroy(sem_t *sem);

参数说明:

  • sem:需要销毁的信号量。

返回值说明:

  • 销毁信号量成功返回0,失败返回-1。

等待信号量

功能:等待信号量,会将信号量的值减1

int sem_wait(sem_t *sem);

参数说明:

  • sem:需要等待的信号量。

返回值说明:

  • 等待信号量成功返回0,信号量的值减1。
  • 等待信号量失败返回-1,信号量的值保持不变。

发布信号量

int sem_post(sem_t *sem);

参数说明:

  • sem:需要发布的信号量。

返回值说明:

  • 发布信号量成功返回0,信号量的值加一。
  • 发布信号量失败返回-1,信号量的值保持不变。

3、基于环形队列的生产消费模型

在阻塞队列中,我们将队列作为整体使用,生产者和消费者在同一时刻只能有一个人进行访问,但是在环形队列里面我们可以发现,生产者和消费者关心的内容是不一样的!

  • 生产者关心空间,消费者关心的是数据,环形队列只要生产者和消费者访问不同的区域,生产和消费行为可以同时并发进行。

那么它们什么时候会访问同一块区域呢?

  1. 刚开始时,数据为空,空间为满,生产者和消费者指向同一个位置,存在竞争关系,这时我们应该让生产者先运行!

  2. 数据为满,空间为空,生产者和消费者指向同一个位置,存在竞争关系,这时我们应该让消费者先运行!

4、代码实现

#include <vector>
#include <semaphore.h>
#include <pthread.h>
template<class T>
class RingQueue
{
public:
  //  构造函数设置默认环形队列的大小是5 
    RingQueue(int cap = 5)
        :_cap(cap),  _ring(cap),_c_step(0), _p_step(0)
    {
      // 刚开始时,数据信号量为0
        sem_init(&_data_sem, 0, 0);
        // 刚开始时,空间信号量为cap
        sem_init(&_space_sem, 0, cap);
        pthread_mutex_init(&_c_step_mtx, nullptr);
        pthread_mutex_init(&_p_step_mtx, nullptr);
    }
    // 插入数据
    void push(const T& data)
    {
        // 生产者申请空间资源,P操作
        sem_wait(&_space_sem);
        // 信号量申请成功,必定有资源可以使用,具体是哪一个资源由程序员分配资源
        //_p_step是临界资源
        pthread_mutex_lock(&_p_step_mtx);
        _ring[_p_step++] = data;
        // 防止越界
        _p_step %= _cap;
        pthread_mutex_unlock(&_p_step_mtx);
        // 释放对方关心的信号量,增加了一个数据,V操作
        sem_post(&_data_sem);
    }
    void pop(T* out)
    {
      //  消费者者申请数据资源,P操作
        sem_wait(&_data_sem);
        pthread_mutex_lock(&_c_step_mtx);
        *out = _ring[_c_step++];
        _c_step %= _cap;
        pthread_mutex_unlock(&_c_step_mtx);
    // 释放对方关心的信号量,增加了一个空间,V操作
        sem_post(&_space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
    }
private:
    int _cap;               // 容量
    std::vector<T> _ring;   // 容器
    sem_t _data_sem;        // 数据信号量
    sem_t _space_sem;       // 空间信号量
    size_t _c_step;         // 消费者位置
    size_t _p_step;         // 生产者位置
    pthread_mutex_t _c_step_mtx; // _c_step对应的锁
    pthread_mutex_t _p_step_mtx; // _p_step对应的锁
};

相关说明

  • 生产者 / 消费者每次生产数据后_p_step / _c_step都会进行++,标记下一次生产/消费数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
  • 尽管我们已经通过信号量保证了生产者和消费者大部分情况下在该环形队列可以让生产者和消费者并发的执行,但是由于生产者和生产者,消费者和消费者存在竞争关系,所以我们还需要两把锁, _c_step_mtx, _p_step_mtx来保证它们之间的竞争关系。
相关文章
|
12天前
|
缓存 网络协议 算法
【Linux系统编程】深入剖析:四大IO模型机制与应用(阻塞、非阻塞、多路复用、信号驱动IO 全解读)
在Linux环境下,主要存在四种IO模型,它们分别是阻塞IO(Blocking IO)、非阻塞IO(Non-blocking IO)、IO多路复用(I/O Multiplexing)和异步IO(Asynchronous IO)。下面我将逐一介绍这些模型的定义:
|
5天前
|
Linux
【Linux】生产者消费者模型——环形队列RingQueue(信号量)
【Linux】生产者消费者模型——环形队列RingQueue(信号量)
7 0
|
5天前
|
存储 Linux 容器
【Linux】生产者消费者模型——阻塞队列BlockQueue
【Linux】生产者消费者模型——阻塞队列BlockQueue
9 0
|
12天前
|
Linux 网络安全 虚拟化
Ngnix04系统环境准备-上面软件是免费版的,下面是收费版的,他更快的原因使用了epoll模型,查看当前Linux系统版本, uname -a,VMWARE建议使用NAT,PC端电脑必须使用网线连接
Ngnix04系统环境准备-上面软件是免费版的,下面是收费版的,他更快的原因使用了epoll模型,查看当前Linux系统版本, uname -a,VMWARE建议使用NAT,PC端电脑必须使用网线连接
|
21天前
|
安全 Linux 数据安全/隐私保护
【linux】线程同步和生产消费者模型
【linux】线程同步和生产消费者模型
13 0
|
3天前
|
存储 安全 Linux
Linux命令sync详解
`sync`命令在Linux中用于将内存缓冲区的数据强制写入磁盘,保证数据持久性和一致性。它在关机、重启或重要文件操作前后使用,以防数据丢失。工作原理是强制将内存中的数据同步到磁盘,特点是阻塞式执行且通常无需参数。常见用法包括安全关机、数据备份和配置文件修改后确保更改生效。应注意,过度使用可能影响性能,应适时使用`fsck`检查文件系统一致性。
|
3天前
|
安全 数据管理 Shell
Linux命令su详解
`su`命令在Linux中用于切换用户身份,常用于权限管理。它允许用户无须注销当前会话就切换到另一个用户,尤其是root。`su`有多种选项,如`-`或`--login`加载目标用户环境,`-c`执行指定命令后返回。使用时需注意权限安全,建议用`sudo`以减少风险。通过限制`/etc/pam.d/su`可加强访问控制。`su`在系统维护和数据管理中扮演角色,但不直接处理数据。
|
3天前
|
存储 运维 安全
Linux命令stat:深入了解文件与文件系统状态
`stat`命令在Linux中用于显示文件和文件系统的详细状态,包括权限、大小、时间戳等。它通过读取inode获取信息,特点是显示全面、易用且支持多种参数,如`-c`自定义格式,`-f`查看文件系统状态,`-L`处理符号链接。例如,`stat example.txt`显示文件详情,`stat -c &quot;%n 的大小是 %s 字节&quot; example.txt`输出文件大小。理解`stat`有助于系统管理和故障排查。
|
3天前
|
关系型数据库 MySQL Linux
Linux命令systemctl详解
`systemctl`是Linux系统用于管理systemd服务的核心命令,它与systemd守护进程交互,实现启动、停止、重启服务及查看服务状态等功能。主要参数包括`start`、`stop`、`restart`、`status`、`enable`和`disable`等。例如,启动Apache服务使用`systemctl start httpd.service`,查看服务状态用`systemctl status &lt;service&gt;`。使用时需注意权限,服务名通常以`.service`结尾,但命令中可省略。最佳实践包括利用tab键补全、定期查看服务状态和合理配置服务自启。
|
3天前
|
安全 Linux 数据安全/隐私保护
Linux命令strings详解
`strings`是Linux工具,用于从二进制文件中提取可打印字符串,常用于文件分析、安全审计和逆向工程。它可以识别至少4个连续可打印字符的序列,并支持多种参数,如`-n`调整最小长度,`-f`显示文件名。示例用法包括`strings /bin/ls`和`strings -n 6 /usr/bin/uptime | grep GLIBC`。注意敏感信息泄露,结合其他命令可增强分析能力。