生产者-消费者问题
一组生产者进程生产产品给一组消费者进程消费。一个有n个缓冲区的缓冲池,生产者一次向一个缓冲区中投入消息,消费者从一个缓冲区中取得。生产者——消费者问题实际上是相互合作进程关系的一种抽象。该类问题不允许消费者进程到一个空缓冲区中取产品,同时也不允许生产者进程到一个已满且还没被取走的缓冲区中投放产品。
使用一个数组来表示具有n个(0,1,…,n-1)缓冲区的缓冲池。用输入指针in来指示下一个可投放产品的缓冲区,每当生产者进程生产并投放一个产品后,in加1;用一个输出指针out来指示下一个可从中获取产品的缓冲区,每当消费者进程取走一个产品后,out加
1。缓冲池是循环数组。
可利用互斥信号量mutex实现诸进程对缓冲池的互斥使用;利用信号量empty和full分别表示缓冲池中空缓冲区和满缓冲区的数量。
本实验模拟了生产者——消费者问题,打开“Microsoft Visual C++ 6.0”,输入相关代码后,对程序行进编译运行后,记录运行结果(记录第一个消费者以及其执行前的记录):
根据实验过程,结合实验程序,回答一下问题:
1.程序中创建了几个线程,生产者和消费者线程各几个?线程入口函数是什么?
创建了4个线程。生产者3个,消费者1个。消费者是DWORD WINAPI Consumer(LPVOID); 生产者是DWORD WINAPI Producer(LPVOID);
2.创建了几个信号量,它们的初始状态是什么,它们的作用各是什么?
3个信号量。g_hMutex=NULL、g_hEmptySemaphore=NULL、 g_hFullSemaphore=NULL。
g_hMutex用于线程间的互斥、g_hEmptySemaphore当缓冲区空时迫使消费者等待、g_hFullSemaphore当缓冲区满时迫使生产者等待。
3.简述函数WaitForSingleObject的功能
生产者(消费者)申请空缓冲区(满缓冲区)的使用,等待互斥信号量。
4.简述生产者线程执行的操作
生产者先申请空缓冲区的使用,等待互斥信号量,然后将信号量变为1,然后进行生产,空缓冲区增加一个产品,然后释放互斥信号量,释放满缓冲区。
5.简述消费者线程执行的操作
消费者先申请满缓冲区的使用,等待互斥信号量,然后将信号量变为1,然后进行消费,满缓冲区减少一个产品,然后释放互斥信号量变为0,释放空缓冲区。
附上源代码————
#include<windows.h> #include<iostream> const unsigned short SIZE_OF_BUFFER = 10; unsigned short ProductID =0; unsigned short ConsumeID=0; unsigned short in =0; unsigned short out =0; int g_buffer[SIZE_OF_BUFFER]; bool g_continue = true; HANDLE g_hEmptySemaphore; HANDLE g_hMutex; HANDLE g_hFullSemaphore; DWORD WINAPI Consumer(LPVOID); DWORD WINAPI Producer(LPVOID); int main(){ g_hMutex=CreateMutex(NULL,FALSE,NULL); g_hFullSemaphore=CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL); g_hEmptySemaphore=CreateSemaphore(NULL,SIZE_OF_BUFFER-1,SIZE_OF_BUFFER-1,NULL); const unsigned short PRODUCERS_COUNT=3; const unsigned short CONSUMERS_COUNT=1; const unsigned short THREADS_COUNT=PRODUCERS_COUNT+CONSUMERS_COUNT; HANDLE hThreads[THREADS_COUNT]; DWORD producerID[PRODUCERS_COUNT]; DWORD consumerID[CONSUMERS_COUNT]; for (int i=0; i<PRODUCERS_COUNT; ++i){ hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]); if(hThreads[i]=NULL) return -1; } for(i=0;i<CONSUMERS_COUNT;++i){ hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]); if(hThreads[i]=NULL) return -1; } while(g_continue){ if(getchar()){ g_continue=false; } } return 0; } void Produce(){ std::cerr<<"Producing "<<++ProductID<<"..."; std::cerr<<"Succeed"<< std::endl; } void Append(){ std::cerr<<"Appending a product ..."; g_buffer[in]=ProductID; in =(in+1)%SIZE_OF_BUFFER; std::cerr <<"Succeed"<< std::endl; for (int i=0; i<SIZE_OF_BUFFER;++i){ std::cout<<i<<":"<<g_buffer[i]; if (i==in) std::cout<<"<--生产"; if (i==out) std::cout<<"<--消费"; std::cout<<std::endl; } } void Take(){ std::cerr <<"Taking a product ..."; ConsumeID=g_buffer[out]; out=(out+1)%SIZE_OF_BUFFER; std::cerr<<"Succeed"<<std::endl; for(int i=0; i<SIZE_OF_BUFFER;i++){ std::cout<<i<<": "<<g_buffer[i]; if(i==in) std::cout<<"<--生产"; if(i==out) std::cout<<"<--消费"; std::cout<<std::endl; } } void Consume(){ std::cerr<<"Consuming"<<ConsumeID<<"..."; std::cerr<<"Succeed"<<std::endl; } DWORD WINAPI Producer(LPVOID IpPara){ while(g_continue){ WaitForSingleObject(g_hEmptySemaphore,INFINITE); WaitForSingleObject(g_hMutex,INFINITE); Produce(); Append(); Sleep(1500); ReleaseMutex(g_hMutex); ReleaseSemaphore(g_hFullSemaphore,1,NULL); } return 0; } DWORD WINAPI Consumer(LPVOID IpPara){ while(g_continue){ WaitForSingleObject(g_hFullSemaphore, INFINITE); WaitForSingleObject(g_hMutex, INFINITE); Take(); Consume(); Sleep(1500); ReleaseMutex(g_hMutex); ReleaseSemaphore(g_hEmptySemaphore,1,NULL); } return 0; }
这是两条平行且光滑的分割线,永不相交,却无限向前。
——————————————————————————————————————————————————————————————————————————————————————
读者——写者问题
一个数据文件或者记录可被多个进程(或线程)共享。其中,有些进程(或线程)要求读;而另一些进程(或线程)要求能写或者修改。只要求读的进程(或线程)称为“Reader进程”,其他进程(或线程)称为“Witer进程(或线程)”。允许多个Reader进程(或线程)同时读一个共享对象,不允许一个Writer进程(或线程)和其他Reader进程(或线程)或者Writer进程(或线程)同时访问共享对象。所谓读者—写者问题是指保证一个Writer(或线程)进程必须与其他进程(或线程)互斥地访问共享对象的同步问题。
本实验模拟了读者-写着问题。打开“Microsoft Visual C++ 6.0”,输入相关代码后,对程序行进编译运行后,新建一个“input.txt”文件,存放与源程序在同一目录下。分别测试两组数据,文件内容分别为:
运行结果分别是:
根据实验过程,结合试验程序回答一下几个问题:
“input.txt”文件中每列数据的含义是什么?
线程序号 读写操作 延迟时间 写文件持续时间
2.创建了几个信号量,它们的初始状态是什么,它们的作用各是什么?
两个信号量。互斥变量h_Mutex=0和ReadCount=0。互斥变量是控制写者与写者互斥,读者与写者互斥。
ReadCount进行读者计数,当为0时可以进行读或者写操作。
3.程序中哪部分代码实现了读–写互斥执行的过程?
void ReaderPriority(char*file){ DWORD n_thread=0; DWORD thread_ID; DWORD wait_for_all; HANDLE h_Mutex; h_Mutex=CreateMutex(NULL, FALSE,"mutex_for_readcount"); HANDLE h_Thread[MAX_THREAD_NUM]; ThreadInfo thread_info[MAX_THREAD_NUM]; readcount=0; InitializeCriticalSection(&RP_Write); ifstream inFile; inFile.open(file, ios::nocreate); if (inFile.rdstate()==ios::failbit) { printf("打开文件\"%os\"失败!请将\"%s\"放程序目录下。\n",file,file); return; } printf("读者优先:\n\n"); while(inFile){ inFile>>thread_info[n_thread].serial; inFile>>thread_info[n_thread].entity; inFile>>thread_info[n_thread].delay; inFile>>thread_info[n_thread++].persist; inFile.get(); } for(int i=0;i<(int)(n_thread);i++) { if (thread_info[i].entity==READER||thread_info[i].entity =='r'){ h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_ReaderThread),&thread_info[i],0,&thread_ID);//创建读者进程 } else{ h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_WriterThread),&thread_info[i],0,&thread_ID);//创建写者进程 } } //等待所有的线程结束 wait_for_all=WaitForMultipleObjects(n_thread,h_Thread,TRUE,-1); printf("所有的读写线程结束操作.\n"); }
4.CreateMutex(),CreateThread()两个函数什么含义,通过查阅帮助文档,给出函数内各个参数的含义。
CreateMutex()作用是找出当前系统是否已经存在指定进程的实例。如果没有则创建一个互斥体。第一个参数为指向安全属性的指针;第二个参数为互斥对象的所有者,第三个参数为指向互斥对象名的指针。
CreateThread()将在主线程的基础上创建一个新线程。第一个参数是指向SECURITY_ATTRIBUTES型态的结构的指针;第二个参数是用于新线程的初始堆栈大小,默认值为0;第三个参数是指向线程函数的指标;第四个参数为传递给ThreadProc的参数;第五个参数通常为0,但当建立的线程不马上执行时为旗标CREATE_SUSPENDED;第六个参数是一个指标,指向接受执行绪ID值的变量。
5.简单叙述一下读者——写者问题的同步和互斥关系:
读者申请互斥信号量,阻止写者进行操作,自己进行读操作,读者数加一;然后读者可以同时进行读文件,最后一个读者离开时,释放互斥信号量,写者可以进行写操作。写者申请互斥信号量,组织其他写者或者读者进行操作,写完之后释放互斥信号量。
最后附上心爱的源代码——————
#include"windows.h" #include<conio.h> #include<stdlib.h> #include<fstream.h> #include<io.h> #include<string.h> #include <stdio.h> #define READER 'R' //读者 #define WRITER 'W' //写者 #define INTE_PER_SEC 1000 //每秒时钟中断的数目 #define MAX_THREAD_NUM 64 //最大线程数 #define MAX_FILE_NUM 32 //最大文件数目 #define MAX_STR_LEN 32 //字符串的长度 int readcount=0; int writecount=0; CRITICAL_SECTION RP_Write;//读者数目 CRITICAL_SECTION cs_Write;//写者数目 CRITICAL_SECTION cs_Read; //临界资源 struct ThreadInfo{ int serial;//线程序号 char entity;//线程类别(判断是读者还是写者进程) double delay;//线程延迟时间 double persist;//线程读写操作时间 }; //读者优先-读者线程 //P:读者线程信息 void RP_ReaderThread(void *p){ HANDLE h_Mutex;//互斥变量 h_Mutex=OpenMutex(MUTEX_ALL_ACCESS,FALSE,"mutex_for_readcount"); DWORD wait_for_mutex; //等待互斥变量的所有权 DWORD m_delay; //延迟时间 DWORD m_persist;//读文件持续时间 int m_serial; //从参数中获得信息 m_serial=((ThreadInfo*)(p))->serial; m_delay=(DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC); m_persist=(DWORD)(((ThreadInfo*)(p))->persist *INTE_PER_SEC); Sleep(m_delay); //延迟等待 printf("读线程 %d 发出读请求.\n",m_serial); //等待互斥信号,保证对ReadCount的访问,修改互斥 wait_for_mutex=WaitForSingleObject(h_Mutex,-1); //读者数目增加 readcount++; if(readcount==1){ //第一个读者,等待资源 EnterCriticalSection(&RP_Write); } ReleaseMutex(h_Mutex); //读文件 printf("读线程 %d 开始读文件.\n",m_serial); Sleep(m_persist); //退出线程 printf("读线程 %d 结束读文件.\n",m_serial); //等待互斥信号,保证对ReadCount的访问,修改互斥 wait_for_mutex=WaitForSingleObject(h_Mutex,-1); //读者数目减少 readcount--; if(readcount==0){ LeaveCriticalSection(&RP_Write); } ReleaseMutex(h_Mutex); } //读者优先--写者进程 //P:写者线程信息 void RP_WriterThread(void*p){ DWORD m_delay;//延迟时间 DWORD m_persist; //写文件持续时间 int m_serial; //线程序号 //从参数中获取信息 m_serial=((ThreadInfo*)(p))->serial; m_delay=(DWORD)(((ThreadInfo*)(p))->delay*INTE_PER_SEC); m_persist=(DWORD)(((ThreadInfo*)(p))->persist*INTE_PER_SEC); Sleep(m_delay); printf("写线程%d发出写请求***.\n",m_serial); //等待资源 EnterCriticalSection(&RP_Write); //写文件 printf("写线程%d开始写文件.\n",m_serial); Sleep(m_persist); //退出线程 printf("写线程%d结束写文件.\n",m_serial); //释放资源 LeaveCriticalSection(&RP_Write); } //读者优先处理函数 void ReaderPriority(char*file){ DWORD n_thread=0; DWORD thread_ID; DWORD wait_for_all; HANDLE h_Mutex; h_Mutex=CreateMutex(NULL, FALSE,"mutex_for_readcount"); HANDLE h_Thread[MAX_THREAD_NUM]; ThreadInfo thread_info[MAX_THREAD_NUM]; readcount=0; InitializeCriticalSection(&RP_Write); ifstream inFile; inFile.open(file, ios::nocreate); if (inFile.rdstate()==ios::failbit) { printf("打开文件\"%os\"失败!请将\"%s\"放程序目录下。\n",file,file); return; } printf("读者优先:\n\n"); while(inFile){ inFile>>thread_info[n_thread].serial; inFile>>thread_info[n_thread].entity; inFile>>thread_info[n_thread].delay; inFile>>thread_info[n_thread++].persist; inFile.get(); } for(int i=0;i<(int)(n_thread);i++) { if (thread_info[i].entity==READER||thread_info[i].entity =='r'){ h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_ReaderThread),&thread_info[i],0,&thread_ID);//创建读者进程 } else{ h_Thread[i]=CreateThread(NULL,0, (LPTHREAD_START_ROUTINE)(RP_WriterThread),&thread_info[i],0,&thread_ID);//创建写者进程 } } //等待所有的线程结束 wait_for_all=WaitForMultipleObjects(n_thread,h_Thread,TRUE,-1); printf("所有的读写线程结束操作.\n"); } int main(int argc, char *argv[]){ ReaderPriority("input.txt"); printf("\n按任意键结束:"); getch(); return 0; }