Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(二)

简介: Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(二)

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
// 生产
void *Producer(void *argc)
{
    blockqueue<int> *t = (blockqueue<int> *)argc;
    while (1)
    {
        // 随机产生数据插入
        int x = rand() % 100 + 1;
        t->push(x);
        std::cout << "生产计算数据:" << x << std::endl;
        sleep(1);
    }
    return nullptr;
}
// 消费
void *Consumer(void *argc)
{
    blockqueue<int> *t = (blockqueue<int> *)argc;
    while (1)
    {
        // 拿出数据
        int x;
        t->pop(&x);
        std::cout << "消费计算数据:" << x << std::endl;
    }
    return nullptr;
}
int main()
{
    // 设置随机种子
    srand(time(nullptr));
    blockqueue<int>* dq = new blockqueue<int>();
    pthread_t c, p;
    // 创建计算生产者
    pthread_create(&p, nullptr, Producer, dq);
    // 创建计算消费者
    pthread_create(&c, nullptr, Consumer, dq);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

上面的代码就可以实现单消费者和单生产者的模型。生产者就会往阻塞队列里面写入数据,消费者就可以往阻塞队列里面读数据


9a8cbdbbc332a5f3c204019457866363.png

那么根据这个模式再来实现一个加大点难度的模型代码

生产者 -> queue -> 消费者兼生产者 -> queue -> 消费者


8b30a8562d158920e393510792bc4ee1.png

实现大致目的

  1. 一个生产者,一个消费者兼生产者,一个消费者
  2. 计算过程由随机数,随机符号
  3. 第一个消费者读到数据后传到第二个队列中
  4. 最后读取计算结果的消费者将数据读到文件中

大致步骤

  1. 因为有不同的任务,所以创建一个任务头文件
  2. 由于是两个不同的队列,因此可以创建一个队列组的类
  3. ±*/ 随机
  4. 以下代码均有注释

blockqueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
// 设置默认的最大容量
static int max = 10;
template <class T>
class blockqueue
{
public:
    blockqueue(const int &maxnum = max)
        : _maxnum(maxnum)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    // 插入数据
    void push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == _maxnum)
            pthread_cond_wait(&_pcond, &_lock);
        // 插入数据
        _q.push(in);
        // 走到这里说明队列一定有数据,就可以唤醒消费者的线程
        pthread_cond_signal(&_ccond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    // 拿到头部数据并删除
    void pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == 0)
            pthread_cond_wait(&_ccond, &_lock);
        // 拿到头部数据并删除
        *out = _q.front();
        _q.pop();
        // 走到这里说明队列一定不会满,就可以唤醒生产者的线程
        pthread_cond_signal(&_pcond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    ~blockqueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _maxnum; // 最大容量
    pthread_mutex_t _lock;
    pthread_cond_t _pcond; // 生产者的条件变量
    pthread_cond_t _ccond; // 消费者的条件变量
};
// 将负责计算的队列和负责保存的队列归并成一个类以便后续调用
// 队列组的类
template <class C, class S>
class blockqueues
{
public:
  // 计算队列
    blockqueue<C>* _cp;
    // 保存队列
    blockqueue<S>* _sc;
};

Task.hpp – 任务头文件

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 负责计算的任务类
class CPTask
{
    // 调用的计算方法,根据传入的字符参数决定
    typedef std::function<int(int, int, char)> func_t;
public:
    CPTask()
    {
    }
    CPTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _func(func)
    {
    }
    // 实现传入的函数调用
    std::string operator()()
    {
        int count = _func(_x, _y, _op);
        // 将结果以自定义的字符串形式返回
        char res[2048];
        snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
        return res;
    }
    // 显示出当前传入的参数
    std::string tostring()
    {
        char res[1024];
        snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
        return res;
    }
private:
    int _x;
    int _y;
    char _op;// +-*/
    func_t _func;// 实现方法
};
// 负责计算的任务函数
// 实现+-*/ 随机
int Math(int x, int y, char c)
{
    int count;
    switch (c)
    {
    case '+':
        count = x + y;
        break;
    case '-':
        count = x - y;
        break;
    case '*':
        count = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cout << "div zero" << std::endl;
            count = -1;
        }
        else
            count = x / y;
        break;
    }
    default:
        break;
    }
    return count;
}
class SCTask
{
    // 获取保存数据的方法
    typedef std::function<void(std::string)> func_t;
public:
    SCTask()
    {
    }
    SCTask(const std::string &str, func_t func)
        : _str(str), _func(func)
    {
    }
  //调用方法
    void operator()()
    {
        _func(_str);
    }
private:
    std::string _str;// 数据
    func_t _func;// 实现方法
};
// 负责保存的方法,将数据读取到保存至文件
void Save(const std::string &str)
{
    std::string res = "./log.txt";
    FILE *fd = fopen(res.c_str(), "a+");
    if (!fd)
        return;
    fwrite(str.c_str(), 1, sizeof str.c_str(), fd);
    fputs("\n", fd);
    fclose(fd);
}

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
// 生产
void *Producer(void *argc)
{
  // 将参数转换回计算队列的类型
    blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;
    while (1)
    {
        std::string ops("+-*/");
        // 随机产生数据插入
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        int opnum = rand() % ops.size();
      // 随机提取+-*/
        char op = ops[opnum];
    // 定义好实现类的对象
        CPTask C(x, y, op, Math);
    //将整个对象插入到计算队列中
        t->push(C);
        std::cout << "生产计算数据:" << C.tostring() << std::endl;
        sleep(1);
    }
    return nullptr;
}
// 消费
void *Consumer(void *argc)
{
  // 因为这个是身兼两者身份
  // 因此要有两种队列的类型对象
    blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;
    blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;
    while (1)
    {
        // 计算队列类型拿出数据
        std::string res;
        CPTask c;
        t->pop(&c);
        res = c();
        std::cout << "消费计算数据:" << res << std::endl;
        // 插入保存数据队列
        SCTask sc(res, Save);
        s->push(sc);
        std::cout << "生产保存数据: ......done" << std::endl;
    }
    return nullptr;
}
void *Saver(void *argc)
{
  // 将参数转换回保存队列的类型
    blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;
    while (1)
    {
        // 拿出数据
        SCTask t;
        s->pop(&t);
        //调用方法
        t();
        std::cout << "消费保存数据:......done" << std::endl;
    }
    return nullptr;
}
int main()
{
    // 设置随机种子
    srand(time(nullptr));
    // 创建队列对象
    blockqueues<CPTask, SCTask> dqs;
    dqs._cp = new blockqueue<CPTask>;
    dqs._sc = new blockqueue<SCTask>;
    pthread_t c, p, s;
    // 创建计算生产者
    pthread_create(&p, nullptr, Producer, &dqs);
    // 创建计算消费者兼保护生产者
    pthread_create(&c, nullptr, Consumer, &dqs);
    // 创建保存消费者
    pthread_create(&c, nullptr, Saver, &dqs);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    pthread_join(s, nullptr);
    delete dqs._cp;
    delete dqs._sc;
    return 0;
}

实现效果


07ffafaedbb6ff21644b306f7ec1ea02.png

log.txt:


1d511e9040408694b0eac057b980fe5f.png

总结

上面的代码都是单线程去做一个工作的,事实上多线程也是可以的,因为对于访问共享资源(缓冲区、阻塞队列)一次只能有一个线程做这个工作。上面也提到了对于效率的提高并不是体现在共享资源内的,而是访问共享资源前的工作。因此多线程的效率提高也就在这方面。

线程的学习需要熟知各个概念和多动手写代码,像这个生产者消费者模型理解起来不算很难,但是上手写代码就非常复杂。线程的接口较多,多练才能熟记

目录
相关文章
|
4月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
305 2
|
9月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
393 16
|
9月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
9月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
11月前
|
缓存 NoSQL 中间件
Redis的线程模型
Redis采用单线程模型确保操作的原子性,每次只执行一个操作,避免并发冲突。它通过MULTI/EXEC事务机制、Lua脚本和复合指令(如MSET、GETSET等)保证多个操作要么全成功,要么全失败,确保数据一致性。Redis事务在EXEC前失败则不执行任何操作,EXEC后失败不影响其他操作。Pipeline虽高效但不具备原子性,适合非热点时段的数据调整。Redis 7引入Function功能,支持函数复用,简化复杂业务逻辑。总结来说,Redis的单线程模型简单高效,适用于高并发场景,但仍需合理选择指令执行方式以发挥其性能优势。
299 6
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
302 1
Java—多线程实现生产消费者
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
5月前
|
Linux 应用服务中间件 Shell
二、Linux文本处理与文件操作核心命令
熟悉了Linux的基本“行走”后,就该拿起真正的“工具”干活了。用grep这个“放大镜”在文件里搜索内容,用find这个“探测器”在系统中寻找文件,再用tar把东西打包带走。最关键的是要学会使用管道符|,它像一条流水线,能把这些命令串联起来,让简单工具组合出强大的功能,比如 ps -ef | grep 'nginx' 就能快速找出nginx进程。
646 1
二、Linux文本处理与文件操作核心命令
|
5月前
|
Linux
linux命令—stat
`stat` 是 Linux 系统中用于查看文件或文件系统详细状态信息的命令。相比 `ls -l`,它提供更全面的信息,包括文件大小、权限、所有者、时间戳(最后访问、修改、状态变更时间)、inode 号、设备信息等。其常用选项包括 `-f` 查看文件系统状态、`-t` 以简洁格式输出、`-L` 跟踪符号链接,以及 `-c` 或 `--format` 自定义输出格式。通过这些选项,用户可以灵活获取所需信息,适用于系统调试、权限检查、磁盘管理等场景。
414 137
|
5月前
|
安全 Ubuntu Unix
一、初识 Linux 与基本命令
玩转Linux命令行,就像探索一座新城市。首先要熟悉它的“地图”,也就是/根目录下/etc(放配置)、/home(住家)这些核心区域。然后掌握几个“生存口令”:用ls看周围,cd去别处,mkdir建新房,cp/mv搬东西,再用cat或tail看文件内容。最后,别忘了随时按Tab键,它能帮你自动补全命令和路径,是提高效率的第一神器。
991 57

热门文章

最新文章