我们在Task类中先定义一个vector,vector中存放的类型为函数指针类型,每一个函数指针都指向一个函数方法,这样我们就可以通过指令来调用不同的方法,对于指令command来讲我们之前也提到过其实就是位图,我们定义三个宏值分别代表三种打印方法,相要调用哪个方法就让command参数&上对应的宏值就完成了。我们在.cc文件中加上头文件,然后定义一个全局的Task类:
接下来我们继续实现子进程任务函数:
在子进程函数中我们默认指令为int类型,然后通过read函数的返回值判断是否读取成功,如果返回值等于4字节说明读到了指令那么就调用对应的函数方法,如果返回值等于0就说明当前的子进程要退出了直接break,剩下的就是读取失败也是直接退出,当然我们不能只执行一次,所以我们加个循环让子进程周而复始的去执行任务:
那么接下来的父进程还需要干什么呢?父进程需要选择任务,选择进程然后下发任务,下面我们来实现:
这里的任务是可以随机选择的,但是我们为了测试就只用了LOG任务,然后选择进程的时候直接用随机函数选择即可,下发任务也就是让父进程写入,写入的是指令对应的方法,然后我们将之前答应的代码加上getpid,把子进程的pid也打印出来:
以下是运行起来后的代码:
这样我们就完成了要实现的功能,现在我们再把相应的函数优化一下,让选择任务的时候可以是交互式的,首先第一步让用户选择任务,这里我们先实现一个选择面板:
将选择面板搞定后我们就可以实现让用户选择对应的任务了:
下面我们让选择进程的功能变成按顺序的,这样就不会出现每次随机让一个进程执行任务的情况:
下面我们我们再实现一下每次拿到任务后打印正在执行哪个任务:
我们先创建一个静态的int变量用来充当第几个进程,然后我们在类外将number初始化为0,string类型的进程名接收缓冲区里的字符,函数name可以返回一个进程名。然后我们在选择任务那块也打印一下:
然后如果进程退出的话也要有提示,所以我们在子进程那块加上打印提示:
当n等于0说明父进程将写端关闭了,随之就打印一下告诉用户。然后我们再将代码简化一下,把刚刚main函数中写的控制函数代码都放到ctrlProcess函数中:
写到这里发现124行有个报错,是因为返回的name函数需要加上const让静态变量也能使用(因为我们的ctrlProcess函数的参数是const对象)
下面我们运行一下程序:
程序运行起来后我们发现输入3退出不了,并且输入其他指令程序也没有打印动作,所以我们用if语句判断一下:
除了这个问题我们还要解决子进程退出问题,还记得我们之前说过的僵尸进程吗,如果子进程没有被父进程等待那么子进程就变成了僵尸状态,为了防止这种情况我们再写一个进程等待函数函数:
回收子进程之前我们需要将父进程的所有写端都关了,然后等待10秒后我们让父进程把每个子进程都回收了,这样就完成了简单的回收子进程函数。
当我们运行起来后发现可以完成我们的需求,并且在退出后也能正常的回收子进程。
不知道有没有人会有疑问,为什么我们刚刚在同一个循环内不能直接边关闭边回收子进程呢,就像下图这样:
我们先用这样的代码运行一下:
我们发现这样写后程序就卡在了回收进程处,这是为什么呢?其实是和子进程有关,我们早就说过子进程会继承父进程打开的文件描述符(这就是管道的原理),这里的坑就在于当第二次及第二次以后的创建子进程时,这个子进程会继承前面所有父进程的端口,也就是说越往后的子进程继承的端口越多,如下图所示:
也就是说刚刚卡主的原因是写端没有全部关完就回收了子进程,就造成了阻塞。而我们用一个循环单独关闭父进程的端口时是不会出现这个问题的,因为我们是先关闭了父进程的写端,都关闭后才去回收子进程。
由于上面的讲解都是通过图片的形式,下面我们将此次的实例代码发出来:
首先是ctrlProcess.cc文件中的:
#include <iostream> #include <string> #include <cassert> #include <unistd.h> #include <vector> #include "task.hpp" #include <sys/wait.h> #include <sys/types.h> using namespace std; const int gnum = 3; Task t; class EndPoint { private: static int number; public: pid_t _child_id; int _write_fd; std::string processname; EndPoint(int id,int fd) :_child_id(id) ,_write_fd(fd) { char namebuffer[64]; snprintf(namebuffer,sizeof(namebuffer),"process-%d[%d:%d]",number++,_child_id,_write_fd); processname = namebuffer; } std::string &name() { return processname; } ~EndPoint() { } }; int EndPoint::number = 0; //子进程要执行的方法 void WaitCommand() { while (true) { int command = 0; int n = read(0,&command,sizeof(int)); if (n==sizeof(int)) { t.Execute(command); } else if(n==0) { std::cout<<"父进程让我退出,我就退出了: "<<getpid()<<std::endl; break; } else { break; } } } void CreatProcess( vector<EndPoint>* end_points) { //1.先进行构建控制结构,父进程写入,子进程读取 for (int i = 0;i<gnum;i++) { //1.1创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); assert(n==0); (void)n; //1.2创建进程 pid_t id = fork(); assert(id!=-1); //返回值等于0一定是子进程 if (id==0) { //先关闭不要的fd close(pipefd[1]); //我们期望所有的子进程读取指令的时候,都从标准输入读取 //1.3.1输入重定向 dup2(pipefd[0],0); //读0就像读管道一样 //1.3.2让子进程开始等待获取命令 WaitCommand(); close(pipefd[0]); exit(0); } //一定是父进程 //1.3关闭不要的fd close(pipefd[0]); //1.4将新的子进程和他的管道写端,构建对象 end_points->push_back(EndPoint(id,pipefd[1])); } } int ShowBoard() { std::cout<<"################################"<<std::endl; std::cout<<"##0. 执行日志任务 1. 执行数据库任务##"<<std::endl; std::cout<<"##2.执行请求任务 3.退出#########"<<std::endl; std::cout<<"################################"<<std::endl; std::cout<<"请选择#"; int command = 0; std::cin>>command; return command; } void ctrlProcess(vector<EndPoint>& end_points) { //2.1我们可以写成自动化的,也可以写成交互式的 int cnt = 0; int num = 0; while (true) { //1.选择任务 int command = ShowBoard(); if (command==3) { break; } if (command<0|| command>2) { continue; } //2.选择进程 int index = cnt++; cnt%=end_points.size(); std::cout<<"选择了进程:"<<end_points[index].name()<<" | 处理任务"<<command<<std::endl; //3.下发任务 write(end_points[index]._write_fd,&command,sizeof(command)); //sleep(1); } } void ExitProcess() { exit(0); } void waitProcess(const vector<EndPoint>& end_points) { //1.我们需要让子进程全部退出 -- 只需要让父进程关闭所有的写端 //for (const auto &ep:end_points) for (int end = end_points.size()-1;end>=0;end--) { std::cout<<"父进程让所有的子进程全部退出"<<std::endl; //先关闭最后一个写端倒着一直关闭,因为子进程会继承父进程的文件描述符所以 //后面的子进程都会链接到第一个父进程的写端,如果关第一个无法全部关闭,会造成 //阻塞 close(end_points[end]._write_fd); std::cout<<"父进程回收了所有的子进程"<<std::endl; waitpid(end_points[end]._child_id,nullptr,0); } //std::cout<<"父进程让所有的子进程全部退出"<<std::endl; sleep(10); //2.父进程要回收子进程的僵尸状态 //for (const auto& ep:end_points) //{ // waitpid(ep._child_id,nullptr,0); //} //std::cout<<"父进程回收了所有的子进程"<<std::endl; //sleep(10); } int main() { //1.先进行构建控制结构,父进程写入,子进程读取 vector<EndPoint> end_points; CreatProcess(&end_points); //2.我们得到了什么? ctrlProcess(end_points); //3.处理所有的退出问题 waitProcess(end_points); return 0; }
下面是头文件task.hpp中的:
#include <iostream> #include <vector> #include <unistd.h> typedef void(*fun_t)(); //函数指针 void PrintLog() { std::cout<<"pid:"<<getpid()<<",打印日志任务,正在被执行..."<<std::endl; } void InsertMySQL() { std::cout<<"执行数据库任务,正在被执行..."<<std::endl; } void NetRequest() { std::cout<<"执行网络请求任务,正在被执行..."<<std::endl; } #define COMMAND_LOG 0 #define COMMAND_MYSQL 1 #define COMMAND_REQUEST 2 class Task { public: Task() { funcs.push_back(PrintLog); funcs.push_back(InsertMySQL); funcs.push_back(NetRequest); } ~Task() { } void Execute(int command) { if (command >= 0&& command < funcs.size()) { funcs[command](); } } public: std::vector<fun_t> funcs; };
总结
管道读写规则:
当没有数据可读时:
O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候:
O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
如果所有管道写端对应的文件描述符被关闭,则read返回0
如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致writ进程退出
当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
管道特点:
只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
管道提供流式服务
一般而言,进程退出,管道释放,所以管道的生命周期随进程
一般而言,内核会对管道操作进行同步与互斥
管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道