1. 前言
学习进程和线程也很久了,它们具体能解决
什么问题?有什么实际的运用?
本章重点:
本篇文章着重讲解基于多线程下的
生产者消费者模型的概念以及实现.
不仅如此,文章还会拓展基于使用信号量
实现的环形队列版的生产者消费者模型
2. 初识生产者消费者模型
先回答第一个问题:
什么是生产者消费者模型?
生产者消费者模型的本质就是通过一个阻塞队列来将生产者和消费者解耦合的模型.可以将这个模型比作一个超时,生产者生产了数据(商品)后,不会直接拿给消费者使用,而是将数据(商品)先加入到超市,让超市帮我们卖数据(商品).同理,对于消费者来说它并不会直接去生产者的厂家直接拿数据(商品),而是去超市中购买数据(商品).这样生产者和消费者两个对象就解耦合了
再回答第二个问题:
这个模型有什么好处?
想象一下这个场景,假如没有超市的存在,消费者去买物品的伤害要直接到生产厂家去买,并且消费者还不知道生产厂家现在是否有物品,很容易跑空.对于生产者来说,他也不知道消费者需要哪些物品,也就不好生产,容易生产出来的物品没人问津,加入超市这个角色就可以解决上面的问题.综上所述,生产者消费者模型不仅仅将二者解耦了,并且还提高了效率,不会让两方出现跑空的情况(超市没有商品了,消费者最清楚,超市有商品时,生产者最清楚)
不仅如此,生产者消费者模型还支持多个线程一起进入,阻塞队列属于临界资源,所以同一时刻,不管是生产者还是消费者,都只允许一个线程进入到阻塞队列进行push或pop操作.除此之外,我们还需要两个条件变量push_cond和pop_cond,当阻塞队列已满时,push_cond条件变量会让生产者线程等待消费者线程在队列中取走数据,而当阻塞队列为空时,pop_cond条件变量会让消费者线程等待生产者往队列中加入数据,一来一回,也就是push操作会唤醒pop_cond条件变量,而pop操作会唤醒push_cond条件变量
3. 阻塞队列版本的代码实现
在的代码中一定会涉及到加解锁的问题,所以可以根据RAII思想,先设计出一个专门用于加解锁的类:
lockguard.hpp文件:
#pragma once #include<iostream> #include<pthread.h> using namespace std; class Mutex //封装pthread库的锁 { public: Mutex(pthread_mutex_t* pmtx) :_pmtx(pmtx) {} void lock() { pthread_mutex_lock(_pmtx); } void unlock() { pthread_mutex_unlock(_pmtx); } ~Mutex() { delete _pmtx; _pmtx = nullptr; } private: pthread_mutex_t* _pmtx; }; //利用RAII思想,用对象的生命周期控制资源 class LockGuard//封装了锁后,出了作用域会自动调用析构函数,解锁! { public: LockGuard(pthread_mutex_t* mtx) :_mtx(mtx) { _mtx.lock(); } ~LockGuard() { _mtx.unlock(); } private: Mutex _mtx; };
下一步,编写基于阻塞队列的模型,
在实现它之前需要先注意一些点:
下面的define语句可有可无,主要是为了图方便.第二点,检测临界资源是否就绪时要使用while而不是if,因为有时可能会错误的唤醒一个进程,如果是if语句,一旦错误唤醒,就会直接进入临界区,不合理,使用while的话,就算是错误唤醒了一个线程,只要资源不就绪也不能往后走.在下面的代码中并没有使用上上面封装的锁,这是为了方便同学们即使不封装锁也能很好的查看代码,最后,代码的解释都放在注释当中了,如果你还有问题,欢迎你来私信我
block_queue.hpp文件:
#pragma once #include<iostream> #include<queue> #include<pthread.h> #include"LockGuard.cpp" using namespace std; #define INIT_MTX(mtx) pthread_mutex_init(&mtx,nullptr) #define INIT_COND(cond) pthread_cond_init(&cond,nullptr) #define DESTORY_MTX(mtx) pthread_mutex_destroy(&mtx) #define DESTORY_COND(cond) pthread_cond_destroy(&cond) template<class T> class BlockQueue { public: BlockQueue(size_t capacity = 5) :_capacity(capacity) { INIT_MTX(_mtx); INIT_COND(_condisempty); INIT_COND(_condisfull); } ~BlockQueue() { DESTORY_MTX(_mtx); DESTORY_COND(_condisempty); DESTORY_COND(_condisfull); } void push(const T& in) { //1. 加锁 pthread_mutex_lock(&_mtx); //2. 检测临界资源是否满足访问条件,不满足就利用条件变量让它等待 while(isfull()) { cout<<"队列已满,push等待中..."<<endl; //我是抱着锁去等待的,所以即使消费者去pop了,没有锁也只能阻塞等待,就造成死循环了! //pthread_cond_wait的第二个参数是一把锁,当成功调用wait后,传入的锁会自动释放! //被唤醒时,还是在加锁和解锁之间,wait函数会自动帮线程获取锁 // 1. wait函数可能会调用失败 // 2. wait函数可能存在伪唤醒的情况 // 所以不能使用if语句来判断,而是用while,醒来后再判断一下是不是真的唤醒 pthread_cond_wait(&_condisfull,&_mtx);//若队列满了,就去isfull变量中等待 cout<<"等待成功,继续执行..."<<endl; } //3. 访问临界资源 _bq.push(in); //5. 生产完后唤醒消费者(满足一定条件再唤醒) if(_bq.size() >= _capacity/2) pthread_cond_signal(&_condisempty); //4. 解锁 pthread_mutex_unlock(&_mtx); } void pop(T* out) { //1. 加锁 pthread_mutex_lock(&_mtx); //2. 检测临界资源是否满足访问条件,不满足就利用条件变量让它等待 while(isempty()) { cout<<"队列为空,pop等待中..."<<endl; pthread_cond_wait(&_condisempty,&_mtx); cout<<"等待成功,继续执行..."<<endl; } //3. 访问临界资源 *out = _bq.front(); _bq.pop(); //4. 解锁 pthread_mutex_unlock(&_mtx); //5. 消费完后唤醒生产者 pthread_cond_signal(&_condisfull); } bool isempty() { return _bq.size() == 0; } bool isfull() { return _bq.size() ==_capacity; } private: queue<T> _bq; size_t _capacity; //阻塞队列容量上限 pthread_mutex_t _mtx; //通过互斥锁保证队列push/pop时,不会被pop/push(正在生产时就来消费) //push或pop时,怎么知道队列是不是满或空呢?通过条件变量来同步信息 pthread_cond_t _condisempty; //用它来表示阻塞队列是否为空的条件 pthread_cond_t _condisfull; //用它来表示阻塞队列是否为满的条件 };
对于这段代码的测试代码较为繁杂,我
把代码的码云链接放出来,有兴趣的同学
下来写完可以去做一些测试:
4. 初识posix信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
请思考以下问题:
在基于阻塞队列的生产者消费者模型中,生产者关心的是阻塞队列是否满了,而消费者关心的是阻塞队列是否有数据,既然它们关心的资源不同,但进入到阻塞队列就会直接加锁,是不是有一点不合理?有没有一种方法可以只让生产者与生产者之间以及消费者与消费者之间存在锁竞争?而生产者和消费者之间不存在锁竞争?答案是使用信号量!
什么是信号量?
信号量的本质是一个计数器,在使用资源前需要预定资源,也就是让信号量减一.类似于去电影院看电影需要提前买票预定座位,并且在访问完资源释放信号量,也就是让信号量加一
根据以上的信息可以简化模型:
生产者关心队列是否为满,刚开始队列是空,那么生产者的信号量就是n(队列的长度),而消费者关心队列是否有资源,刚开始队列为空,那么消费者的信号量就是0.并且在生产者push一个数据到队列后,生产者的信号量需要减一,而消费者的信号量会加一,同理,消费者从队列拿走一个数据后,生产者的信号量会加一,而消费者的信号量会减一
信号量的使用方法分为以下几步:
初始化信号量
销毁信号量
等待信号量(信号量减一)
发布信号量(信号量加一)
等待和发布信号量的操作又被称为PV操作
5. 基于信号量的环形队列模型
可以优化最初的生产者消费者模型:
使用一个环形队列,只有生产和消费指向环形队列中的同一个位置(队列为满或空)时,才会出现生产者和消费者互斥或同步的问题,而当生产和消费指向环形队列中的不同位置时,只需控制信号量即可
- 生产者关心空间资源,使用信号量spacesem,起始值为N
- 消费者关心数据资源,使用信号量datasem,起始值为0
- 期望:生产者不能让消费者套圈(为空时要先让生产者先运行)
- 期望:消费者不能超过生产者(为满时要让消费者先运行)
- 需要两把锁,一把给生产者之间当互斥锁,另外一把给消费者用
6. 环形队列模型的代码编写
还是和之前一样,可以选择将操作信号量的函数专门写为一个类,当然也可以用原生的
sem.hpp文件中
#pragma once #include<semaphore.h> using namespace std; class Sem { public: Sem(int value) { sem_init(&_sem,0,value); } ~Sem() { sem_destroy(&_sem); } void p() //PV操作分别代表信号量减一和信号量加一 { sem_wait(&_sem); } void v() { sem_post(&_sem); } private: sem_t _sem; };
在RingQueue.hpp文件中:
#pragma once #include<iostream> #include<pthread.h> #include<vector> #include<unistd.h> #include<semaphore.h> using namespace std; #include"Sem.hpp" const int g_num = 5; template<class T> class RingQueue { public: RingQueue(int num = g_num) :_num(num) ,_rq(num) ,_pushindex(0) ,_popindex(0) ,_space_sem(num)//最开始空间资源是满的 ,_data_sem(0)//最开始数据资源是空的 { pthread_mutex_init(&pushlock,nullptr); pthread_mutex_init(&poplock,nullptr); } ~RingQueue() { pthread_mutex_destroy(&pushlock); pthread_mutex_destroy(&poplock); } void push(const T& in)//生产者进行,关心空间资源,生产者们的临界资源是pushindex下标 { _space_sem.p(); pthread_mutex_lock(&pushlock);//对生产者们进行加锁 _rq[_pushindex++] = in; _pushindex %= _num; pthread_mutex_unlock(&pushlock); _data_sem.v(); } void pop(T* out)//消费者进行,关心数据资源,消费者们的临界资源也是popindex下标 { //一般先申请信号量,再进行加锁 _data_sem.p(); pthread_mutex_lock(&poplock);//对消费者们进行加锁 //竞争成功的生产者线程只有一个 *out = _rq[_popindex++]; _popindex %= _num; pthread_mutex_unlock(&poplock); _space_sem.v(); } private: vector<T> _rq;//环形队列可用数组表示 size_t _num;//总共的元素个数 size_t _pushindex = 0;//push的下标 size_t _popindex = 0;//pop的下标 Sem _space_sem; Sem _data_sem; pthread_mutex_t pushlock;//生产者的锁 pthread_mutex_t poplock;//消费者的锁 };
关于代码的解释都在注释中,若你有疑问,欢迎私信我~
7. 总结以及拓展
生产者消费者模型是我们学习线程中的一个很重要的模型,它的思想有助于帮我们解决后续的难题,并且校招时,有可能会让手撕一个生产者消费者模型,所以同学要学扎实了
🔎 下期预告:线程池详解 🔍