进程间通信
概念
在之前的学习中,我们知道进程是具有独立性的,现在却需要让进程之间进行通信,代价肯定是比较大的,而且为什么要进行通信呢?这就是通信的目的;通信该如何完成呢?
既然进程需要将数据传递给双方,肯定需要一块内存空间用来存放数据,这块空间不能由进程其中一方提供,因为进程是具有独立性的;进行通信时,除了进程双方,操作系统也参与进来,所以需要操作系统直接或间接给进程进程双方提供内存空间,让进程双方都能“看到”这一资源;不同的通信,所需要的空间也不同
目的
数据传输:一个进程需要将自己的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源
通知事件:一个进程需要向另一个或一组进程发送消息,通知
进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
这里介绍一个简单的场景:cat file|grep 'hello'该指令是,将文件file的内容打印到显示器上,不过呢,先需要经过grep进行过滤,才会将内容进行打印;两个进程之间通过|完成了某种意义上的联系,这里也就是通信
方式
System V:
可以使进程跨主机进行通信
POSIX IPC:
将进程聚焦在本地进行通信
管道:
匿名管道
命名管道
前两种较为复杂,这里就学习以管道的方式进行通信
管道
概念
管道是unix中最古老的进程间通信的方式;将一个进程连接到另一个进程的一个数据流称作一个管道
父进程通过调用管道特定的系统调用,以读方式和写方式打开一个内存级文件,并通过创建子进程的方式,被字进程继承之后,关闭各自的读写端,进而形成一个通信信道
匿名管道
在上一章节中学习到,当在磁盘上打开文件时,操作系统会生成对应的struct file进行管理;通过管道进行通信时,也存在文件,称作管道文件,属于内存级文件,没有名称,称作匿名管道
图解:
父进程通过文件描述符表,映射到对应的文件结构体;子进程继承父进程的文件描述表,指向同一个结构体;父进程可以通过结构体将内容写到磁盘中,子进程再到磁盘上进行读,不过这样做,效率太低,毕竟是通信;所以父进程所打开的是管道文件,也就是在内存中;结构体提供了父子进程都能够使用的内存空间(缓冲区)
内存空间已经提供,接下来就是父子进程的通信过程
图解:
父进程之所以需要以读写两中方式打开管道文件,是因为如果只以一种方式打开文件的话,子进程也就继承父进程打开文件的方式,并且两者之间不能进行通信
匿名管道目前用来进行父子进程之间的通信
实例代码
先介绍创建管道的函数
int pipe(int pipefd[2]);
数组中第一个元素表示读端
pipefd[0] refers to the read end of the pipe
数组中第二个元素表示写端
pipefd[0] refers to the read end of the pipe
返回值
On success, zero is returned. On error, -1 is returned, and errno is set appropriately
代码演示
using namespace std; int main() { int fds[2]; int n=pipe(fds); assert(n==0); cout<<"fds[0]"<<fds[0]<<endl; cout<<"fds[1]"<<fds[1]<<endl; return 0; }
由结果可知,读写端分别对应文件描述符表下标的3,4
makefile
mypipe:mypipe.cpp g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f mypipe
mypipe.cpp
头文件
#include<iostream> #include<cstdio> #include<cstring> #include<string> #include<cassert> #include<unistd.h> #include<sys/types.h> #include<sys/wait.h>
子进程每隔五秒写一次数据
int main() { //父进程以读写打开文件 int fds[2]; int n=pipe(fds); assert(n==0); //创建子进程 pid_t id=fork(); assert(id>=0); //子进程 if(id==0) { //子进程进行写入,关闭读端 close(fds[0]); const char *s=" 我是子进程,正在给你发消息 "; int cnt=0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid()); //写端写满时,继续写入会发生堵塞,只能等待父进程读取 write(fds[1],buffer,strlen(buffer)); cout<<"count: "<< cnt <<endl; sleep(5); } //关闭写端 close(fds[1]); cout<<"子进程关闭写端"<<endl; exit(0); } //父进程关闭写端 close(fds[1]); //父进程通信代码 while(true) { char buffer[1024]; cout<<"*************************"<<endl; ssize_t s=read(fds[0],buffer,sizeof(buffer)-1); cout<<"#########################"<<endl; if(s>0) { buffer[s]=0; } cout<<"Get Message# "<<buffer<<"|my pid"<<getpid()<<endl; } //回收进程 n=waitpid(id,nullptr,0); assert(n==id); return 0; }
此时如果管道中没有数据,读端,默认会发生堵塞
写端一直写,读端休眠1000秒
int main() { //父进程以读写打开文件 int fds[2]; int n=pipe(fds); assert(n==0); //创建子进程 pid_t id=fork(); assert(id>=0); //子进程 if(id==0) { //子进程进行写入,关闭读端 close(fds[0]); const char *s=" 我是子进程,正在给你发消息 "; int cnt=0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid()); //写端写满时,继续写入会发生堵塞,只能等待父进程读取 write(fds[1],buffer,strlen(buffer)); cout<<"count: "<< cnt <<endl; } //关闭写端 close(fds[1]); cout<<"子进程关闭写端"<<endl; exit(0); } //父进程关闭写端 close(fds[1]); //父进程通信代码 while(true) { sleep(1000); char buffer[1024]; cout<<"*************************"<<endl; ssize_t s=read(fds[0],buffer,sizeof(buffer)-1); cout<<"#########################"<<endl; if(s>0) { buffer[s]=0; } cout<<"Get Message# "<<buffer<<"|my pid"<<getpid()<<endl; } //回收进程 n=waitpid(id,nullptr,0); assert(n==id); return 0; }
写端将内存空间写满时,再进行写入会发生堵塞,需要等待读端进行读取
子进程写一次直接退出,并且关闭自己的写端
int main() { //父进程以读写打开文件 int fds[2]; int n=pipe(fds); assert(n==0); //创建子进程 pid_t id=fork(); assert(id>=0); //子进程 if(id==0) { //子进程进行写入,关闭读端 close(fds[0]); const char*s="我是子进程,正在给你发消息"; int cnt=0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"child->parent say:%s[%d][%d]",s,cnt,getpid()); //写端写满时,继续写入会发生堵塞,只能等待父进程读取 write(fds[1],buffer,strlen(buffer)); cout<<"count: "<<cnt<<endl; break; } //关闭写端 close(fds[1]); cout<<"子进程关闭写端"<<endl; exit(0); } //父进程关闭写端 close(fds[1]); //父进程通信代码 while(true) { char buffer[1024]; ssize_t s=read(fds[0],buffer,sizeof(buffer-1)); if(s>0) { buffer[s]=0; cout<<"Get Message"<<buffer<<"|my pid"<<getpid()<<endl; } else if(s==0) { //读取到文件末尾 cout<<"read:"<<s<<endl; break; } } int status=0; n=waitpid(id,&status,0); assert(n==id); cout<<"pid->"<<n<<" "<<(status&0x7F)<<endl; return 0; }
父进程进行读取,将内容读取完之后,直接退出进程
子进程一直写,父进程读取一次直接退出
int main() { //父进程以读写打开文件 int fds[2]; int n=pipe(fds); assert(n==0); //创建子进程 pid_t id=fork(); assert(id>=0); //子进程 if(id==0) { //子进程进行写入,关闭读端 close(fds[0]); const char*s="我是子进程,正在给你发消息"; int cnt=0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"child->parent say:%s[%d][%d]",s,cnt,getpid()); //写端写满时,继续写入会发生堵塞,只能等待父进程读取 write(fds[1],buffer,strlen(buffer)); cout<<"count: "<<cnt<<endl; } //关闭写端 close(fds[1]); cout<<"子进程关闭写端"<<endl; exit(0); } //父进程关闭写端 close(fds[1]); //父进程通信代码 while(true) { char buffer[1024]; ssize_t s=read(fds[0],buffer,sizeof(buffer-1)); if(s>0) { buffer[s]=0; cout<<"Get Message"<<buffer<<"|my pid"<<getpid()<<endl; } else if(s==0) { //读取到文件末尾 cout<<"read:"<<s<<endl; break; } break; } close(fds[0]); cout<<"父进程关闭读端"<<endl; int status=0; n=waitpid(id,&status,0); assert(n==id); cout<<"pid->"<<n<<" "<<(status&0x7F)<<endl; return 0; }
操作系统会直接终止写段,杀死进程
特点
只能用于具有公共祖先的进程之间的进程;通常,一个管道由一个进程创建,然后该进程调用fork,此后父子进程之间就可使用该管道
管道提供流式服务
进程退出,管道释放,管道生命周期随进程
内核会对管道操作进行同步和互斥
管道是半双工,数据只能向一个方向流动;如果需要双方通信,需要建立两个管道
进程池
父进程通过管道创建多个子进程:向每个管道中都写入不同的任务,每个子进程都读入到相应的任务并执行
图解:
#include <iostream> #include <ctime> #include <sys/types.h> #include <unistd.h> #include <vector> #include <cassert> #include <string> #include <cstdio> #include <sys/types.h> #include <sys/wait.h> #define MakeSeed() srand((unsigned long)time(nullptr)^getpid()) #define PROCESS_NUM 8 using namespace std; typedef void(*func_t)(); void DownLoadTask() { cout<<getpid()<<": 下载任务\n"<<endl; sleep(1); } void IOTask() { cout<<getpid()<<": IO任务\n"<<endl; sleep(1); } void FlushTask() { cout<<getpid()<<": 刷新任务\n"<<endl; sleep(1); } void LoadTaskFunc(vector<func_t>* func_t) { assert(func_t); func_t->push_back(DownLoadTask); func_t->push_back(IOTask); func_t->push_back(FlushTask); } class subEP { public: subEP(pid_t subid,int writefd) :_subid(subid) ,_writefd(writefd) { char namebuffer[1024]; snprintf(namebuffer,sizeof namebuffer,"process-%d[pid(%d)-fd(%d)]",_num++,_subid,_writefd); _name=namebuffer; } public: static int _num; string _name; pid_t _subid; int _writefd; }; int subEP::_num=0; void SentTask(const subEP &process,int tasknum) { cout<<"send task num: "<<tasknum<<" to -> "<<process._name<<endl; int n=write(process._writefd,&tasknum,sizeof tasknum); assert(n==sizeof(int)); (void)n; } int recvTask(int readfd) { int code=0; ssize_t s=read(readfd,&code,sizeof code); if(s==4) return code; else if(s<=0) return -1; else return 0; } void CreateSubProcess(vector<subEP>* subs,vector<func_t>& funcmap) { vector<int> deletefd; for(int i=0;i<PROCESS_NUM;i++) { int fds[2]; int n=pipe(fds); assert(n==0); (void)n; pid_t id=fork(); //子进程,处理任务 if(id==0) { for(int i=0;i<deletefd.size();i++) close(deletefd[i]); //子进程关闭写端 close(fds[1]); while(true) { //1.获取命令码,如果没有发送,子进程应该阻塞 int CommandCode=recvTask(fds[0]); if(CommandCode>=0 && CommandCode<funcmap.size()) { funcmap[CommandCode](); } else if(CommandCode==-1) break; } exit(0); } //父进程关闭读端 close(fds[0]); subEP sub(id,fds[1]); subs->push_back(sub); deletefd.push_back(fds[1]); } } void LoadBlanceContrl(vector<subEP>& subs,const vector<func_t>&funcmap,int count) { int processnum=subs.size(); int tasknum=funcmap.size(); bool forever=(count==0?true:false); while(true) { //1.选择一个子进程 int subIndex=rand()%processnum; //2.选择一个任务 int taskIndex=rand()%tasknum; //3.任务发送给选择的进程 SentTask(subs[subIndex],taskIndex); sleep(1); if(!forever) { count--; if(count==0) break; } } //写端停止,读端读取到结束 for(int i=0;i<processnum;i++) { close(subs[i]._writefd); } } void WaitProcess(vector<subEP> processes) { int processnum=processes.size(); for(int i=0;i<processnum;i++) { waitpid(processes[i]._subid,NULL,0); cout<<"wait process success ... "<<processes[i]._subid<<endl; } } int main() { //时间种子 MakeSeed(); //1.建立子进程并创建子进程通信管道 //1.1.加载方法表 vector<func_t> funcmap; LoadTaskFunc(&funcmap); //1.2.创建子进程,维护父子通信信道(管道) vector<subEP> subs; //1.3.给子进程分配函数 CreateSubProcess(&subs,funcmap); //2.父进程,控制子进程,负载均衡地向子进程发送命令码 int TaskCnt=3;//0,永远进行 LoadBlanceContrl(subs,funcmap,TaskCnt); //3.回收子进程信息 WaitProcess(subs); return 0; }