进程通信概念
什么是进程通信
首先我们清楚,进程是具有独立性的,如果想让进程通信,那么成本一定不低。
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
为什么要有进程通信
因为有时候我们是需要多进程协同去完成某种任务的。
怎么进行通信
目前通信有两套标准:
POSIX——让通信过程可以跨主机
System V——聚焦在本地通信(比较陈旧的标准)
重点:共享内存
管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。
管道是基于文件系统的。
那么两个进程通信:
第一个条件就是操作系统需要给双方进程提供内存空间。
第二个条件是要通信的进程看到同一份资源。(一般都是由操作系统直接或间接提供的)
不同的通信种类本质是:
上面所说的资源是操作系统的哪一个模块提供的。
匿名管道
父进程指向了一份文件,然后创建了子进程,子进程拷贝了父进程的代码和文件表,但是文件没被拷贝,这个时候父子进程看到的就是同一份文件,也就是同一份资源。
这一步才是通信的前提。
父进程往文件的缓冲区写数据,子进程从缓冲区读数据,这个就是进程之前的通信,这个方法及操作系统提供的内核文件,称为管道文件。(管道本质上就是文件)
那么需不需要将文件缓冲区的内容经过磁盘呢?之前说过打开一个文件是通过磁盘,但是因为磁盘是外设,效率很慢,所以我们实际上并不会这样。
就算不是在磁盘当中打开的文件,操作系统也能创建一个struct file对象,申请一个缓冲区。
所以这里说的管道其实是内存级文件,他不关心在磁盘的哪个路径下,要不要被写到磁盘,只要创建对象和缓冲区然后将地址添加到对应的文件描述符表就可以了。到时候操作系统会将这个文件变成管道文件。
那么我们如何让两个进程看到同一个管道文件呢?
首先清除fork是创建子进程的,子进程会继承父进程的文件地址,这样就能看到同一份管道文件了,但是这个文件并没有名字,所以叫做匿名管道。
创建匿名管道的过程
首先是父进程创建一个匿名管道。
分别以读和写的方式打开同一个文件。
然后是创建子进程,子进程会继承父进程对于这个文件的读写方式。
最后一部就是让父进程关闭读端,子进程关闭写端,这样就能让父进程给子进程读取数据了。
一般而言,我们管道只能用来单项数据通信。
管道就是输送资源的,就是数据。
这里我们来实现一下父子进程之间的通信:
这里说一下:CXX,CPP,CC都是C++源文件的后缀。
首先来了解一下创建管道的函数;
这个函数的参数是一个输出型参数,储存的是读端和写端,比如说文件描述符中,3和4是在读端和写端,那么就把3和4储存到这个数组当中。(pipefd[0]是读端,pipefd[1]是写端)这就相当于用读端和写端同时打开了一个文件。
#include <iostream> #include <unistd.h> #include <cassert> #include <sys/types.h> #include <sys/wait.h> #include <cstring> using namespace std; int main() { //第一步创建管道 int fds[2]; int n = pipe(fds); assert(n == 0); //第二部创建子进程 pid_t id = fork(); assert(id >= 0); if(id == 0)//子进程 { close(fds[1]);//子进程关闭写入 while(true) { char buffer[1024]; ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0 if(s > 0) buffer[s] = 0; cout << buffer << " | my pid:" << getpid() << endl; //这里并没有休眠 } close(fds[0]); exit(0); } close(fds[0]);//父进程关闭读取 const char* s = "我是父进程,我在给你发消息"; int cnt = 0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid()); write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的 sleep(1);//每隔1s写一次 } close(fds[1]); n = waitpid(id,nullptr,0); assert(n == id); return 0; }
通过这个可以发现父进程确实在给子进程发消息。
这种通信,称之为管道通信。
这个过程其实就相当于父进程通过操作系统写给管道,也就是相当于写给操作系统,然后子进程通过操作系统从管道当中读取内容。
管道读写的特性
读写特征:
1.上面的代码是隔着1s才会写入一次,如果改成sleep(5)就是5s写入一次。
这说明如果管道没有数据了,读端在读,默认会直接阻塞当前正在读取的进程,只有管道有数据,操作系统识别到,读端才会去读取数据。
2. 管道是一个固定大小的缓冲区。
如果将读端一直不读,写端一直写,那么缓冲区是会被写满的。
#include <iostream> #include <unistd.h> #include <cassert> #include <sys/types.h> #include <sys/wait.h> #include <cstring> using namespace std; int main() { //第一步创建管道 int fds[2]; int n = pipe(fds); assert(n == 0); //第二部创建子进程 pid_t id = fork(); assert(id >= 0); if(id == 0)//子进程 { close(fds[1]);//子进程关闭写入 while(true) { sleep(1000); char buffer[1024]; ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0 if(s > 0) buffer[s] = 0; cout << buffer << " | my pid:" << getpid() << endl; } close(fds[0]); exit(0); } close(fds[0]);//父进程关闭读取 const char* s = "我是父进程,我在给你发消息"; int cnt = 0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid()); write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的 //sleep(1);//每隔1s写一次 cout << "count:" << cnt << endl; } close(fds[1]); n = waitpid(id,nullptr,0); assert(n == id); return 0; }
这里就是管道被写满了,如果再写就会把原来的数据覆盖。
写端写满的时候,写端会阻塞,等待读端读取。
如果写端一直写,读端一直读呢?
#include <iostream> #include <unistd.h> #include <cassert> #include <sys/types.h> #include <sys/wait.h> #include <cstring> using namespace std; int main() { //第一步创建管道 int fds[2]; int n = pipe(fds); assert(n == 0); //第二部创建子进程 pid_t id = fork(); assert(id >= 0); if(id == 0)//子进程 { close(fds[1]);//子进程关闭写入 while(true) { sleep(2); char buffer[1024]; ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0 if(s > 0) buffer[s] = 0; cout << buffer << " | my pid:" << getpid() << endl; } close(fds[0]); exit(0); } close(fds[0]);//父进程关闭读取 const char* s = "我是父进程,我在给你发消息"; int cnt = 0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid()); write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的 //sleep(1);//每隔1s写一次 cout << "count:" << cnt << endl; } close(fds[1]); n = waitpid(id,nullptr,0); assert(n == id); return 0; }
管道是按照指定大小读取的,而不是一条一条读。
3.如果写端关闭,读端会发生什么呢?
#include <iostream> #include <unistd.h> #include <cassert> #include <sys/types.h> #include <sys/wait.h> #include <cstring> using namespace std; int main() { //第一步创建管道 int fds[2]; int n = pipe(fds); assert(n == 0); //第二部创建子进程 pid_t id = fork(); assert(id >= 0); if(id == 0)//子进程 { close(fds[1]);//子进程关闭写入 while(true) { //sleep(2); char buffer[1024]; ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0 if(s > 0) { buffer[s] = 0; cout << buffer << " | my pid:" << getpid() << endl; } else if(s == 0) { cout << "end" << endl; break; } } close(fds[0]); exit(0); } close(fds[0]);//父进程关闭读取 const char* s = "我是父进程,我在给你发消息"; int cnt = 0; while(true) { cnt++; char buffer[1024]; snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid()); write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的 //sleep(1);//每隔1s写一次 break; } close(fds[1]); n = waitpid(id,nullptr,0); assert(n == id); return 0; }
写端关闭,读端返回0。
4.读关闭会发生什么呢?
操作系统当然会自己判断,如果都不读了,就没必要去写了,会发送给进程一个信号,终止写端。
这里操作系统会发送13信号。
管道本身的特征
1.管道的生命周期是进程的生命周期
2.管道可以用来进行具有血缘关系的进程之间进行通信,常用于父子通信。
3.管道是面向字节流的(网络)
4.半双工——单向通信(别名)
5.互斥同步机制——对共享资源进行保护的方案。(读写的特性)
在平时使用 | 这种的,比如:
sleep 10000 | sleep 200
这就是匿名管道,操作系统会创建父子进程,然后通过管道连接起来,其实命令行解释器就是会去寻找 | 然后进行一系列操作。
基于管道的进程池设计
思路:
这是用一个进程去控制多个进程,这个进程收到某个指令,然后发给这些进程的其中之一,假如说要发1就是帮忙打印东西,2就是帮忙计算等等功能(具体功能不重要,这里不实现),发的是几,功能就是对应的。
这里要注意,不能总让一个子进程工作,要分工做。
#include <iostream> #include <unistd.h> #include <cstring> #include <fcntl.h> #include <cassert> #include <sys/types.h> #include <sys/stat.h> #include <sys/wait.h> #include <vector> #include <string> using namespace std; #define PROCSS_NUM 5//创建子进程的数量 #define MakeSeed() srand((unsigned long)time(nullptr)^getpid()^rand()%1234)//生成伪随机数 typedef void(*func_t)();//重命名函数指针,其实就是命名为func_t //处理条件 void downLoadTask() { cout << getpid() << ":下载任务" << endl; sleep(1); } void ioTask() { cout << getpid() << ":IO任务" << endl; sleep(1); } void flushTask() { cout << getpid() << ":刷新任务" << endl; sleep(1); } void loadTaskFunc(vector<func_t>& func_t) { func_t.push_back(downLoadTask); func_t.push_back(ioTask); func_t.push_back(flushTask); } class subEd//子进程的信息 { public: subEd(pid_t id,int fd) :_subId(id) ,_writeFd(fd) { char nameBuffer[1024]; snprintf(nameBuffer,sizeof nameBuffer,"process-%d[pid(%d)-fd(%d)]",num++,_subId,_writeFd);//进程的名字是进程编号+pid+写端口 _name = nameBuffer; } public: static int num;//进程的编号 string _name;//进程名字 pid_t _subId;//进程pid int _writeFd;//进程写端 }; int subEd::num = 0; int recvTask(int readFd) { int code = 0; ssize_t s = read(readFd,&code,sizeof(int)); if(s==4) return code; else return -1; } void createSubProcess(vector<subEd>&arr,vector<func_t>&funcMap) { for(int i = 0;i < PROCSS_NUM; i++) { int fds[2]; int n = pipe(fds); assert(n==0); (void)n;//这里防止Release之后上面的assert直接被忽略,导致n被判断没被使用 pid_t id = fork(); if(id == 0) { //子进程处理任务 close(fds[1]);//关闭写端 while(1) { //获取命令码,如果没发送,子进程阻塞 int commandCode = recvTask(fds[0]);//读取父进程发来的信息 //完成任务 if(commandCode >= 0 && commandCode < funcMap.size()) funcMap[commandCode]();//调用处理条件的信息表 else break;//如果没有可读取的内容就跳出循环,就等于写端关闭,读端也关闭 } exit(0); } close(fds[0]);//父进程关闭读端 subEd sub(id,fds[1]);//创建子进程id和父进程写fd储存的对象,这里要注意,走到这里的是父进程,父进程拿到的返回值是子进程的pid arr.push_back(sub);//放入子进程信息的数组中 } } void sendTask(subEd& s,int taskIdx) { cout << "send task num:" << taskIdx << "send to ->" << s._name << endl; int n = write(s._writeFd,&taskIdx,sizeof(taskIdx));//这里为了安全必须发送的是四个字节 assert(n == sizeof(int)); (void)n; } void loadBlanceContrl(vector<subEd>&arr,vector<func_t>&funcMap,int count) { int processsum = arr.size();//一共多少个子进程 int tasksum = funcMap.size();//一共多少种方案 int flag = 0; if(count > 0) flag = count; while(1) { //随机选择一个子进程 int idx = rand() % processsum; //选择一个任务 int taskIdx = rand() % tasksum; //任务发送给选择的进程 sendTask(arr[idx], taskIdx); sleep(1); if(flag) { count--; if(count == 0) break; } } //这里关闭父进程的写端 for(int i = 0; i < processsum; i++) close(arr[i]._writeFd); } void waitProcess(vector<subEd>&arr) { for(int i = 0; i < arr.size();i++) { waitpid(arr[i]._subId,nullptr,0); cout << "wait sub process success ...:" << arr[i]._subId << endl; } } int main() { MakeSeed();//随机数的开始 //创建子进程和父进程之前的通信 vector<func_t>funcMap;//储存子进程处理的条件 vector<subEd>arr;//储存子进程的pid和要处理的信息 loadTaskFunc(funcMap); createSubProcess(arr,funcMap); //父进程 int taskcnt = 5;//假设等于0就是永远进行 loadBlanceContrl(arr,funcMap,taskcnt); //回收子进程 waitProcess(arr); return 0; }