模拟生产者-消费者问题和读者-写者问题

简介: 模拟生产者-消费者问题和读者-写者问题

生产者-消费者问题

一组生产者进程生产产品给一组消费者进程消费。一个有n个缓冲区的缓冲池,生产者一次向一个缓冲区中投入消息,消费者从一个缓冲区中取得。生产者——消费者问题实际上是相互合作进程关系的一种抽象。该类问题不允许消费者进程到一个空缓冲区中取产品,同时也不允许生产者进程到一个已满且还没被取走的缓冲区中投放产品。

使用一个数组来表示具有n个(0,1,…,n-1)缓冲区的缓冲池。用输入指针in来指示下一个可投放产品的缓冲区,每当生产者进程生产并投放一个产品后,in加1;用一个输出指针out来指示下一个可从中获取产品的缓冲区,每当消费者进程取走一个产品后,out加


1。缓冲池是循环数组。

可利用互斥信号量mutex实现诸进程对缓冲池的互斥使用;利用信号量empty和full分别表示缓冲池中空缓冲区和满缓冲区的数量。

本实验模拟了生产者——消费者问题,打开“Microsoft Visual C++ 6.0”,输入相关代码后,对程序行进编译运行后,记录运行结果(记录第一个消费者以及其执行前的记录):

image.png

根据实验过程,结合实验程序,回答一下问题:

1.程序中创建了几个线程,生产者和消费者线程各几个?线程入口函数是什么?


创建了4个线程。生产者3个,消费者1个。消费者是DWORD WINAPI Consumer(LPVOID); 生产者是DWORD WINAPI Producer(LPVOID);


2.创建了几个信号量,它们的初始状态是什么,它们的作用各是什么?


3个信号量。g_hMutex=NULL、g_hEmptySemaphore=NULL、 g_hFullSemaphore=NULL。

g_hMutex用于线程间的互斥、g_hEmptySemaphore当缓冲区空时迫使消费者等待、g_hFullSemaphore当缓冲区满时迫使生产者等待。


3.简述函数WaitForSingleObject的功能


生产者(消费者)申请空缓冲区(满缓冲区)的使用,等待互斥信号量。


4.简述生产者线程执行的操作


生产者先申请空缓冲区的使用,等待互斥信号量,然后将信号量变为1,然后进行生产,空缓冲区增加一个产品,然后释放互斥信号量,释放满缓冲区。


5.简述消费者线程执行的操作


消费者先申请满缓冲区的使用,等待互斥信号量,然后将信号量变为1,然后进行消费,满缓冲区减少一个产品,然后释放互斥信号量变为0,释放空缓冲区。


附上源代码————


#include<windows.h>
#include<iostream>
const unsigned short SIZE_OF_BUFFER = 10;
unsigned short ProductID =0;  
unsigned short ConsumeID=0; 
unsigned short in =0;  
unsigned short out =0;
int g_buffer[SIZE_OF_BUFFER];
bool g_continue = true;
HANDLE g_hEmptySemaphore;
HANDLE g_hMutex;  
HANDLE g_hFullSemaphore; 
DWORD WINAPI Consumer(LPVOID);  
DWORD WINAPI Producer(LPVOID);
int main(){
  g_hMutex=CreateMutex(NULL,FALSE,NULL);
  g_hFullSemaphore=CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);
  g_hEmptySemaphore=CreateSemaphore(NULL,SIZE_OF_BUFFER-1,SIZE_OF_BUFFER-1,NULL);
  const unsigned short PRODUCERS_COUNT=3;  
  const unsigned short CONSUMERS_COUNT=1;
  const unsigned short THREADS_COUNT=PRODUCERS_COUNT+CONSUMERS_COUNT;
  HANDLE hThreads[THREADS_COUNT];
  DWORD producerID[PRODUCERS_COUNT];
  DWORD consumerID[CONSUMERS_COUNT];
  for (int i=0; i<PRODUCERS_COUNT; ++i){
  hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]);
  if(hThreads[i]=NULL)
    return -1;
  }
  for(i=0;i<CONSUMERS_COUNT;++i){
  hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
  if(hThreads[i]=NULL)
    return -1;
  }
  while(g_continue){
  if(getchar()){
    g_continue=false;
    }
  }
  return 0;
 }
 void Produce(){
  std::cerr<<"Producing "<<++ProductID<<"...";
  std::cerr<<"Succeed"<< std::endl;
 }
 void Append(){
  std::cerr<<"Appending a product ...";
  g_buffer[in]=ProductID;
  in =(in+1)%SIZE_OF_BUFFER;
  std::cerr <<"Succeed"<< std::endl;
  for (int i=0; i<SIZE_OF_BUFFER;++i){
  std::cout<<i<<":"<<g_buffer[i];
   if (i==in)
  std::cout<<"<--生产";
  if (i==out)
  std::cout<<"<--消费";
  std::cout<<std::endl;
  }
 }
 void Take(){
  std::cerr <<"Taking a product ...";
  ConsumeID=g_buffer[out];
  out=(out+1)%SIZE_OF_BUFFER;
  std::cerr<<"Succeed"<<std::endl;
  for(int i=0; i<SIZE_OF_BUFFER;i++){
  std::cout<<i<<": "<<g_buffer[i]; 
  if(i==in)
    std::cout<<"<--生产";
  if(i==out)
    std::cout<<"<--消费";
  std::cout<<std::endl;
  }
 }
void Consume(){
  std::cerr<<"Consuming"<<ConsumeID<<"...";
  std::cerr<<"Succeed"<<std::endl;
}
 DWORD WINAPI Producer(LPVOID IpPara){
  while(g_continue){
   WaitForSingleObject(g_hEmptySemaphore,INFINITE); 
   WaitForSingleObject(g_hMutex,INFINITE);
   Produce();
   Append();
   Sleep(1500);
   ReleaseMutex(g_hMutex);
   ReleaseSemaphore(g_hFullSemaphore,1,NULL);
  }
  return 0;
 }
 DWORD WINAPI Consumer(LPVOID IpPara){
  while(g_continue){
  WaitForSingleObject(g_hFullSemaphore, INFINITE);
  WaitForSingleObject(g_hMutex, INFINITE);
  Take();
  Consume();
  Sleep(1500);
  ReleaseMutex(g_hMutex);
  ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
  }
 return 0;
 }


这是两条平行且光滑的分割线,永不相交,却无限向前。

——————————————————————————————————————————————————————————————————————————————————————


读者——写者问题

一个数据文件或者记录可被多个进程(或线程)共享。其中,有些进程(或线程)要求读;而另一些进程(或线程)要求能写或者修改。只要求读的进程(或线程)称为“Reader进程”,其他进程(或线程)称为“Witer进程(或线程)”。允许多个Reader进程(或线程)同时读一个共享对象,不允许一个Writer进程(或线程)和其他Reader进程(或线程)或者Writer进程(或线程)同时访问共享对象。所谓读者—写者问题是指保证一个Writer(或线程)进程必须与其他进程(或线程)互斥地访问共享对象的同步问题。

本实验模拟了读者-写着问题。打开“Microsoft Visual C++ 6.0”,输入相关代码后,对程序行进编译运行后,新建一个“input.txt”文件,存放与源程序在同一目录下。分别测试两组数据,文件内容分别为:

image.png


运行结果分别是:

image.png

image.png

根据实验过程,结合试验程序回答一下几个问题:


“input.txt”文件中每列数据的含义是什么?

线程序号 读写操作 延迟时间 写文件持续时间


2.创建了几个信号量,它们的初始状态是什么,它们的作用各是什么?


两个信号量。互斥变量h_Mutex=0和ReadCount=0。互斥变量是控制写者与写者互斥,读者与写者互斥。

ReadCount进行读者计数,当为0时可以进行读或者写操作。

3.程序中哪部分代码实现了读–写互斥执行的过程?

void ReaderPriority(char*file){
  DWORD n_thread=0;
  DWORD thread_ID;
  DWORD wait_for_all;
  HANDLE h_Mutex;
  h_Mutex=CreateMutex(NULL, FALSE,"mutex_for_readcount");
  HANDLE h_Thread[MAX_THREAD_NUM];
  ThreadInfo thread_info[MAX_THREAD_NUM];
  readcount=0; 
  InitializeCriticalSection(&RP_Write);
  ifstream inFile;
  inFile.open(file, ios::nocreate);
  if (inFile.rdstate()==ios::failbit) {
   printf("打开文件\"%os\"失败!请将\"%s\"放程序目录下。\n",file,file);
   return;
  }
  printf("读者优先:\n\n");
  while(inFile){
   inFile>>thread_info[n_thread].serial;
   inFile>>thread_info[n_thread].entity;
   inFile>>thread_info[n_thread].delay;
   inFile>>thread_info[n_thread++].persist;
   inFile.get();
  }
  for(int i=0;i<(int)(n_thread);i++) {
   if (thread_info[i].entity==READER||thread_info[i].entity =='r'){
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_ReaderThread),&thread_info[i],0,&thread_ID);//创建读者进程
   }
   else{  
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_WriterThread),&thread_info[i],0,&thread_ID);//创建写者进程
   }
  }
  //等待所有的线程结束
  wait_for_all=WaitForMultipleObjects(n_thread,h_Thread,TRUE,-1);
  printf("所有的读写线程结束操作.\n");
}


4.CreateMutex(),CreateThread()两个函数什么含义,通过查阅帮助文档,给出函数内各个参数的含义。


CreateMutex()作用是找出当前系统是否已经存在指定进程的实例。如果没有则创建一个互斥体。第一个参数为指向安全属性的指针;第二个参数为互斥对象的所有者,第三个参数为指向互斥对象名的指针。

CreateThread()将在主线程的基础上创建一个新线程。第一个参数是指向SECURITY_ATTRIBUTES型态的结构的指针;第二个参数是用于新线程的初始堆栈大小,默认值为0;第三个参数是指向线程函数的指标;第四个参数为传递给ThreadProc的参数;第五个参数通常为0,但当建立的线程不马上执行时为旗标CREATE_SUSPENDED;第六个参数是一个指标,指向接受执行绪ID值的变量。


5.简单叙述一下读者——写者问题的同步和互斥关系:


读者申请互斥信号量,阻止写者进行操作,自己进行读操作,读者数加一;然后读者可以同时进行读文件,最后一个读者离开时,释放互斥信号量,写者可以进行写操作。写者申请互斥信号量,组织其他写者或者读者进行操作,写完之后释放互斥信号量。


最后附上心爱的源代码——————


#include"windows.h"
#include<conio.h>
#include<stdlib.h>
#include<fstream.h>
#include<io.h>
#include<string.h>
#include <stdio.h>
#define READER 'R'   //读者
#define WRITER 'W'   //写者
#define INTE_PER_SEC 1000 //每秒时钟中断的数目
#define MAX_THREAD_NUM 64  //最大线程数
#define MAX_FILE_NUM 32  //最大文件数目
#define MAX_STR_LEN 32  //字符串的长度
int readcount=0;  
int writecount=0;  
CRITICAL_SECTION RP_Write;//读者数目
CRITICAL_SECTION cs_Write;//写者数目
CRITICAL_SECTION cs_Read; //临界资源
struct ThreadInfo{
  int serial;//线程序号
  char entity;//线程类别(判断是读者还是写者进程)
  double delay;//线程延迟时间
  double persist;//线程读写操作时间
};
//读者优先-读者线程
//P:读者线程信息
void RP_ReaderThread(void *p){
  HANDLE h_Mutex;//互斥变量
  h_Mutex=OpenMutex(MUTEX_ALL_ACCESS,FALSE,"mutex_for_readcount");
  DWORD wait_for_mutex; //等待互斥变量的所有权 
  DWORD m_delay; //延迟时间
  DWORD m_persist;//读文件持续时间
  int m_serial;  
  //从参数中获得信息
  m_serial=((ThreadInfo*)(p))->serial;
  m_delay=(DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC);
  m_persist=(DWORD)(((ThreadInfo*)(p))->persist *INTE_PER_SEC); 
  Sleep(m_delay);  //延迟等待
  printf("读线程 %d 发出读请求.\n",m_serial);
  //等待互斥信号,保证对ReadCount的访问,修改互斥
  wait_for_mutex=WaitForSingleObject(h_Mutex,-1);
  //读者数目增加
  readcount++;
  if(readcount==1){
   //第一个读者,等待资源
  EnterCriticalSection(&RP_Write);
  }
  ReleaseMutex(h_Mutex);
  //读文件
  printf("读线程 %d 开始读文件.\n",m_serial);
  Sleep(m_persist);
  //退出线程
  printf("读线程 %d 结束读文件.\n",m_serial);
  //等待互斥信号,保证对ReadCount的访问,修改互斥
  wait_for_mutex=WaitForSingleObject(h_Mutex,-1);
  //读者数目减少
  readcount--;
  if(readcount==0){
  LeaveCriticalSection(&RP_Write);
  }
  ReleaseMutex(h_Mutex); 
}
//读者优先--写者进程
//P:写者线程信息
void RP_WriterThread(void*p){
  DWORD m_delay;//延迟时间  
  DWORD m_persist;  //写文件持续时间
  int m_serial; //线程序号
  //从参数中获取信息
  m_serial=((ThreadInfo*)(p))->serial;
  m_delay=(DWORD)(((ThreadInfo*)(p))->delay*INTE_PER_SEC); 
  m_persist=(DWORD)(((ThreadInfo*)(p))->persist*INTE_PER_SEC); 
  Sleep(m_delay);
  printf("写线程%d发出写请求***.\n",m_serial);
  //等待资源
  EnterCriticalSection(&RP_Write);
  //写文件
  printf("写线程%d开始写文件.\n",m_serial);
  Sleep(m_persist);
  //退出线程
  printf("写线程%d结束写文件.\n",m_serial);  
  //释放资源
  LeaveCriticalSection(&RP_Write);
}
//读者优先处理函数
void ReaderPriority(char*file){
  DWORD n_thread=0;
  DWORD thread_ID;
  DWORD wait_for_all;
  HANDLE h_Mutex;
  h_Mutex=CreateMutex(NULL, FALSE,"mutex_for_readcount");
  HANDLE h_Thread[MAX_THREAD_NUM];
  ThreadInfo thread_info[MAX_THREAD_NUM];
  readcount=0; 
  InitializeCriticalSection(&RP_Write);
  ifstream inFile;
  inFile.open(file, ios::nocreate);
  if (inFile.rdstate()==ios::failbit) {
   printf("打开文件\"%os\"失败!请将\"%s\"放程序目录下。\n",file,file);
   return;
  }
  printf("读者优先:\n\n");
  while(inFile){
   inFile>>thread_info[n_thread].serial;
   inFile>>thread_info[n_thread].entity;
   inFile>>thread_info[n_thread].delay;
   inFile>>thread_info[n_thread++].persist;
   inFile.get();
  }
  for(int i=0;i<(int)(n_thread);i++) {
   if (thread_info[i].entity==READER||thread_info[i].entity =='r'){
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_ReaderThread),&thread_info[i],0,&thread_ID);//创建读者进程
   }
   else{  
    h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_WriterThread),&thread_info[i],0,&thread_ID);//创建写者进程
   }
  }
  //等待所有的线程结束
  wait_for_all=WaitForMultipleObjects(n_thread,h_Thread,TRUE,-1);
  printf("所有的读写线程结束操作.\n");
}
 int main(int argc, char *argv[]){
  ReaderPriority("input.txt");
  printf("\n按任意键结束:");
  getch();
  return 0;
}

目录
相关文章
|
24天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
16 1
|
4月前
|
安全 数据库连接 数据库
(六)手撕并发编程之基于Semaphore与CountDownLatch分析AQS共享模式实现
在上篇文章《深入剖析并发之AQS独占锁&重入锁(ReetrantLock)及Condition实现原理》中我们曾基于ReetrantLock锁分析了AQS独占模式的实现原理,本章则准备从Semaphore信号量的角度出发一探AQS共享模式的具体实现。共享模式与独占模式区别在于:共享模式下允许多条线程同时获取锁资源,而在之前分析的独占模式中,在同一时刻只允许一条线程持有锁资源。
|
5月前
|
Java API
详细探究Java多线程的线程状态变化
Java多线程的线程状态主要有六种:新建(NEW)、可运行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、超时等待(TIMED_WAITING)和终止(TERMINATED)。线程创建后处于NEW状态,调用start()后进入RUNNABLE状态,表示准备好运行。当线程获得CPU资源,开始执行run()方法时,它处于运行状态。线程可以因等待锁或调用sleep()等方法进入BLOCKED或等待状态。线程完成任务或发生异常后,会进入TERMINATED状态。
21229 5
|
6月前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
110 1
|
6月前
|
设计模式 安全 Java
【Linux 系统】多线程(生产者消费者模型、线程池、STL+智能指针与线程安全、读者写者问题)-- 详解
【Linux 系统】多线程(生产者消费者模型、线程池、STL+智能指针与线程安全、读者写者问题)-- 详解
|
Java
高并发编程-线程生产者消费者的综合示例
高并发编程-线程生产者消费者的综合示例
79 0
二十、经典同步问题-读者写者问题
二十、经典同步问题-读者写者问题
|
存储 SQL 安全
搞定|通过实际案例搞懂多任务线程
搞定|通过实际案例搞懂多任务线程
搞定|通过实际案例搞懂多任务线程
|
Java
Java多线程学习(五)线程间通信知识点补充
欢迎关注我的微信公众号:**“Java面试通关手册”**(分享各种Java学习资源,面试题,以及企业级Java实战项目回复关键字免费领取)。另外我创建了一个Java学习交流群(群号:**174594747**),欢迎大家加入一起学习,这里更有面试,学习视频等资源的分享。
2265 0