1 进程间通信介绍
1.1 进程间通信目的
数据传输:一个进程需要将它的数据发送给另一个进程 。
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.2 进程间通信发展
- 管道
- System V进程间通信
- POSIX进程间通信
1.3 进程间通信分类
管道
匿名管道pipe
命名管道
System V IPC
System V 消息队列
System V 共享内存
System V 信号量
POSIX IPC
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
2 管道
2.1 什么是管道
管道是 Unix 中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个 “ 管道".
比如我们常见的命令 | , 我们知道其实我们执行的命令在linux上本质是执行一个进程,管道也分为匿名管道和命名管道,像上面这种没有名字的就叫做匿名管道。
2.2 匿名管道
2.2.1 匿名管道的使用
在文件描述符得时候我们讲过,子进程会继承父进程的文件描述符,但是子进程并不会去拷贝父进程的文件,也就是子进程与父进程其实看到的是同一份文件,这就具备了进程间通信的前提:两个进程看到了同一份资源。我们就能够根据我们的需求来让父子进程完成我们的任务(比如让父进程写文件,子进程从文件中读取)
#include <unistd.h> 功能:创建一无名管道 原型 int pipe(int fd[2]); 参数 fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端 返回值:成功返回0,失败返回错误代码
我们可以用pipe函数来帮助我们创建匿名管道(大家一定要注意,使用匿名管道的前提是在父进程创建子进程前就已经把管道打开了,这样子进程才能够继承父进程的文件描述符)
实例代码:
#include<iostream> #include<unistd.h> #include<sys/types.h> #include<sys/wait.h> #include<cerrno> #include<cstring> #include<string> using namespace std; int main() { int pipefd[2]={0}; int n=pipe(pipefd); if(n<0) { cout<<"error"<<":"<<strerror(errno)<<endl; return 1; } pid_t id=fork(); if(id==0) { //child 子进程读取,父进程写入 close(pipefd[1]); char buffer[1024]; while(true) { int n=read(pipefd[0],buffer,9); if(n>0) { buffer[n]='\0'; cout<<"child :"<<buffer<<endl; } else if(n==0) { cout<<"read file end"<<endl; } else { cout<<"read error"<<endl; } } close(pipefd[0]); exit(0); } //parent 子进程读取,父进程写入 close(pipefd[0]); const char* str="hello bit"; while(true) { write(pipefd[1],str,strlen(str)); } close(pipefd[1]); int status=0; waitpid(id,&status,0); cout<<"singal:"<<(status&0x7f)<<endl; return 0; }
这样我们就编写完成了一份基本的用匿名管道进行通信的方法。
代码中值得注意的细节有:
- 系统规定数组下标为0表示读端,数组下标为1表示写端。
- 父子进程一个完成写入,一个完成读取,在写入前应当关闭读端,同理在读取前应当关闭写段。
2.2.2 使用匿名管道创建进程池
我们可以用一个匿名管道来做一些比较优雅的事情:比如创建一个进程池,用一个父进程管理多个子进程:
contralProcess.cc:
#include <iostream> #include <string> #include <vector> #include <cassert> #include <unistd.h> #include <sys/wait.h> #include <sys/types.h> #include "Task.hpp" using namespace std; const int gnum = 3; Task t; class EndPoint { private: static int number; public: pid_t _child_id; int _write_fd; std::string processname; public: EndPoint(int id, int fd) : _child_id(id), _write_fd(fd) { //process-0[pid:fd] char namebuffer[64]; snprintf(namebuffer, sizeof(namebuffer), "process-%d[%d:%d]", number++, _child_id, _write_fd); processname = namebuffer; } std::string name() const { return processname; } ~EndPoint() { } }; int EndPoint::number = 0; // 子进程要执行的方法 void WaitCommand() { while (true) { int command = 0; int n = read(0, &command, sizeof(int)); if (n == sizeof(int)) { t.Execute(command); } else if (n == 0) { std::cout << "父进程让我退出,我就退出了: " << getpid() << std::endl; break; } else { break; } } } void createProcesses(vector<EndPoint> *end_points) { vector<int> fds; for (int i = 0; i < gnum; i++) { // 1.1 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); assert(n == 0); (void)n; // 1.2 创建进程 pid_t id = fork(); assert(id != -1); // 一定是子进程 if (id == 0) { for(auto &fd : fds) close(fd); // 1.3 关闭不要的fd close(pipefd[1]); // 我们期望,所有的子进程读取"指令"的时候,都从标准输入读取 // 1.3.1 输入重定向,可以不做 dup2(pipefd[0], 0); // 1.3.2 子进程开始等待获取命令 WaitCommand(); close(pipefd[0]); exit(0); } // 一定是父进程 // 1.3 关闭不要的fd close(pipefd[0]); // 1.4 将新的子进程和他的管道写端,构建对象 end_points->push_back(EndPoint(id, pipefd[1])); fds.push_back(pipefd[1]); } } int ShowBoard() { std::cout << "##########################################" << std::endl; std::cout << "| 0. 执行日志任务 1. 执行数据库任务 |" << std::endl; std::cout << "| 2. 执行请求任务 3. 退出 |" << std::endl; std::cout << "##########################################" << std::endl; std::cout << "请选择# "; int command = 0; std::cin >> command; return command; } void ctrlProcess(const vector<EndPoint> &end_points) { // 2.1 我们可以写成自动化的,也可以搞成交互式的 int num = 0; int cnt = 0; while(true) { //1. 选择任务 int command = ShowBoard(); if(command == 3) break; if(command < 0 || command > 2) continue; //2. 选择进程 int index = cnt++; cnt %= end_points.size(); std::string name = end_points[index].name(); std::cout << "选择了进程: " << name << " | 处理任务: " << command << std::endl; //3. 下发任务 write(end_points[index]._write_fd, &command, sizeof(command)); sleep(1); } } void waitProcess(const vector<EndPoint> &end_points) { // 1. 我们需要让子进程全部退出 --- 只需要让父进程关闭所有的write fd就可以了! // for(const auto &ep : end_points) // for(int end = end_points.size() - 1; end >= 0; end--) for(int end = 0; end < end_points.size(); end++) { std::cout << "父进程让子进程退出:" << end_points[end]._child_id << std::endl; close(end_points[end]._write_fd); waitpid(end_points[end]._child_id, nullptr, 0); std::cout << "父进程回收了子进程:" << end_points[end]._child_id << std::endl; } sleep(10); // 2. 父进程要回收子进程的僵尸状态 // for(const auto &ep : end_points) waitpid(ep._child_id, nullptr, 0); // std::cout << "父进程回收了所有的子进程" << std::endl; // sleep(10); } int main() { vector<EndPoint> end_points; // 1. 先进行构建控制结构, 父进程写入,子进程读取 , bug? createProcesses(&end_points); // 2. 我们的得到了什么?end_points ctrlProcess(end_points); // 3. 处理所有的退出问题 waitProcess(end_points); return 0; }
Task.hpp:
#pragma once #include <iostream> #include <vector> #include <unistd.h> #include <unordered_map> // typedef std::function<void ()> func_t; typedef void (*fun_t)(); //函数指针 void PrintLog() { std::cout << "pid: "<< getpid() << ", 打印日志任务,正在被执行..." << std::endl; } void InsertMySQL() { std::cout << "执行数据库任务,正在被执行..." << std::endl; } void NetRequest() { std::cout << "执行网络请求任务,正在被执行..." << std::endl; } //约定,每一个command都必须是4字节 #define COMMAND_LOG 0 #define COMMAND_MYSQL 1 #define COMMAND_REQEUST 2 class Task { public: Task() { funcs.push_back(PrintLog); funcs.push_back(InsertMySQL); funcs.push_back(NetRequest); } void Execute(int command) { if(command >= 0 && command < funcs.size()) funcs[command](); } ~Task() {} public: std::vector<fun_t> funcs; };
不知道大家注意到了没有一个问题:我们在创建子进程时先做的工作是先从vector中读取数据来关闭的,这个vector中存放的究竟是什么呢?
我们来思考下:当我们父进程第一次fork后,父进程使用了下标为4的文件描述符,第一个子进程使用了下标为3的文件描述符,父进程通过pipefd[1]向第一个管道里面写入数据,子进程通过pipefd[0]在管道里面读取数据,但是当我们第二次创建子进程的时候子进程会继承父进程的文件描述符,也就是说,第二次的子进程居然也继承了父进程第一次打开的下标为4的文件描述符,那么这样做的危害是什么?如果我们通过先的关闭第一个管道的写端,然后再回收第一个子进程时,第一个子进程会一直阻塞在那里,为什么呢?因为第二个子进程中继承父进程的写端还指向第一个子进程的读端,也就是我们如果只先关闭了第一个管道的写端是不行的,第一个子进程并没有结束,因为他的读端还指向后面所有的子进程,这就导致第一个子进程回收时一直阻塞在那里。后面进程的分析方法同理:
解决方法有:我们可以先统一将所有管道的写端关闭,然后再一个一个回收。这样所有进程的写端都被关闭了,自然就成功退出了。还可以从最后一个管道的写端开始关闭,边关闭边回收,由于最后一个子进程的读端只指向最后一个管道的写端,所以能够正常退出。
但是这样写终归治标不治本,这时因为fork创建子进程时子进程已经把前面进程的文件描述符给继承下来了,有没有方法在创建子进程是就把继承父进程的文件描述符给关闭了呢?答案是有的,上面我们提到的vector就是能够很好的处理,我们每次创建了子进程后,就把对应的管道的写端给保留下来(保留到vector中),然后每次创建时就先关闭之前继承的文件描述符给关闭就行了。(这里面的关系有点复杂,大家一定要自己下去好好总结)
所以,看待管道,就如同看待文件一样!管道的使用和文件一致,迎合了 “Linux 一切皆文件思想 ” 。