1.进程为什么要通信?
进程也是需要某种协同的,所以如何协同的前提条件(通信)
通信数据的类别:
1.通知就绪的
2.单纯的数据
3.控制相关的信息
2.进程如何通信?
进程间通信,成本会高一点
进程间通信的前提,先让不同的进程,看到同一份(资源,一段内存)
一定是某一个进程需要通信,让os创建一个共享资源
os必须提供很多的系统调用,os创建的共享资源不同,系统调用接口不同,进程间通信会用不同的种类
3.进程通信的常见方式是什么?
消息队列
共享内存
信号量
直接复用内核代码直接通信呢?
管道:
1.命名管道
2.匿名管道
1.管道(匿名管道)
我们之前在父子进程那里,我们说过父进程和子进程是相互独立的,但是子进程可以继承父进程的好多属性,把紫色的框的内容都拷贝一份给子进程,保证了struct_file_struct,中指针数组中存在的文件结构体地址是一样的
这样一来父进程和子进程就可以操控同一个内核级缓冲区了,如果子进程要发消息给父进程,因为只有一个内核级缓冲区提供使用,也是为了防止误写,我们把父进程的文件操作符下标为4的关闭,让父进程不能往内核级缓冲区内写入,(父进程的写入对应的文件操作符关闭,并不影响子进程的写入,存在内核级的引用计数,父进程关闭,引用计数-1),同时让子进程把文件操作符下标为3的关闭,子进程不能读内核缓冲区的数据。struct_file内存在系统调用函数的函数指针,方便操作底层,让子进程调用write(),写入对应的内核级缓冲区,父进程调用read(),读出子进程写入的数据,实现了父子进程间的通信。
1.为什么我们子进程主动close(0/1/2),不影响父进程继续使用显示器文件呢?
上面黄色的地方已解答
2.父子进程既然关闭不需要的fd(文件操作符),为什么要曾经打开呢?可以不关闭吗?
为了让子进程继承读写方式的操作,因为不知道此时子进程要读还是写,可以不关闭,建议关了,会出现误写操作。
上面实现父子进程通信出现的问题就是,写入内核级缓冲区的内容,操作系统会把他们刷新到磁盘文件中去log.txt,为了解决这个问题,提出一个函数pipe()
2.pipe()函数
头文件:#include <unistd.h>
作用:相当提供一个匿名的文件,提供一个数组,数组大小为2,保存该进程的匿名文件的文件操作符,向匿名文件的内核级缓冲区写入的时候,不会刷新到磁盘中去。
返回值:返回0,成功,返回-1,失败
类似于open函数
pipefd[0]保存读匿名文件的文件结构体的文件操作符
pipefd[1]保存写匿名文件的文件结构体的文件操作符
站在文件描述符角度-深度理解管道
为什么让管道单向通信?
简单->不会出现误写,也不用考虑在内核级缓冲区中是父进程写入子进程读或者相反
如果要双向通信?
两个管道可以解决,一个管道父进程给子进程写,一个管道子进程给父进程写
3.代码实现匿名管道实现父子进程通信
makefile
pipe:pipe.cc g++ -o pipe pipe.cc -std=c++11 .PHONY:clean clean: rm -rf pipe
pipe.cc
#include<iostream> #include<unistd.h> #include<string.h> #include <sys/types.h> #include <unistd.h> using namespace std; const int size=1024; string getstring() { static int cnt=0; string messageid=to_string(cnt); cnt++; pid_t id=getpid(); string stringpid=to_string(id); string message="messageid: "; message+=messageid; message+="my pid is:"; message+=stringpid; return message; } void childwrite(int wfd) { string message="father ,i AM youchild"; while(true) { string info=message+getstring(); write(wfd,info.c_str(),info.size()); sleep(1); } } void fatherread(int rfd) { char inbuffer[size]; while(true) { ssize_t n=read(rfd,inbuffer,sizeof(inbuffer)-1);//没必要往文件中写入\0 cout<<inbuffer<<endl; } sleep(1); } int main() { int pipefd[2]; int n=pipe(pipefd); if(n!=0) { perror("pipe fail"); } cout<<"pipefd[0]:"<<pipefd[0]<<"pipefd[1]:"<<pipefd[1]<<endl; pid_t id=fork(); if(id==0) { cout<<"子进程开始发消息了"<<endl; close(pipefd[0]); childwrite(pipefd[1]); close(pipefd[1]); exit(0); } sleep(1); close(pipefd[1]); cout<<"父进程开始接受消息了"<<endl; fatherread(pipefd[0]); sleep(5); close(pipefd[0]); }
代码解释:pipe函数给该进程分配两个文件操作符,由于默认进程打开标准输入流,标准输出流,标准错误流,占了文件操作符0,1,2,所以从3开始分配,pipe返回值不等于0表示创建匿名文件失败,等于0,创建成功,pipefd[0]放的是读该文件的文件操作符,pipefd[1]放的是写该文件的文件操作符。
fork之后创建子进程,子进程关闭自己的读的文件操作符,通过childwrite()函数给该匿名文件的内核级缓冲区写入,父进程在fatherread中读取该文件内核级缓冲区的内容,实现了父子进程通信
管道的4种情况
1.如果管道内部是空的&&write fd没有关闭,读取条件不具备,读进程会被阻塞,等到读取条件具备,写入管道数据以满足读取条件(子进程写入缓冲区的时间变长,父进程读取阻塞)
2.管道被写满&&read fd不读且没有关闭,管道被写满,写进程会被阻塞(写条件不具备)->wait–写条件具备<-读取数据来满足写条件具备(父进程sleep时间变长,一直不读,管道被写满)
3.管道一直读&&写端关闭wfd,读端read返回值为0,表示读到了文件结尾(子进程写一个字符,直接break)
4.rwd直接关闭,写端wfd一直在进行写入?写端进程会被操作系统直接使用13号信号关闭,相当于进程出现了异常
管道的5种特征
1.匿名管道:只用来进行具有血缘关系的进程之间,进行通信。常用与父子进程之间通信
2.管道内部,自带进程之间的同步机制(多执行流执行代码的时候,具有明显的顺序性)
3.管道文件的生命周期是随着进程的
4.管道文件在通信的时候,是面向字节流的.(write的次数和读的次数不一样)
5.管道的通信方式,是一种特殊的半双工模式
4.进程池
创建子进程和管道
#include<iostream> #include<string> #include<vector> #include <sys/types.h> #include <unistd.h> using namespace std; class channel { public: channel(int wfd,pid_t id,const string&name) :_wfd(wfd) ,_subprocessid(id) ,_name(name) { } ~channel() {} int getwfd() {return _wfd;} pid_t getprocessid() {return _subprocessid;} string getname() {return _name;} private: int _wfd; pid_t _subprocessid; string _name; }; int main(int argc,char *argv[]) { if(argc!=2) { cout<<"usage:"<<argv[0]<<"processnum"<<endl; } int num=stoi(argv[1]); vector<channel>channels; for(int i=0;i<num;i++) { int pipefd[2]={0}; int n=pipe(pipefd); if(n<0) exit(1); pid_t id=fork(); if(id==0) { close(pipefd[0]); //task(); close(pipefd[1]); exit(0); } string channel_name="channel-"+to_string(i); close(pipefd[0]); channels.push_back(channel(pipefd[1],id,channel_name)); } for(auto channel:channels)//测试 { cout<<"----------------------"<<endl; cout<<"wfd:"<<channel.getwfd()<<endl; cout<<"subprocessid:"<<channel.getprocessid()<<endl; cout<<"name:"<<channel.getname()<<endl; } }
选择子进程要执行的功能,选择管道(对应哪个子进程)
whichone保证了父进程对后端任务划分负载均衡。
task.hpp
#pragma once #include<iostream> #include<cstdlib> #include<ctime> #define Tasknum 3 typedef void(*task_t()); void run() { cout<<"实现角色的跑"<<endl; } void jump() {cout<<"实现角色的跳"<<endl;} void climb() {cout<<"实现角色的爬"<<endl;} task_t tasks[Tasknum]; void loadtask() { srand(time(nullptr)); tasks[0]=run; tasks[1]=jump; tasks[2]=climb; } void runtask(int number) { if(number<0||number>2) return ; tasks[number](); } int slecttask() { return rand()%Tasknum; }
process.cc
#include<iostream> #include<string> #include<vector> #include <sys/types.h> #include <unistd.h> #include"task.hpp" using namespace std; class channel { public: channel(int wfd,pid_t id,const string&name) :_wfd(wfd) ,_subprocessid(id) ,_name(name) { } ~channel() {} int getwfd() {return _wfd;} pid_t getprocessid() {return _subprocessid;} string getname() {return _name;} private: int _wfd; pid_t _subprocessid; string _name; }; int whichchannel(int channelnum) { static int count=0; int channel=count; count++; count%=channelnum; return channel; } int main(int argc,char *argv[]) { //第一步:创建子进程和管道 if(argc!=2) { cout<<"usage:"<<argv[0]<<"processnum"<<endl; } int num=stoi(argv[1]); vector<channel>channels; for(int i=0;i<num;i++) { int pipefd[2]={0}; int n=pipe(pipefd); if(n<0) exit(1); pid_t id=fork(); if(id==0) { close(pipefd[0]); //task(); close(pipefd[1]); exit(0); } string channel_name="channel-"+to_string(i); close(pipefd[0]); channels.push_back(channel(pipefd[1],id,channel_name)); } //for(auto channel:channels) //{ // cout<<"----------------------"<<endl; // cout<<"wfd:"<<channel.getwfd()<<endl; // cout<<"subprocessid:"<<channel.getprocessid()<<endl; // cout<<"name:"<<channel.getname()<<endl; //} loadtask(); //第二步:选择任务 int taskcommand=slecttask(); //第三步:选择哪一个子进程来执行对应任务 int channel_id=whichchannel(num); }
发送函数指针数组下标
task.hpp
#pragma once #include<iostream> #include<cstdlib> #include<ctime> using namespace std; typedef void(*task_t)(); void run() { cout<<"实现角色的跑"<<endl; } void jump() {cout<<"实现角色的跳"<<endl;} void climb() {cout<<"实现角色的爬"<<endl;} task_t tasks[3]; void loadtask() { srand(time(nullptr)); tasks[0]=run; tasks[1]=jump; tasks[2]=climb; } void runtask(int number) { if(number<0||number>2) return ; tasks[number](); } int slecttask() { return rand()%3; }
process.cc
#include<iostream> #include<string> #include<vector> #include <sys/types.h> #include <unistd.h> #include"task.hpp" #include <sys/wait.h> using namespace std; class channel { public: channel(int wfd,pid_t id,const string&name) :_wfd(wfd) ,_subprocessid(id) ,_name(name) { } void closechannel() {close(_wfd);} void wait() { pid_t rid=waitpid(_subprocessid,nullptr,0); if(rid>0) { cout<<"wait"<<rid<<"sussess"<<endl; } } ~channel() {} int getwfd() {return _wfd;} pid_t getprocessid() {return _subprocessid;} string getname() {return _name;} private: int _wfd; pid_t _subprocessid; string _name; }; int whichone(int channelnum) { static int count=0; int channel=count; count++; count%=channelnum; return channel; } void sendtasknum(channel& chan,int taskcommand) { write(chan.getwfd(),&taskcommand,sizeof(taskcommand)); } void task(int index) { while(true) { int command=0; int n=read(index,&command,sizeof(command)); if(n==sizeof(int)) { cout<<"pid is:"<<getpid()<<"handle task"<<endl; runtask(command); } else if(n==0) { cout<<"sub process:"<<getpid()<<"quit!!"<<endl; break; } } } void cleanchannels(vector<channel>& channels) { for(auto &channel:channels) { channel.closechannel(); } for(auto &channel:channels) { channel.wait(); } } int main(int argc,char *argv[]) { //第一步:创建子进程和管道 if(argc!=2) { cout<<"usage:"<<argv[0]<<"processnum"<<endl; } int num=stoi(argv[1]); loadtask(); vector<channel>channels; for(int i=0;i<num;i++) { int pipefd[2]={0}; int n=pipe(pipefd); if(n<0) exit(1); pid_t id=fork(); if(id==0) { close(pipefd[1]); task(pipefd[0]);//第五步 close(pipefd[0]); exit(0); } string channel_name="channel-"+to_string(i); close(pipefd[0]); channels.push_back(channel(pipefd[1],id,channel_name)); } while(true) { sleep(1); //第二步:选择任务 int taskcommand=slecttask(); //第三步:选择哪一个子进程来执行对应任务 int channel_id=whichone(num); //第四步:向第三步选择的管道发送第二步选择的函数指针数组下标 sendtasknum(channels[channel_id],taskcommand); //第六步回收 } cleanchannels(channels); }
管道回收,以及子进程释放
在上一步已经回收了管道,以及子进程释放,回收管道,直接关闭对应管道的写端即可,子进程释放,让父进程使用waitpid即可,
下面来处理一种误区