> 作者:დ旧言~
> 座右铭:松树千年终是朽,槿花一日自为荣。
> 目标:理解【Linux】生产者消费者模型——阻塞队列BlockQueue。
> 毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安!
> 专栏选自:Linux初阶
> 望小伙伴们点赞👍收藏✨加关注哟💕💕
🌟前言
之前写的代码是存在不足的地方的:
我们使用线程操作临界资源的时候要先去判断临界资源是否满足条件:并不能事前得知,只能通过先加锁判断,再检测,再操作、解锁,因为我们在操作临界资源的时候,有可能不就绪,但是我们无法提前得知,所以只能先加锁再检测,根据检测结果,决定下一步怎么走,那我们能不能通过一种办法提前得知是否满足条件呢?这样就不用加锁了,直接让线程等待或者访问:答案就是信号量
⭐主体
学习【Linux】生产者消费者模型——环形队列RingQueue(信号量)咱们按照下面的图解:
🌙 信号量
💫 信号量的概念
概念分析:
信号量本质是一个计数器(不设置全局变量是因为进程间是相互独立的,而这不一定能看到,看到也不能保证++引用计数为原子操作),用于多进程对共享数据对象的读取,它和管道有所不同,它不以传送数据为主要目的,它主要是用来保护共享资源(信号量也属于临界资源),使得资源在一个时刻只有一个进程独享。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。
💫 信号量的工作原理
由于信号量只能进行两种操作等待和发送信号,即P(sv)和V(sv),他们的行为是这样的:
- P(sv):我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减去一,因此P操作的本质就是让计数器减一,如果sv的值大于零,就给它减1;如果它的值为零,就挂起该进程的执行。
- V(sv):我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作本质就是让计数器加一,如果有其他进程因等待sv而被挂起,就让它恢复运行,如果没有进程因等待sv而挂起,就给他加1。
PV操作必须是原子操作。
总结分析:
多个执行流为了访问临界资源会竞争式的申请信号量, 因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
两个注意:
- 内存当中变量的++、--操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++、--操作。
- 申请信号量失败被挂起等待。
- 当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
- 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
💫 信号量基本接口
初步看一下信号量的基本使用接口:
#include <semaphore.h> //信号量初始化 int sem_init(sem_t *sem, int pshared, unsigned int value) //sem:自己定义的信号量变量 //pshared:0表示线程间共享,非零表示进程间共享。 //value:信号量初始值(资源数目)。 //信号量销毁 int sem_destroy(sem_t *sem) //信号量等待 int sem_wait(sem_t *sem):p操作,-- //信号量发布 int sem_pos(sem_t *sem):V操作,++
🌙 环形对列的生产消费模型
💫 引入环形队列
概念:
环形队列之前我们就了解过了,只要是环形队列,就存在判空判满的问题。实际上并不是真正的环形队列,而是通过数组模拟的,当数据加入到最后的位置时直接模等于数组的大小即可。通常情况下,判空判满的问题我们是通过空出一个位置,当两个指针指向同一个位置的时候是空,当只剩一个位置的时候就是满,但是我们这里不需要关注。
💫 访问环形队列
分析:
生产者和消费者访问同一个位置的情况:空的时候,满的时候;其他情况下生产者与消费者访问的就是不同的区域了。
为了完成环形队列的生产消费,我们的核心工作就是:
- 消费者不能超过生产者。
- 生产者不能套消费者一个圈以上。
- 生产者和消费者指向同一个位置时,如果此时满了就让消费者先走,如果此时为空就让生产者先走。
部分情况下生产者与消费者是并发执行的,但是当环形队列为空或为满的时候就会存在着同步与互斥问题。
如何去进行保证:
信号量维护,信号量是衡量临界资源中资源数量的。
资源是什么:
- 对于生产者,看中的是队列中的剩余空间,空间资源定义成一个信号量。
- 对于消费者,看中的是队列中的数据资源,数据资源定义成一个信号量。
比如我们一共有10个位置,消费者初始信号量是0,生产者初始信号量是10,如果生产者线程生产数据,申请信号量,进行P操作,信号量变为9,申请失败则阻塞;申请成功后消费者线程看到了多一个数据资源,消费者信号量进行V操作.所以我们并不需要进行判空判满:当生产者生产满了,信号量申请不到,进行阻塞,只能让消费者先走;当消费者消费完了,信号量申请不到,只能让生产者先走。
💫 代码实现
单生产单消费的环形队列生产者消费者模型,利用随机数生成数据资源,通过生产线程与消费线程进行数据的生成与数据的消费:
#pragma once #include <iostream> #include <vector> #include <cassert> #include <semaphore.h> static const int gcap = 5; template<class T> class RingQueue { private: void P(sem_t&sem) { int n =sem_wait(&sem); assert(n==0); (void)n; } void V(sem_t&sem) { int n = sem_post(&sem); assert(n==0); (void)n; } public: RingQueue(const int&cap = gcap):_queue(cap),_cap(cap) { int n = sem_init(&_spaceSem,0,_cap); assert( n == 0); n = sem_init(&_dataSem, 0, 0); assert(n==0); _productorStep = _consumerStep = 0; } //生产者——空间 void Push(const T&in) { P(_spaceSem);//申请到了空间信号量,意味着我们一定能进行正常的生产 _queue[_productorStep++] = in; _productorStep%=_cap; V(_dataSem); } //消费者——数据 void Pop(T *out) { P(_dataSem); *out = _queue[_consumerStep--]; _consumerStep%=_cap; V(_spaceSem); } ~RingQueue() { sem_destroy(&_spaceSem); sem_destroy(&_dataSem); } private: std::vector<T> _queue; int _cap; sem_t _spaceSem;//生产者想生产,看中空间资源 sem_t _dataSem;//消费者想消费,看中数据资源 int _productorStep; int _consumerStep; };
#include "RingQueue.hpp" #include <pthread.h> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h> void*ProductorRoutine(void*rq) { RingQueue<int>*ringqueue = static_cast<RingQueue<int>*>(rq); while(true) { int data = rand()%10+1; ringqueue->Push(data); std::cout<<"生产完成,生产的数据是:"<<data<<std::endl; } } void*ConsumerRoutine(void*rq) { RingQueue<int>*ringqueue = static_cast<RingQueue<int>*>(rq); while(true) { int data; ringqueue->Pop(&data); std::cout<<"消费完成,消费的数据是:"<<data<<std::endl; sleep(1); } } int main() { srand((unsigned int) time(nullptr)^getpid()^pthread_self()^0x7432); RingQueue<int>*rq = new RingQueue<int>(); pthread_t p,c; pthread_create(&p,nullptr,ConsumerRoutine,rq); pthread_create(&c,nullptr,ProductorRoutine,rq); pthread_join(p,nullptr); pthread_join(c,nullptr); delete rq; return 0; }
💫 代码改造
分析:
实际上,生产线程和消费线程可不单单只能通过整型,我们还可以生产和消费任务,下面,我们只需要进行简单的改造即可完成:
- Task.hpp:完成计算器的任务:计算两个数的加减乘除模
- 对于任务类Task:包含两个数x与y,以及计算方式op,以及计算的回调方法callback。
同时为了后面的生产线程和消费线程能够清楚看到过程,提供了两个方法一个是重载(),把计算的结果保存于字符串并放回,此方法用于消费者线程在队列中取出任务,把结果打印出来;另一个方法是toTaskString(),把计算的过程保存于字符串并返回,此方法用于生产者线程生产任务存放队列中,并且可以把过程打印出来
外部通过构造任务类对象t,传入生成的随机数x与y,以及随机生成的计算方式op,同时传入了计算的方法mymath,进行计算。
Task.hpp:
#pragma once #include <iostream> #include <functional> #include <cstdio> #include <cstring> class Task { using func_t = std::function<int(int,int,char)>; public: Task() {} Task(int x,int y,char op,func_t func) :_x(x),_y(y),_op(op),_callback(func) {} std::string operator()() { int result = _callback(_x,_y,_op); char buffer[1024]; snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result); return buffer; } std::string toTaskString() { char buffer[1024]; snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y); return buffer; } private: int _x; int _y; char _op; func_t _callback; }; const std::string oper = "+-*/%"; int mymath(int x,int y,char op) { int result = 0; switch (op) { case '+': result = x + y; break; case '-': result = x - y; break; case '*': result = x * y; break; case '/': { if (y == 0) { std::cerr << "div zero error!" << std::endl; result = -1; } else result = x / y; } break; case '%': { if (y == 0) { std::cerr << "mod zero error!" << std::endl; result = -1; } else result = x % y; } break; default: break; } return result; }
Main.cc:
#include "RingQueue.hpp" #include "Task.hpp" #include <pthread.h> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h> void*ProductorRoutine(void*rq) { RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq); while(true) { //构建任务 int x = rand()%10; int y = rand()%5; char op = oper[rand()%oper.size()]; Task t(x,y,op,mymath); //生产任务 ringqueue->Push(t); std::cout<<"生产者派发了一个任务:"<<t.toTaskString()<<std::endl; //sleep(1); } } void*ConsumerRoutine(void*rq) { RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq); while(true) { //构建任务 Task t; //消费任务 ringqueue->Pop(&t); std::string result = t(); std::cout<<"消费者消费了一个任务:"<<result<<std::endl; sleep(1); } } int main() { srand((unsigned int) time(nullptr)^getpid()^pthread_self()^0x7432); RingQueue<Task>*rq = new RingQueue<Task>(); pthread_t p,c; pthread_create(&p,nullptr,ConsumerRoutine,rq); pthread_create(&c,nullptr,ProductorRoutine,rq); pthread_join(p,nullptr); pthread_join(c,nullptr); delete rq; return 0; }
💫 多生产者多消费者代码
分析:
只要保证,最终进入临界区的是一个生产,一个消费就行,所以我们需要在环形队列提供的Push与Pop加锁,所以环形队列提供了多两个成员变量:一个是生产线程的锁,一个是消费线程的锁,也就是需要加两把锁,你拿你的,我拿我的。
RingQueue.hpp:
#pragma once #include <iostream> #include <vector> #include <cassert> #include <semaphore.h> #include <pthread.h> static const int gcap = 5; template<class T> class RingQueue { private: void P(sem_t&sem) { int n =sem_wait(&sem); assert(n==0); (void)n; } void V(sem_t&sem) { int n = sem_post(&sem); assert(n==0); (void)n; } public: RingQueue(const int&cap = gcap):_queue(cap),_cap(cap) { int n = sem_init(&_spaceSem,0,_cap); assert(n == 0); n = sem_init(&_dataSem, 0, 0); assert(n==0); _productorStep = _consumerStep = 0; pthread_mutex_init(&_pmutex,nullptr); pthread_mutex_init(&_cmutex,nullptr); } void Push(const T&in) { P(_spaceSem);//申请到了空间信号量,意味着我们一定能进行正常的生产 pthread_mutex_lock(&_pmutex); _queue[_productorStep++] = in; _productorStep%=_cap; pthread_mutex_unlock(&_pmutex); V(_dataSem); } void Pop(T *out) { P(_dataSem); pthread_mutex_lock(&_cmutex); *out = _queue[_consumerStep++]; _consumerStep%=_cap; pthread_mutex_unlock(&_cmutex); V(_spaceSem); } ~RingQueue() { sem_destroy(&_spaceSem); sem_destroy(&_dataSem); pthread_mutex_destroy(&_pmutex); pthread_mutex_destroy(&_cmutex); } private: std::vector<T> _queue; int _cap; sem_t _spaceSem; sem_t _dataSem; int _productorStep; int _consumerStep; pthread_mutex_t _pmutex; pthread_mutex_t _cmutex; };
Main.cc:
#include "RingQueue.hpp" #include "Task.hpp" #include <pthread.h> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h> std::string SelfName() { char name[128]; snprintf(name,sizeof(name),"thread[%0x%x]",pthread_self()); return name; } void*ProductorRoutine(void*rq) { RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq); while(true) { int x = rand()%10; int y = rand()%5; char op = oper[rand()%oper.size()]; Task t(x,y,op,mymath); //生产任务 ringqueue->Push(t); std::cout<<SelfName()<<",生产者派发了一个任务:"<<t.toTaskString()<<std::endl; //sleep(1); } } void*ConsumerRoutine(void*rq) { RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq); while(true) { Task t; //消费任务 ringqueue->Pop(&t); std::string result = t(); std::cout<<SelfName()<<",消费者消费了一个任务:"<<result<<std::endl; sleep(1); } } int main() { srand((unsigned int) time(nullptr)^getpid()^pthread_self()^0x7432); RingQueue<Task>*rq = new RingQueue<Task>(); pthread_t p[4],c[8]; for(int i = 0;i<4;i++) pthread_create(p+i,nullptr,ProductorRoutine,rq); for(int i = 0 ;i<8;i++) pthread_create(c+i,nullptr,ConsumerRoutine,rq); for(int i = 0;i<4;i++) pthread_join(p[i],nullptr); for(int i = 0 ;i<8;i++) pthread_join(c[i],nullptr); return 0; }
🌙 总结
多生产多消费的意义:
不管是环形队列还是阻塞队列,多线程的意义在于构建or获取任务是要花时间的,效率比较低,当消费的时候也是要花时间的,不单单只是拿出来就行了,所以多生产多消费的时候的意义在于生产之前,消费之后,处理任务获取任务的时候本身也是要花费时间的,可以在生产之前与消费之后让线程并行执行。
条件变量是一种同步机制,它允许线程等待某个条件的发生,通常与互斥锁一起使用。而信号量是一种计数器,它可以用于控制对共享资源的访问;如果想让每一刻只有一个线程访问共享资源,可以使用条件变量。但如果需要允许多个线程并发访问共享资源的不同区域,则可以使用信号量。
🌟结束语
今天内容就到这里啦,时间过得很快,大家沉下心来好好学习,会有一定的收获的,大家多多坚持,嘻嘻,成功路上注定孤独,因为坚持的人不多。那请大家举起自己的小手给博主一键三连,有你们的支持是我最大的动力💞💞💞,回见。