模拟生产者-消费者问题和读者-写者问题

简介: 模拟生产者-消费者问题和读者-写者问题

生产者-消费者问题

一组生产者进程生产产品给一组消费者进程消费。一个有n个缓冲区的缓冲池,生产者一次向一个缓冲区中投入消息,消费者从一个缓冲区中取得。生产者——消费者问题实际上是相互合作进程关系的一种抽象。该类问题不允许消费者进程到一个空缓冲区中取产品,同时也不允许生产者进程到一个已满且还没被取走的缓冲区中投放产品。

使用一个数组来表示具有n个(0,1,…,n-1)缓冲区的缓冲池。用输入指针in来指示下一个可投放产品的缓冲区,每当生产者进程生产并投放一个产品后,in加1;用一个输出指针out来指示下一个可从中获取产品的缓冲区,每当消费者进程取走一个产品后,out加


1。缓冲池是循环数组。

可利用互斥信号量mutex实现诸进程对缓冲池的互斥使用;利用信号量empty和full分别表示缓冲池中空缓冲区和满缓冲区的数量。

本实验模拟了生产者——消费者问题,打开“Microsoft Visual C++ 6.0”,输入相关代码后,对程序行进编译运行后,记录运行结果(记录第一个消费者以及其执行前的记录):

image.png

根据实验过程,结合实验程序,回答一下问题:

1.程序中创建了几个线程,生产者和消费者线程各几个?线程入口函数是什么?


创建了4个线程。生产者3个,消费者1个。消费者是DWORD WINAPI Consumer(LPVOID); 生产者是DWORD WINAPI Producer(LPVOID);


2.创建了几个信号量,它们的初始状态是什么,它们的作用各是什么?


3个信号量。g_hMutex=NULL、g_hEmptySemaphore=NULL、 g_hFullSemaphore=NULL。

g_hMutex用于线程间的互斥、g_hEmptySemaphore当缓冲区空时迫使消费者等待、g_hFullSemaphore当缓冲区满时迫使生产者等待。


3.简述函数WaitForSingleObject的功能


生产者(消费者)申请空缓冲区(满缓冲区)的使用,等待互斥信号量。


4.简述生产者线程执行的操作


生产者先申请空缓冲区的使用,等待互斥信号量,然后将信号量变为1,然后进行生产,空缓冲区增加一个产品,然后释放互斥信号量,释放满缓冲区。


5.简述消费者线程执行的操作


消费者先申请满缓冲区的使用,等待互斥信号量,然后将信号量变为1,然后进行消费,满缓冲区减少一个产品,然后释放互斥信号量变为0,释放空缓冲区。


附上源代码————


#include<windows.h>
#include<iostream>
const unsigned short SIZE_OF_BUFFER = 10;
unsigned short ProductID =0;  
unsigned short ConsumeID=0; 
unsigned short in =0;  
unsigned short out =0;
int g_buffer[SIZE_OF_BUFFER];
bool g_continue = true;
HANDLE g_hEmptySemaphore;
HANDLE g_hMutex;  
HANDLE g_hFullSemaphore; 
DWORD WINAPI Consumer(LPVOID);  
DWORD WINAPI Producer(LPVOID);
int main(){
  g_hMutex=CreateMutex(NULL,FALSE,NULL);
  g_hFullSemaphore=CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);
  g_hEmptySemaphore=CreateSemaphore(NULL,SIZE_OF_BUFFER-1,SIZE_OF_BUFFER-1,NULL);
  const unsigned short PRODUCERS_COUNT=3;  
  const unsigned short CONSUMERS_COUNT=1;
  const unsigned short THREADS_COUNT=PRODUCERS_COUNT+CONSUMERS_COUNT;
  HANDLE hThreads[THREADS_COUNT];
  DWORD producerID[PRODUCERS_COUNT];
  DWORD consumerID[CONSUMERS_COUNT];
  for (int i=0; i<PRODUCERS_COUNT; ++i){
  hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]);
  if(hThreads[i]=NULL)
    return -1;
  }
  for(i=0;i<CONSUMERS_COUNT;++i){
  hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
  if(hThreads[i]=NULL)
    return -1;
  }
  while(g_continue){
  if(getchar()){
    g_continue=false;
    }
  }
  return 0;
 }
 void Produce(){
  std::cerr<<"Producing "<<++ProductID<<"...";
  std::cerr<<"Succeed"<< std::endl;
 }
 void Append(){
  std::cerr<<"Appending a product ...";
  g_buffer[in]=ProductID;
  in =(in+1)%SIZE_OF_BUFFER;
  std::cerr <<"Succeed"<< std::endl;
  for (int i=0; i<SIZE_OF_BUFFER;++i){
  std::cout<<i<<":"<<g_buffer[i];
   if (i==in)
  std::cout<<"<--生产";
  if (i==out)
  std::cout<<"<--消费";
  std::cout<<std::endl;
  }
 }
 void Take(){
  std::cerr <<"Taking a product ...";
  ConsumeID=g_buffer[out];
  out=(out+1)%SIZE_OF_BUFFER;
  std::cerr<<"Succeed"<<std::endl;
  for(int i=0; i<SIZE_OF_BUFFER;i++){
  std::cout<<i<<": "<<g_buffer[i]; 
  if(i==in)
    std::cout<<"<--生产";
  if(i==out)
    std::cout<<"<--消费";
  std::cout<<std::endl;
  }
 }
void Consume(){
  std::cerr<<"Consuming"<<ConsumeID<<"...";
  std::cerr<<"Succeed"<<std::endl;
}
 DWORD WINAPI Producer(LPVOID IpPara){
  while(g_continue){
   WaitForSingleObject(g_hEmptySemaphore,INFINITE); 
   WaitForSingleObject(g_hMutex,INFINITE);
   Produce();
   Append();
   Sleep(1500);
   ReleaseMutex(g_hMutex);
   ReleaseSemaphore(g_hFullSemaphore,1,NULL);
  }
  return 0;
 }
 DWORD WINAPI Consumer(LPVOID IpPara){
  while(g_continue){
  WaitForSingleObject(g_hFullSemaphore, INFINITE);
  WaitForSingleObject(g_hMutex, INFINITE);
  Take();
  Consume();
  Sleep(1500);
  ReleaseMutex(g_hMutex);
  ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
  }
 return 0;
 }


这是两条平行且光滑的分割线,永不相交,却无限向前。

——————————————————————————————————————————————————————————————————————————————————————


读者——写者问题

一个数据文件或者记录可被多个进程(或线程)共享。其中,有些进程(或线程)要求读;而另一些进程(或线程)要求能写或者修改。只要求读的进程(或线程)称为“Reader进程”,其他进程(或线程)称为“Witer进程(或线程)”。允许多个Reader进程(或线程)同时读一个共享对象,不允许一个Writer进程(或线程)和其他Reader进程(或线程)或者Writer进程(或线程)同时访问共享对象。所谓读者—写者问题是指保证一个Writer(或线程)进程必须与其他进程(或线程)互斥地访问共享对象的同步问题。

本实验模拟了读者-写着问题。打开“Microsoft Visual C++ 6.0”,输入相关代码后,对程序行进编译运行后,新建一个“input.txt”文件,存放与源程序在同一目录下。分别测试两组数据,文件内容分别为:

image.png


运行结果分别是:

image.png

image.png

根据实验过程,结合试验程序回答一下几个问题:


“input.txt”文件中每列数据的含义是什么?

线程序号 读写操作 延迟时间 写文件持续时间


2.创建了几个信号量,它们的初始状态是什么,它们的作用各是什么?


两个信号量。互斥变量h_Mutex=0和ReadCount=0。互斥变量是控制写者与写者互斥,读者与写者互斥。

ReadCount进行读者计数,当为0时可以进行读或者写操作。

3.程序中哪部分代码实现了读–写互斥执行的过程?

void ReaderPriority(char*file){
  DWORD n_thread=0;
  DWORD thread_ID;
  DWORD wait_for_all;
  HANDLE h_Mutex;
  h_Mutex=CreateMutex(NULL, FALSE,"mutex_for_readcount");
  HANDLE h_Thread[MAX_THREAD_NUM];
  ThreadInfo thread_info[MAX_THREAD_NUM];
  readcount=0; 
  InitializeCriticalSection(&RP_Write);
  ifstream inFile;
  inFile.open(file, ios::nocreate);
  if (inFile.rdstate()==ios::failbit) {
   printf("打开文件\"%os\"失败!请将\"%s\"放程序目录下。\n",file,file);
   return;
  }
  printf("读者优先:\n\n");
  while(inFile){
   inFile>>thread_info[n_thread].serial;
   inFile>>thread_info[n_thread].entity;
   inFile>>thread_info[n_thread].delay;
   inFile>>thread_info[n_thread++].persist;
   inFile.get();
  }
  for(int i=0;i<(int)(n_thread);i++) {
   if (thread_info[i].entity==READER||thread_info[i].entity =='r'){
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_ReaderThread),&thread_info[i],0,&thread_ID);//创建读者进程
   }
   else{  
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_WriterThread),&thread_info[i],0,&thread_ID);//创建写者进程
   }
  }
  //等待所有的线程结束
  wait_for_all=WaitForMultipleObjects(n_thread,h_Thread,TRUE,-1);
  printf("所有的读写线程结束操作.\n");
}


4.CreateMutex(),CreateThread()两个函数什么含义,通过查阅帮助文档,给出函数内各个参数的含义。


CreateMutex()作用是找出当前系统是否已经存在指定进程的实例。如果没有则创建一个互斥体。第一个参数为指向安全属性的指针;第二个参数为互斥对象的所有者,第三个参数为指向互斥对象名的指针。

CreateThread()将在主线程的基础上创建一个新线程。第一个参数是指向SECURITY_ATTRIBUTES型态的结构的指针;第二个参数是用于新线程的初始堆栈大小,默认值为0;第三个参数是指向线程函数的指标;第四个参数为传递给ThreadProc的参数;第五个参数通常为0,但当建立的线程不马上执行时为旗标CREATE_SUSPENDED;第六个参数是一个指标,指向接受执行绪ID值的变量。


5.简单叙述一下读者——写者问题的同步和互斥关系:


读者申请互斥信号量,阻止写者进行操作,自己进行读操作,读者数加一;然后读者可以同时进行读文件,最后一个读者离开时,释放互斥信号量,写者可以进行写操作。写者申请互斥信号量,组织其他写者或者读者进行操作,写完之后释放互斥信号量。


最后附上心爱的源代码——————


#include"windows.h"
#include<conio.h>
#include<stdlib.h>
#include<fstream.h>
#include<io.h>
#include<string.h>
#include <stdio.h>
#define READER 'R'   //读者
#define WRITER 'W'   //写者
#define INTE_PER_SEC 1000 //每秒时钟中断的数目
#define MAX_THREAD_NUM 64  //最大线程数
#define MAX_FILE_NUM 32  //最大文件数目
#define MAX_STR_LEN 32  //字符串的长度
int readcount=0;  
int writecount=0;  
CRITICAL_SECTION RP_Write;//读者数目
CRITICAL_SECTION cs_Write;//写者数目
CRITICAL_SECTION cs_Read; //临界资源
struct ThreadInfo{
  int serial;//线程序号
  char entity;//线程类别(判断是读者还是写者进程)
  double delay;//线程延迟时间
  double persist;//线程读写操作时间
};
//读者优先-读者线程
//P:读者线程信息
void RP_ReaderThread(void *p){
  HANDLE h_Mutex;//互斥变量
  h_Mutex=OpenMutex(MUTEX_ALL_ACCESS,FALSE,"mutex_for_readcount");
  DWORD wait_for_mutex; //等待互斥变量的所有权 
  DWORD m_delay; //延迟时间
  DWORD m_persist;//读文件持续时间
  int m_serial;  
  //从参数中获得信息
  m_serial=((ThreadInfo*)(p))->serial;
  m_delay=(DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC);
  m_persist=(DWORD)(((ThreadInfo*)(p))->persist *INTE_PER_SEC); 
  Sleep(m_delay);  //延迟等待
  printf("读线程 %d 发出读请求.\n",m_serial);
  //等待互斥信号,保证对ReadCount的访问,修改互斥
  wait_for_mutex=WaitForSingleObject(h_Mutex,-1);
  //读者数目增加
  readcount++;
  if(readcount==1){
   //第一个读者,等待资源
  EnterCriticalSection(&RP_Write);
  }
  ReleaseMutex(h_Mutex);
  //读文件
  printf("读线程 %d 开始读文件.\n",m_serial);
  Sleep(m_persist);
  //退出线程
  printf("读线程 %d 结束读文件.\n",m_serial);
  //等待互斥信号,保证对ReadCount的访问,修改互斥
  wait_for_mutex=WaitForSingleObject(h_Mutex,-1);
  //读者数目减少
  readcount--;
  if(readcount==0){
  LeaveCriticalSection(&RP_Write);
  }
  ReleaseMutex(h_Mutex); 
}
//读者优先--写者进程
//P:写者线程信息
void RP_WriterThread(void*p){
  DWORD m_delay;//延迟时间  
  DWORD m_persist;  //写文件持续时间
  int m_serial; //线程序号
  //从参数中获取信息
  m_serial=((ThreadInfo*)(p))->serial;
  m_delay=(DWORD)(((ThreadInfo*)(p))->delay*INTE_PER_SEC); 
  m_persist=(DWORD)(((ThreadInfo*)(p))->persist*INTE_PER_SEC); 
  Sleep(m_delay);
  printf("写线程%d发出写请求***.\n",m_serial);
  //等待资源
  EnterCriticalSection(&RP_Write);
  //写文件
  printf("写线程%d开始写文件.\n",m_serial);
  Sleep(m_persist);
  //退出线程
  printf("写线程%d结束写文件.\n",m_serial);  
  //释放资源
  LeaveCriticalSection(&RP_Write);
}
//读者优先处理函数
void ReaderPriority(char*file){
  DWORD n_thread=0;
  DWORD thread_ID;
  DWORD wait_for_all;
  HANDLE h_Mutex;
  h_Mutex=CreateMutex(NULL, FALSE,"mutex_for_readcount");
  HANDLE h_Thread[MAX_THREAD_NUM];
  ThreadInfo thread_info[MAX_THREAD_NUM];
  readcount=0; 
  InitializeCriticalSection(&RP_Write);
  ifstream inFile;
  inFile.open(file, ios::nocreate);
  if (inFile.rdstate()==ios::failbit) {
   printf("打开文件\"%os\"失败!请将\"%s\"放程序目录下。\n",file,file);
   return;
  }
  printf("读者优先:\n\n");
  while(inFile){
   inFile>>thread_info[n_thread].serial;
   inFile>>thread_info[n_thread].entity;
   inFile>>thread_info[n_thread].delay;
   inFile>>thread_info[n_thread++].persist;
   inFile.get();
  }
  for(int i=0;i<(int)(n_thread);i++) {
   if (thread_info[i].entity==READER||thread_info[i].entity =='r'){
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_ReaderThread),&thread_info[i],0,&thread_ID);//创建读者进程
   }
   else{  
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_WriterThread),&thread_info[i],0,&thread_ID);//创建写者进程
   }
  }
  //等待所有的线程结束
  wait_for_all=WaitForMultipleObjects(n_thread,h_Thread,TRUE,-1);
  printf("所有的读写线程结束操作.\n");
}
 int main(int argc, char *argv[]){
  ReaderPriority("input.txt");
  printf("\n按任意键结束:");
  getch();
  return 0;
}

目录
相关文章
|
4月前
|
人工智能 自然语言处理 数据挖掘
阿里云百炼支持哪些AI大模型?文本生成、图像生成、语音合成及视频编辑等模型整理
阿里云百炼支持通义千问、通义万相等自研模型及DeepSeek、Kimi、Llama等第三方大模型,覆盖文本生成、图像生成、语音合成、视频生成、向量计算等多类AI能力,助力开发者高效构建应用。新用户可免费领取最高5000万Tokens。
2215 156
|
6月前
|
运维 监控 安全
2025年10月远程控制软件评测:流畅度、群控能力,教你如何选最好用的远程桌面工具
2025年主流远程控制软件深度评测:基于性能、画质、安全与场景适配多维分析,推荐连连控为专业首选。其全平台兼容、4K高帧率、智能低延迟及批量管控能力突出,适合设计、运维等高要求场景,助力企业高效协同与数字化转型。
|
安全 开发者
vite中引入defineConfig类型辅助函数
【10月更文挑战第11天】 在 Vite 中,`defineConfig` 类型辅助函数用于以类型安全的方式配置项目。它接收一个包含服务器、构建、插件等配置项的对象作为参数,提供类型提示和检查,确保配置正确。通过 `defineConfig`,配置更清晰、易于维护和扩展,支持团队协作。示例:设置服务器端口为 3000,构建输出路径为 &#39;dist&#39;。
647 57
|
存储 弹性计算 固态存储
阿里云服务器Entry云盘和ESSD Entry云盘区别、性能参数及使用常见问题参考
在我们选择阿里云服务器的时候,有部分云服务器同时支持ESSD Entry云盘和ESSD云盘,有的初次接触阿里云服务器云盘的用户可能还不是很清楚他们之间的区别,因此不知道选择哪种更好更能满足自己场景的需求,本文为大家介绍一下阿里云服务器Entry云盘和ESSD Entry云盘各自的性能参数区别及使用过程中的一些常见问题,以供选择参考。
|
存储 弹性计算 前端开发
用于搭建企业官网,如何选择阿里云服务器配置?
搭建企业官网时,可选阿里云ECS计算型c7实例,基于Intel Xeon Ice Lake处理器,提供2-128核与4GB-256GB内存选项,支持ESSD云盘。对于低访问量官网,推荐更具性价比的ECS通用算力型u1实例,配置为2核4GB内存,5M固定带宽,80G ESSD Entry盘,首年及续费均为199元。高性能需求可选用c7实例,适用于视频处理、大型游戏前端等场景。
795 12
|
边缘计算 物联网 5G
软件定义网络(SDN)的未来趋势:重塑网络架构,引领技术创新
【8月更文挑战第20天】软件定义网络(SDN)作为新兴的网络技术,正在逐步重塑网络架构,引领技术创新。随着5G、人工智能、边缘计算等技术的不断发展,SDN将展现出更加广阔的应用前景和市场潜力。未来,SDN有望成为主流网络技术,并在各行各业推动数字化转型。让我们共同期待SDN技术带来的更加智能、安全和高效的网络体验。
1046 1
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
603 1
|
机器学习/深度学习 数据采集 数据可视化
NumPy 正态分布与 Seaborn 可视化指南
正态分布(高斯分布)是重要的概率分布,常用于描述自然和人为现象的数据。分布呈钟形,峰值在均值(μ)处,两侧对称下降。特征由均值和标准差(σ)描述,标准差影响分布的分散程度。NumPy 的 `random.normal()` 函数可生成正态分布随机数,Seaborn 库则方便绘制分布图。正态分布广泛应用于统计学、机器学习、金融和工程等领域。练习包括生成正态分布随机数并作图,以及比较不同标准差下的分布形状。
436 3
|
关系型数据库 MySQL API
SqlAlchemy 2.0 中文文档(五十三)(1)
SqlAlchemy 2.0 中文文档(五十三)
301 0

热门文章

最新文章

下一篇
开通oss服务