生产者消费者模型(一)

简介: 生产者消费者模型

一、生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题


生产者和消费者彼此之间不直接通讯,而通过容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中;消费者也不用找生产者索要数据,而是直接从这个容器中取数据。容器就类似于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器完成了生产者和消费者之间的解耦

219a1a84e2624cac818312d2c02a6075.png



二、生产者消费者模型的特点

三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)

两种角色: 生产者和消费者(通常由进程或线程承担)

一个交易场所: 通常指的是内存中的一段缓冲区(可以自己通过某种方式组织)

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?


介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此需要将该临界资源用互斥锁保护起来。所以所有生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系


生产者和消费者之间为什么会存在同步关系?


若一直让生产者生产,那么当生产者生产的数据装满容器后,生产者再生产数据就会生产失败。

反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。


注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来


三、生产者消费者模型优点

解耦

支持并发,提高效率

支持忙闲不均

若在主函数中调用某一函数,那么必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合


四、基于BlockingQueue的生产者消费者模型

4.1 基本认识

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者消费者模型的数据结构


03932405022146bb9a0127654141d909.png


其与普通的队列的区别在于:


当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素

当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出

阻塞队列最经典的应用场景:管道


4.2 模拟实现

下面以单生产者、单消费者为例进行讲解与实现

#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
public:
    BlockQueue(size_t capacity = 4) : _capacity(capacity)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_full,nullptr);
        pthread_cond_init(&_empty,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }
    void push(const T& data)
    {
        pthread_mutex_lock(&_mutex);
    while (IsFull()) {//不能进行生产,直到阻塞队列可以容纳新的数据
      pthread_cond_wait(&_full, &_mutex);
    }
    _queue.push(data);
        std::cout << "Producer: " << data << std::endl;
    pthread_mutex_unlock(&_mutex);
    pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
    }
    void pop(T& data)
    {
        pthread_mutex_lock(&_mutex);
    while (IsEmpty()) {//不能进行消费,直到阻塞队列有新的数据
      pthread_cond_wait(&_empty, &_mutex);
    }
    data = _queue.front();
    _queue.pop();
        std::cout << "Consumer: " << data << std::endl;
    pthread_mutex_unlock(&_mutex);
    pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
    }
private:
    bool IsFull() { return _queue.size() == _capacity; }
    bool IsEmpty() { return _queue.empty(); }
private:
    std::queue<T> _queue;
    size_t _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _full;
    pthread_cond_t _empty;
};

判断是否满足生产消费条件时不能用if,而应该用while:


pthread_cond_wait函数有可能调用失败,调用失败后该执行流就会继续往后执行。为了避免出现上述情况,就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断


生产者消费者步调一致

#include <unistd.h>
#include "BlockQueue.hpp"
void* Producer(void* arg)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
  while (true) { //生产者不断进行生产
    sleep(1);
    int data = rand() % 100 + 1;
    bq->push(data);
  }
}
void* Consumer(void* arg)
{
    int data = 0;
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
  while (true) { //消费者不断进行消费
    sleep(1);
    bq->pop(data);
  }
}
int main() 
{
    pthread_t producer,consumer;
    BlockQueue<int>* bq = new BlockQueue<int>;
    pthread_create(&producer,nullptr,Producer,(void*)bq);
    pthread_create(&consumer,nullptr,Consumer,(void*)bq);
    pthread_join(producer,nullptr);
    pthread_join(consumer,nullptr);
    delete bq;
    return 0;
}


由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的


41f14d806917493c84a164c535f9694c.png


生产者速度快,消费者速度慢

void* Producer(void* arg)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
  while (true) { //生产者不断进行生产
    int data = rand() % 100 + 1;
    bq->push(data);
  }
}
void* Consumer(void* arg)
{
    int data = 0;
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
  while (true) { //消费者不断进行消费
    sleep(1);
    bq->pop(data);
  }
}

01134d6fb2c947c183ccbba0525b3373.png



此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列装满。此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了


生产者速度慢,消费者速度快

void* Producer(void* arg)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
  while (true) { //生产者不断进行生产
        sleep(1);
    int data = rand() % 100 + 1;
    bq->push(data);
  }
}
void* Consumer(void* arg)
{
    int data = 0;
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
  while (true) { //消费者不断进行消费
    bq->pop(data);
  }
}


a6a6cfb0443249dbaf2d0e71d4b1507e.png


虽然消费者消费的快,但开始时阻塞队列中是没有数据的,因此消费者只能在empty条件变量下等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的


设置唤醒策略


可以设置一些策略。譬如,下面当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产

void push(const T &data)
{
    pthread_mutex_lock(&_mutex);
    while (IsFull()) { // 不能进行生产,直到阻塞队列可以容纳新的数据
        pthread_cond_wait(&_full, &_mutex);
    }
    _queue.push(data);
    std::cout << "Producer: " << data << std::endl;
    if (_queue.size() >= _capacity / 2) {
        pthread_cond_signal(&_empty); // 唤醒在empty条件变量下等待的消费者线程
    }
    pthread_mutex_unlock(&_mutex);
}
void pop(T &data)
{
    pthread_mutex_lock(&_mutex);
    while (IsEmpty()) { // 不能进行消费,直到阻塞队列有新的数据
        pthread_cond_wait(&_empty, &_mutex);
    }
    data = _queue.front();
    _queue.pop();
    std::cout << "Consumer: " << data << std::endl;
    if (_queue.size() <= _capacity / 2) {
        pthread_cond_signal(&_full); // 唤醒在full条件变量下等待的生产者线程
    }
    pthread_mutex_unlock(&_mutex);
}


cc422bd29a404115930b20d3bb9cb36d.png


仍然让生产者生产快,消费者消费慢。运行代码后生产者还是一瞬间将阻塞队列装满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于等于队列容器的一半时,才会唤醒生产者线程进行生产


基于任务的生产者消费者模型


实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,前面的代码只是为了理解生产者消费者模型而已。

编写BlockingQueue时当中存储的数据就进行了模板化,那么就可以让BlockingQueue当中存储其他类型的数据。


譬如编写一个Task类(其中包含需要执行的任务),BlockingQueue中就存储Task对象。此时生产者放入阻塞队列的数据就是Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。


总之,根据需要进行编写即可


五、POSIX信号量

5.1 信号量概念

可能会被多个执行流同时访问的资源被称为临界资源,临界资源需要进行保护否则会出现数据不一致等问题

当仅用一个互斥锁对临界资源进行保护时,相当于将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问(串行访问)

实际上可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,若这些执行流访问的是临界资源的不同区域,那么可以让这些执行流同时访问临界资源的不同区域,此时并不会出现数据不一致等问题

信号量本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理


每个执行流在进入临界区之前都应先申请信号量,申请成功就有了操作临界资源的权限,当操作完毕后就应该释放信号量


b1c6f920ce114b899a5132aa5a64d92b.png


信号量的P操作:将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一

信号量的V操作:释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一

PV操作为原子操作


多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,即信号量本质也是临界资源。但信号量本质就是用于保护临界资源的,所以信号量的PV操作必须是原子操作


注意: 内存当中变量的自增、自减操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行自增、自减操作


申请信号量失败被挂起等待


当执行流在申请信号量时,可能此时信号量的值为0,即信号量描述的临界资源已全部被申请了,此时该执行流就应该在该信号量的等待队列中进行等待,直到有信号量被释放时被唤醒


注意: 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列


5.2 信号量函数

5.2.1 初始化信号量

int sem_init(sem_t *sem, int pshared, unsigned int value);

sem:需要初始化的信号量

pshared:传入0值表示线程间共享,传入非零值表示进程间共享

value:信号量的初始值(计数器的初始值)

返回值:初始化信号量成功返回0,失败返回-1


注意:  POSIX信号量与System V信号量作用相同,都用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可用于线程间同步


5.2.2 销毁信号量

int sem_destroy(sem_t *sem);

参数sem:需要销毁的信号量


返回值:销毁信号量成功返回0,失败返回-1


5.2.3 等待信号量

int sem_wait(sem_t *sem);

参数sem:需要等待的信号量


返回值:(P操作)


等待信号量成功返回0,信号量的值减一

等待信号量失败返回-1,信号量的值保持不变

5.2.4 发布信号量

int sem_post(sem_t *sem);

参数sem:需要发布的信号量


返回值:(V操作)


发布信号量成功返回0,信号量的值加一

发布信号量失败返回-1,信号量的值保持不变


目录
相关文章
|
8月前
|
存储 安全 Java
Qt线程池+生产者消费者模型
Qt线程池+生产者消费者模型
339 5
|
4月前
阻塞队列和生产者消费者模型
阻塞队列是一种特殊队列,当队列空时,获取元素操作会被阻塞;当队列满时,插入操作会被阻塞。可通过数组模拟实现,使用`wait`和`notify`控制阻塞与唤醒。生产者消费者模型中,阻塞队列作为缓冲区,实现生产者与消费者的解耦,提高系统可维护性和扩展性。
49 2
阻塞队列和生产者消费者模型
|
8月前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
134 1
|
8月前
|
C++
C++11实现生产者消费者
C++11实现生产者消费者
84 1
|
8月前
|
消息中间件 安全 Java
多线程(初阶七:阻塞队列和生产者消费者模型)
多线程(初阶七:阻塞队列和生产者消费者模型)
67 0
|
8月前
线程同步之 生产者消费者模型详解
前言 博主本来没打算讲这个比较前面的知识的(博主socket编程还有两个部分没讲,进程也才写完回收僵尸进程的三种方法,信号捕捉器也才完结),但是今天有朋友来问博主,什么是生产者消费者模型,所以博主就先为为数不多的朋友把生产者消费者模型讲一讲,希望大家能看懂(没有现成和锁知识的朋友不要急,这部分是写给有基础的朋友看的,这些知识博主都会慢慢的讲到)。 前言 博主本来没打算讲这个比较前面的知识的(博主socket编程还有两个部分没讲,进程也才写完回收僵尸进程的三种方法,信号捕捉器也才完结),但是今天有朋友来问博主,什么是生产者消费者模型,所以博主就先为为数不多的朋友把生产
55 0
|
8月前
|
SQL 供应链 安全
Linux多线程【生产者消费者模型】
Linux多线程【生产者消费者模型】
114 0
|
安全 Linux 数据安全/隐私保护
【Linux线程同步】生产者消费者模型
【Linux线程同步】生产者消费者模型
138 0
|
存储
生产者消费者模型(二)
生产者消费者模型
82 0
多线程实践-生产者消费者
多线程实践-生产者消费者
84 0