Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(二)

简介: Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(二)

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
// 生产
void *Producer(void *argc)
{
    blockqueue<int> *t = (blockqueue<int> *)argc;
    while (1)
    {
        // 随机产生数据插入
        int x = rand() % 100 + 1;
        t->push(x);
        std::cout << "生产计算数据:" << x << std::endl;
        sleep(1);
    }
    return nullptr;
}
// 消费
void *Consumer(void *argc)
{
    blockqueue<int> *t = (blockqueue<int> *)argc;
    while (1)
    {
        // 拿出数据
        int x;
        t->pop(&x);
        std::cout << "消费计算数据:" << x << std::endl;
    }
    return nullptr;
}
int main()
{
    // 设置随机种子
    srand(time(nullptr));
    blockqueue<int>* dq = new blockqueue<int>();
    pthread_t c, p;
    // 创建计算生产者
    pthread_create(&p, nullptr, Producer, dq);
    // 创建计算消费者
    pthread_create(&c, nullptr, Consumer, dq);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

上面的代码就可以实现单消费者和单生产者的模型。生产者就会往阻塞队列里面写入数据,消费者就可以往阻塞队列里面读数据


9a8cbdbbc332a5f3c204019457866363.png

那么根据这个模式再来实现一个加大点难度的模型代码

生产者 -> queue -> 消费者兼生产者 -> queue -> 消费者


8b30a8562d158920e393510792bc4ee1.png

实现大致目的

  1. 一个生产者,一个消费者兼生产者,一个消费者
  2. 计算过程由随机数,随机符号
  3. 第一个消费者读到数据后传到第二个队列中
  4. 最后读取计算结果的消费者将数据读到文件中

大致步骤

  1. 因为有不同的任务,所以创建一个任务头文件
  2. 由于是两个不同的队列,因此可以创建一个队列组的类
  3. ±*/ 随机
  4. 以下代码均有注释

blockqueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
// 设置默认的最大容量
static int max = 10;
template <class T>
class blockqueue
{
public:
    blockqueue(const int &maxnum = max)
        : _maxnum(maxnum)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    // 插入数据
    void push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == _maxnum)
            pthread_cond_wait(&_pcond, &_lock);
        // 插入数据
        _q.push(in);
        // 走到这里说明队列一定有数据,就可以唤醒消费者的线程
        pthread_cond_signal(&_ccond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    // 拿到头部数据并删除
    void pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == 0)
            pthread_cond_wait(&_ccond, &_lock);
        // 拿到头部数据并删除
        *out = _q.front();
        _q.pop();
        // 走到这里说明队列一定不会满,就可以唤醒生产者的线程
        pthread_cond_signal(&_pcond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    ~blockqueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _maxnum; // 最大容量
    pthread_mutex_t _lock;
    pthread_cond_t _pcond; // 生产者的条件变量
    pthread_cond_t _ccond; // 消费者的条件变量
};
// 将负责计算的队列和负责保存的队列归并成一个类以便后续调用
// 队列组的类
template <class C, class S>
class blockqueues
{
public:
  // 计算队列
    blockqueue<C>* _cp;
    // 保存队列
    blockqueue<S>* _sc;
};

Task.hpp – 任务头文件

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 负责计算的任务类
class CPTask
{
    // 调用的计算方法,根据传入的字符参数决定
    typedef std::function<int(int, int, char)> func_t;
public:
    CPTask()
    {
    }
    CPTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _func(func)
    {
    }
    // 实现传入的函数调用
    std::string operator()()
    {
        int count = _func(_x, _y, _op);
        // 将结果以自定义的字符串形式返回
        char res[2048];
        snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
        return res;
    }
    // 显示出当前传入的参数
    std::string tostring()
    {
        char res[1024];
        snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
        return res;
    }
private:
    int _x;
    int _y;
    char _op;// +-*/
    func_t _func;// 实现方法
};
// 负责计算的任务函数
// 实现+-*/ 随机
int Math(int x, int y, char c)
{
    int count;
    switch (c)
    {
    case '+':
        count = x + y;
        break;
    case '-':
        count = x - y;
        break;
    case '*':
        count = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cout << "div zero" << std::endl;
            count = -1;
        }
        else
            count = x / y;
        break;
    }
    default:
        break;
    }
    return count;
}
class SCTask
{
    // 获取保存数据的方法
    typedef std::function<void(std::string)> func_t;
public:
    SCTask()
    {
    }
    SCTask(const std::string &str, func_t func)
        : _str(str), _func(func)
    {
    }
  //调用方法
    void operator()()
    {
        _func(_str);
    }
private:
    std::string _str;// 数据
    func_t _func;// 实现方法
};
// 负责保存的方法,将数据读取到保存至文件
void Save(const std::string &str)
{
    std::string res = "./log.txt";
    FILE *fd = fopen(res.c_str(), "a+");
    if (!fd)
        return;
    fwrite(str.c_str(), 1, sizeof str.c_str(), fd);
    fputs("\n", fd);
    fclose(fd);
}

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
// 生产
void *Producer(void *argc)
{
  // 将参数转换回计算队列的类型
    blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;
    while (1)
    {
        std::string ops("+-*/");
        // 随机产生数据插入
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        int opnum = rand() % ops.size();
      // 随机提取+-*/
        char op = ops[opnum];
    // 定义好实现类的对象
        CPTask C(x, y, op, Math);
    //将整个对象插入到计算队列中
        t->push(C);
        std::cout << "生产计算数据:" << C.tostring() << std::endl;
        sleep(1);
    }
    return nullptr;
}
// 消费
void *Consumer(void *argc)
{
  // 因为这个是身兼两者身份
  // 因此要有两种队列的类型对象
    blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;
    blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;
    while (1)
    {
        // 计算队列类型拿出数据
        std::string res;
        CPTask c;
        t->pop(&c);
        res = c();
        std::cout << "消费计算数据:" << res << std::endl;
        // 插入保存数据队列
        SCTask sc(res, Save);
        s->push(sc);
        std::cout << "生产保存数据: ......done" << std::endl;
    }
    return nullptr;
}
void *Saver(void *argc)
{
  // 将参数转换回保存队列的类型
    blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;
    while (1)
    {
        // 拿出数据
        SCTask t;
        s->pop(&t);
        //调用方法
        t();
        std::cout << "消费保存数据:......done" << std::endl;
    }
    return nullptr;
}
int main()
{
    // 设置随机种子
    srand(time(nullptr));
    // 创建队列对象
    blockqueues<CPTask, SCTask> dqs;
    dqs._cp = new blockqueue<CPTask>;
    dqs._sc = new blockqueue<SCTask>;
    pthread_t c, p, s;
    // 创建计算生产者
    pthread_create(&p, nullptr, Producer, &dqs);
    // 创建计算消费者兼保护生产者
    pthread_create(&c, nullptr, Consumer, &dqs);
    // 创建保存消费者
    pthread_create(&c, nullptr, Saver, &dqs);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    pthread_join(s, nullptr);
    delete dqs._cp;
    delete dqs._sc;
    return 0;
}

实现效果


07ffafaedbb6ff21644b306f7ec1ea02.png

log.txt:


1d511e9040408694b0eac057b980fe5f.png

总结

上面的代码都是单线程去做一个工作的,事实上多线程也是可以的,因为对于访问共享资源(缓冲区、阻塞队列)一次只能有一个线程做这个工作。上面也提到了对于效率的提高并不是体现在共享资源内的,而是访问共享资源前的工作。因此多线程的效率提高也就在这方面。

线程的学习需要熟知各个概念和多动手写代码,像这个生产者消费者模型理解起来不算很难,但是上手写代码就非常复杂。线程的接口较多,多练才能熟记

相关实践学习
CentOS 8迁移Anolis OS 8
Anolis OS 8在做出差异性开发同时,在生态上和依赖管理上保持跟CentOS 8.x兼容,本文为您介绍如何通过AOMS迁移工具实现CentOS 8.x到Anolis OS 8的迁移。
目录
相关文章
|
4月前
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
2月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
176 67
|
26天前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
26天前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
3月前
|
缓存 NoSQL 中间件
Redis的线程模型
Redis采用单线程模型确保操作的原子性,每次只执行一个操作,避免并发冲突。它通过MULTI/EXEC事务机制、Lua脚本和复合指令(如MSET、GETSET等)保证多个操作要么全成功,要么全失败,确保数据一致性。Redis事务在EXEC前失败则不执行任何操作,EXEC后失败不影响其他操作。Pipeline虽高效但不具备原子性,适合非热点时段的数据调整。Redis 7引入Function功能,支持函数复用,简化复杂业务逻辑。总结来说,Redis的单线程模型简单高效,适用于高并发场景,但仍需合理选择指令执行方式以发挥其性能优势。
99 6
|
4月前
|
存储 缓存 关系型数据库
MySQL底层概述—3.InnoDB线程模型
InnoDB存储引擎采用多线程模型,包含多个后台线程以处理不同任务。主要线程包括:IO Thread负责读写数据页和日志;Purge Thread回收已提交事务的undo日志;Page Cleaner Thread刷新脏页并清理redo日志;Master Thread调度其他线程,定时刷新脏页、回收undo日志、写入redo日志和合并写缓冲。各线程协同工作,确保数据一致性和高效性能。
MySQL底层概述—3.InnoDB线程模型
|
1月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
79 0
|
4月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
91 26
|
4月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
97 17
|
6月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
509 2