Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(二)

简介: Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(二)

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
// 生产
void *Producer(void *argc)
{
    blockqueue<int> *t = (blockqueue<int> *)argc;
    while (1)
    {
        // 随机产生数据插入
        int x = rand() % 100 + 1;
        t->push(x);
        std::cout << "生产计算数据:" << x << std::endl;
        sleep(1);
    }
    return nullptr;
}
// 消费
void *Consumer(void *argc)
{
    blockqueue<int> *t = (blockqueue<int> *)argc;
    while (1)
    {
        // 拿出数据
        int x;
        t->pop(&x);
        std::cout << "消费计算数据:" << x << std::endl;
    }
    return nullptr;
}
int main()
{
    // 设置随机种子
    srand(time(nullptr));
    blockqueue<int>* dq = new blockqueue<int>();
    pthread_t c, p;
    // 创建计算生产者
    pthread_create(&p, nullptr, Producer, dq);
    // 创建计算消费者
    pthread_create(&c, nullptr, Consumer, dq);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

上面的代码就可以实现单消费者和单生产者的模型。生产者就会往阻塞队列里面写入数据,消费者就可以往阻塞队列里面读数据


9a8cbdbbc332a5f3c204019457866363.png

那么根据这个模式再来实现一个加大点难度的模型代码

生产者 -> queue -> 消费者兼生产者 -> queue -> 消费者


8b30a8562d158920e393510792bc4ee1.png

实现大致目的

  1. 一个生产者,一个消费者兼生产者,一个消费者
  2. 计算过程由随机数,随机符号
  3. 第一个消费者读到数据后传到第二个队列中
  4. 最后读取计算结果的消费者将数据读到文件中

大致步骤

  1. 因为有不同的任务,所以创建一个任务头文件
  2. 由于是两个不同的队列,因此可以创建一个队列组的类
  3. ±*/ 随机
  4. 以下代码均有注释

blockqueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
// 设置默认的最大容量
static int max = 10;
template <class T>
class blockqueue
{
public:
    blockqueue(const int &maxnum = max)
        : _maxnum(maxnum)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    // 插入数据
    void push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == _maxnum)
            pthread_cond_wait(&_pcond, &_lock);
        // 插入数据
        _q.push(in);
        // 走到这里说明队列一定有数据,就可以唤醒消费者的线程
        pthread_cond_signal(&_ccond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    // 拿到头部数据并删除
    void pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == 0)
            pthread_cond_wait(&_ccond, &_lock);
        // 拿到头部数据并删除
        *out = _q.front();
        _q.pop();
        // 走到这里说明队列一定不会满,就可以唤醒生产者的线程
        pthread_cond_signal(&_pcond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    ~blockqueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _maxnum; // 最大容量
    pthread_mutex_t _lock;
    pthread_cond_t _pcond; // 生产者的条件变量
    pthread_cond_t _ccond; // 消费者的条件变量
};
// 将负责计算的队列和负责保存的队列归并成一个类以便后续调用
// 队列组的类
template <class C, class S>
class blockqueues
{
public:
  // 计算队列
    blockqueue<C>* _cp;
    // 保存队列
    blockqueue<S>* _sc;
};

Task.hpp – 任务头文件

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 负责计算的任务类
class CPTask
{
    // 调用的计算方法,根据传入的字符参数决定
    typedef std::function<int(int, int, char)> func_t;
public:
    CPTask()
    {
    }
    CPTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _func(func)
    {
    }
    // 实现传入的函数调用
    std::string operator()()
    {
        int count = _func(_x, _y, _op);
        // 将结果以自定义的字符串形式返回
        char res[2048];
        snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
        return res;
    }
    // 显示出当前传入的参数
    std::string tostring()
    {
        char res[1024];
        snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
        return res;
    }
private:
    int _x;
    int _y;
    char _op;// +-*/
    func_t _func;// 实现方法
};
// 负责计算的任务函数
// 实现+-*/ 随机
int Math(int x, int y, char c)
{
    int count;
    switch (c)
    {
    case '+':
        count = x + y;
        break;
    case '-':
        count = x - y;
        break;
    case '*':
        count = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cout << "div zero" << std::endl;
            count = -1;
        }
        else
            count = x / y;
        break;
    }
    default:
        break;
    }
    return count;
}
class SCTask
{
    // 获取保存数据的方法
    typedef std::function<void(std::string)> func_t;
public:
    SCTask()
    {
    }
    SCTask(const std::string &str, func_t func)
        : _str(str), _func(func)
    {
    }
  //调用方法
    void operator()()
    {
        _func(_str);
    }
private:
    std::string _str;// 数据
    func_t _func;// 实现方法
};
// 负责保存的方法,将数据读取到保存至文件
void Save(const std::string &str)
{
    std::string res = "./log.txt";
    FILE *fd = fopen(res.c_str(), "a+");
    if (!fd)
        return;
    fwrite(str.c_str(), 1, sizeof str.c_str(), fd);
    fputs("\n", fd);
    fclose(fd);
}

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
// 生产
void *Producer(void *argc)
{
  // 将参数转换回计算队列的类型
    blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;
    while (1)
    {
        std::string ops("+-*/");
        // 随机产生数据插入
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        int opnum = rand() % ops.size();
      // 随机提取+-*/
        char op = ops[opnum];
    // 定义好实现类的对象
        CPTask C(x, y, op, Math);
    //将整个对象插入到计算队列中
        t->push(C);
        std::cout << "生产计算数据:" << C.tostring() << std::endl;
        sleep(1);
    }
    return nullptr;
}
// 消费
void *Consumer(void *argc)
{
  // 因为这个是身兼两者身份
  // 因此要有两种队列的类型对象
    blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;
    blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;
    while (1)
    {
        // 计算队列类型拿出数据
        std::string res;
        CPTask c;
        t->pop(&c);
        res = c();
        std::cout << "消费计算数据:" << res << std::endl;
        // 插入保存数据队列
        SCTask sc(res, Save);
        s->push(sc);
        std::cout << "生产保存数据: ......done" << std::endl;
    }
    return nullptr;
}
void *Saver(void *argc)
{
  // 将参数转换回保存队列的类型
    blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;
    while (1)
    {
        // 拿出数据
        SCTask t;
        s->pop(&t);
        //调用方法
        t();
        std::cout << "消费保存数据:......done" << std::endl;
    }
    return nullptr;
}
int main()
{
    // 设置随机种子
    srand(time(nullptr));
    // 创建队列对象
    blockqueues<CPTask, SCTask> dqs;
    dqs._cp = new blockqueue<CPTask>;
    dqs._sc = new blockqueue<SCTask>;
    pthread_t c, p, s;
    // 创建计算生产者
    pthread_create(&p, nullptr, Producer, &dqs);
    // 创建计算消费者兼保护生产者
    pthread_create(&c, nullptr, Consumer, &dqs);
    // 创建保存消费者
    pthread_create(&c, nullptr, Saver, &dqs);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    pthread_join(s, nullptr);
    delete dqs._cp;
    delete dqs._sc;
    return 0;
}

实现效果


07ffafaedbb6ff21644b306f7ec1ea02.png

log.txt:


1d511e9040408694b0eac057b980fe5f.png

总结

上面的代码都是单线程去做一个工作的,事实上多线程也是可以的,因为对于访问共享资源(缓冲区、阻塞队列)一次只能有一个线程做这个工作。上面也提到了对于效率的提高并不是体现在共享资源内的,而是访问共享资源前的工作。因此多线程的效率提高也就在这方面。

线程的学习需要熟知各个概念和多动手写代码,像这个生产者消费者模型理解起来不算很难,但是上手写代码就非常复杂。线程的接口较多,多练才能熟记

目录
相关文章
|
12天前
|
缓存 网络协议 算法
【Linux系统编程】深入剖析:四大IO模型机制与应用(阻塞、非阻塞、多路复用、信号驱动IO 全解读)
在Linux环境下,主要存在四种IO模型,它们分别是阻塞IO(Blocking IO)、非阻塞IO(Non-blocking IO)、IO多路复用(I/O Multiplexing)和异步IO(Asynchronous IO)。下面我将逐一介绍这些模型的定义:
|
5天前
|
Linux
【Linux】生产者消费者模型——环形队列RingQueue(信号量)
【Linux】生产者消费者模型——环形队列RingQueue(信号量)
7 0
|
5天前
|
存储 Linux 容器
【Linux】生产者消费者模型——阻塞队列BlockQueue
【Linux】生产者消费者模型——阻塞队列BlockQueue
9 0
|
7天前
|
设计模式 安全 NoSQL
Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
10 0
|
7天前
|
存储 设计模式 监控
Java面试题:如何在不牺牲性能的前提下,实现一个线程安全的单例模式?如何在生产者-消费者模式中平衡生产和消费的速度?Java内存模型规定了变量在内存中的存储和线程间的交互规则
Java面试题:如何在不牺牲性能的前提下,实现一个线程安全的单例模式?如何在生产者-消费者模式中平衡生产和消费的速度?Java内存模型规定了变量在内存中的存储和线程间的交互规则
17 0
|
12天前
|
Linux 网络安全 虚拟化
Ngnix04系统环境准备-上面软件是免费版的,下面是收费版的,他更快的原因使用了epoll模型,查看当前Linux系统版本, uname -a,VMWARE建议使用NAT,PC端电脑必须使用网线连接
Ngnix04系统环境准备-上面软件是免费版的,下面是收费版的,他更快的原因使用了epoll模型,查看当前Linux系统版本, uname -a,VMWARE建议使用NAT,PC端电脑必须使用网线连接
|
21天前
|
安全 Linux 数据安全/隐私保护
【linux】线程同步和生产消费者模型
【linux】线程同步和生产消费者模型
13 0
|
7天前
|
设计模式 安全 Java
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
19 1
|
7天前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
19 1
|
5天前
|
缓存 Linux 编译器
【Linux】多线程——线程概念|进程VS线程|线程控制(下)
【Linux】多线程——线程概念|进程VS线程|线程控制(下)
15 0

热门文章

最新文章