前言
在并发编程领域,生产者消费者模型是一个经典且重要的话题。它涉及到多线程之间的协作与通信,展现了在复杂系统中保持数据一致性和避免资源竞争的关键技术。通过深入探讨生产者消费者模型,我们可以了解如何利用同步和互斥的机制来实现线程之间的有效协作,从而提高程序的效率和可靠性。
在本篇博客中,我将带领读者逐步理解生产者消费者模型的设计思想、实现方法以及可能遇到的问题。无论是初学者还是有一定经验的开发人员,都可以通过本文深入了解生产者消费者模型,并掌握如何在实际项目中应用这一模型来优化程序结构和性能。
让我们一起探索生产者消费者模型的精妙之处,为并发编程的世界增添新的活力与智慧。
1 相关概念
- 321原则
3:3种关系,生产者之间(互斥)、消费者之间(互斥)、生产者和消费者之间(互斥+同步)
2:两种角色,生产者和消费者
1:一个交易场所
- 使用生产者消费者模型的原因
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
- 生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
- 基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
2 基于BlockingQueue的生产者消费者代码实现
BlockQueue.cc
#pragma #include<iostream> #include<pthread.h> #include<unistd.h> #include<queue> #include<stdlib.h> using namespace std; class Task { public: int _x; int _y; Task(){} Task(int x,int y) :_x(x),_y(y) {} int run() { return _x+_y; } ~Task(){} }; class BlockQueue { private: /* data */ std::queue<Task> q;//设置一个队列 int _cap;//容量 pthread_mutex_t lock;//设置一把互斥锁 pthread_cond_t c_cond;//满了通知消费者 pthread_cond_t p_cond;//满了通知生产者 void LockQueue()//加锁 { pthread_mutex_lock(&lock); } void UnlockQueue()//解锁 { pthread_mutex_unlock(&lock); } bool IsEmpty()//判断队列是否为空 { return q.size()==0; } bool IsFull()//判断队列是否满 { return q.size()==_cap; } void ProductWait()//生产者等待 { pthread_cond_wait(&p_cond,&lock); } void ConsumerWait()//消费者等待 { pthread_cond_wait(&c_cond,&lock); } void WakeUpProduct()//唤醒生产者 { pthread_cond_signal(&p_cond); } void WakeUpConsumer()//唤醒消费者 { pthread_cond_signal(&c_cond); } public: BlockQueue(int cap)//初始化 :_cap(cap) { pthread_mutex_init(&lock,NULL); pthread_cond_init(&c_cond,NULL); pthread_cond_init(&p_cond,NULL); } ~BlockQueue() { pthread_mutex_destroy(&lock); pthread_cond_destroy(&c_cond); pthread_cond_destroy(&p_cond); } void put(Task in) { LockQueue(); while(IsFull()){ WakeUpConsumer();//唤醒消费者 std::cout<<"queue full,notify consume , stop product"<<std::endl; ProductWait();//生产者线程等待 } q.push(in); UnlockQueue(); } void get(Task&out){ LockQueue(); while(IsEmpty()){ WakeUpProduct();//唤醒生产者 std::cout<<"queue Empty,notify product , stop consum"<<std::endl; ConsumerWait();//消费者线程等待 } out=q.front(); q.pop(); UnlockQueue(); } };
main.cc
#include"BlockQueue.cc" pthread_mutex_t p_lock; pthread_mutex_t c_lock; void* Product_Run(void* arg) { BlockQueue* bq = (BlockQueue*)arg; srand((unsigned int)time(NULL)); while(1) { pthread_mutex_lock(&p_lock); int x = rand()%10 + 1; int y = rand()%100 + 1; Task t(x,y); bq->put(t); pthread_mutex_unlock(&p_lock); cout<<"product data is "<<t.run()<<endl; } } void* Consumer_Run(void* arg) { BlockQueue* bq = (BlockQueue*)arg; srand((unsigned int)time(NULL)); while(1) { pthread_mutex_lock(&c_lock); Task t; bq->get(t); pthread_mutex_unlock(&c_lock); cout<<"consumer is "<<t._x<<"+"<<t._y<<"="<<t.run()<<endl; sleep(1); } } int main() { BlockQueue* bq = new BlockQueue(10); pthread_t c,p; pthread_create(&c,NULL,Product_Run,(void*)bq); pthread_create(&p,NULL,Consumer_Run,(void*)bq); pthread_join(c,NULL); pthread_join(p,NULL); delete bq; return 0; }
makefile
main:main.cc g++ -o $@ $^ -lpthread .PTHONY: clean: rm -f main
结果:
可以观察到生产者生产任务,消费者完成任务