CP.cc
#include "BlockQueue.hpp" #include <ctime> #include <unistd.h> // 生产 void *Producer(void *argc) { blockqueue<int> *t = (blockqueue<int> *)argc; while (1) { // 随机产生数据插入 int x = rand() % 100 + 1; t->push(x); std::cout << "生产计算数据:" << x << std::endl; sleep(1); } return nullptr; } // 消费 void *Consumer(void *argc) { blockqueue<int> *t = (blockqueue<int> *)argc; while (1) { // 拿出数据 int x; t->pop(&x); std::cout << "消费计算数据:" << x << std::endl; } return nullptr; } int main() { // 设置随机种子 srand(time(nullptr)); blockqueue<int>* dq = new blockqueue<int>(); pthread_t c, p; // 创建计算生产者 pthread_create(&p, nullptr, Producer, dq); // 创建计算消费者 pthread_create(&c, nullptr, Consumer, dq); pthread_join(p, nullptr); pthread_join(c, nullptr); return 0; }
上面的代码就可以实现单消费者和单生产者的模型。生产者就会往阻塞队列里面写入数据,消费者就可以往阻塞队列里面读数据
那么根据这个模式再来实现一个加大点难度的模型代码
生产者 -> queue -> 消费者兼生产者 -> queue -> 消费者
实现大致目的
- 一个生产者,一个消费者兼生产者,一个消费者
- 计算过程由随机数,随机符号
- 第一个消费者读到数据后传到第二个队列中
- 最后读取计算结果的消费者将数据读到文件中
大致步骤
- 因为有不同的任务,所以创建一个任务头文件
- 由于是两个不同的队列,因此可以创建一个队列组的类
- ±*/ 随机
- 以下代码均有注释
blockqueue.hpp
#pragma once #include <iostream> #include <pthread.h> #include <queue> // 设置默认的最大容量 static int max = 10; template <class T> class blockqueue { public: blockqueue(const int &maxnum = max) : _maxnum(maxnum) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_pcond, nullptr); pthread_cond_init(&_ccond, nullptr); } // 插入数据 void push(const T &in) { // 加锁 pthread_mutex_lock(&_lock); // 判断队列是否满了,如果为空则等待 // 充当条件判断的语法必须是while,不能用if while (_q.size() == _maxnum) pthread_cond_wait(&_pcond, &_lock); // 插入数据 _q.push(in); // 走到这里说明队列一定有数据,就可以唤醒消费者的线程 pthread_cond_signal(&_ccond); // 解锁 pthread_mutex_unlock(&_lock); } // 拿到头部数据并删除 void pop(T *out) { // 加锁 pthread_mutex_lock(&_lock); // 判断队列是否满了,如果为空则等待 // 充当条件判断的语法必须是while,不能用if while (_q.size() == 0) pthread_cond_wait(&_ccond, &_lock); // 拿到头部数据并删除 *out = _q.front(); _q.pop(); // 走到这里说明队列一定不会满,就可以唤醒生产者的线程 pthread_cond_signal(&_pcond); // 解锁 pthread_mutex_unlock(&_lock); } ~blockqueue() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_pcond); pthread_cond_destroy(&_ccond); } private: std::queue<T> _q; int _maxnum; // 最大容量 pthread_mutex_t _lock; pthread_cond_t _pcond; // 生产者的条件变量 pthread_cond_t _ccond; // 消费者的条件变量 }; // 将负责计算的队列和负责保存的队列归并成一个类以便后续调用 // 队列组的类 template <class C, class S> class blockqueues { public: // 计算队列 blockqueue<C>* _cp; // 保存队列 blockqueue<S>* _sc; };
Task.hpp – 任务头文件
#include <iostream> #include <string> #include <functional> #include <cstdio> // 负责计算的任务类 class CPTask { // 调用的计算方法,根据传入的字符参数决定 typedef std::function<int(int, int, char)> func_t; public: CPTask() { } CPTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _func(func) { } // 实现传入的函数调用 std::string operator()() { int count = _func(_x, _y, _op); // 将结果以自定义的字符串形式返回 char res[2048]; snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count); return res; } // 显示出当前传入的参数 std::string tostring() { char res[1024]; snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y); return res; } private: int _x; int _y; char _op;// +-*/ func_t _func;// 实现方法 }; // 负责计算的任务函数 // 实现+-*/ 随机 int Math(int x, int y, char c) { int count; switch (c) { case '+': count = x + y; break; case '-': count = x - y; break; case '*': count = x * y; break; case '/': { if (y == 0) { std::cout << "div zero" << std::endl; count = -1; } else count = x / y; break; } default: break; } return count; } class SCTask { // 获取保存数据的方法 typedef std::function<void(std::string)> func_t; public: SCTask() { } SCTask(const std::string &str, func_t func) : _str(str), _func(func) { } //调用方法 void operator()() { _func(_str); } private: std::string _str;// 数据 func_t _func;// 实现方法 }; // 负责保存的方法,将数据读取到保存至文件 void Save(const std::string &str) { std::string res = "./log.txt"; FILE *fd = fopen(res.c_str(), "a+"); if (!fd) return; fwrite(str.c_str(), 1, sizeof str.c_str(), fd); fputs("\n", fd); fclose(fd); }
CP.cc
#include "BlockQueue.hpp" #include <ctime> #include <unistd.h> #include "Task.hpp" // 生产 void *Producer(void *argc) { // 将参数转换回计算队列的类型 blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp; while (1) { std::string ops("+-*/"); // 随机产生数据插入 int x = rand() % 100 + 1; int y = rand() % 100 + 1; int opnum = rand() % ops.size(); // 随机提取+-*/ char op = ops[opnum]; // 定义好实现类的对象 CPTask C(x, y, op, Math); //将整个对象插入到计算队列中 t->push(C); std::cout << "生产计算数据:" << C.tostring() << std::endl; sleep(1); } return nullptr; } // 消费 void *Consumer(void *argc) { // 因为这个是身兼两者身份 // 因此要有两种队列的类型对象 blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp; blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc; while (1) { // 计算队列类型拿出数据 std::string res; CPTask c; t->pop(&c); res = c(); std::cout << "消费计算数据:" << res << std::endl; // 插入保存数据队列 SCTask sc(res, Save); s->push(sc); std::cout << "生产保存数据: ......done" << std::endl; } return nullptr; } void *Saver(void *argc) { // 将参数转换回保存队列的类型 blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc; while (1) { // 拿出数据 SCTask t; s->pop(&t); //调用方法 t(); std::cout << "消费保存数据:......done" << std::endl; } return nullptr; } int main() { // 设置随机种子 srand(time(nullptr)); // 创建队列对象 blockqueues<CPTask, SCTask> dqs; dqs._cp = new blockqueue<CPTask>; dqs._sc = new blockqueue<SCTask>; pthread_t c, p, s; // 创建计算生产者 pthread_create(&p, nullptr, Producer, &dqs); // 创建计算消费者兼保护生产者 pthread_create(&c, nullptr, Consumer, &dqs); // 创建保存消费者 pthread_create(&c, nullptr, Saver, &dqs); pthread_join(p, nullptr); pthread_join(c, nullptr); pthread_join(s, nullptr); delete dqs._cp; delete dqs._sc; return 0; }
实现效果
log.txt:
总结
上面的代码都是单线程去做一个工作的,事实上多线程也是可以的,因为对于访问共享资源(缓冲区、阻塞队列)一次只能有一个线程做这个工作。上面也提到了对于效率的提高并不是体现在共享资源内的,而是访问共享资源前的工作。因此多线程的效率提高也就在这方面。
线程的学习需要熟知各个概念和多动手写代码,像这个生产者消费者模型理解起来不算很难,但是上手写代码就非常复杂。线程的接口较多,多练才能熟记