一、进程间通信目的、发展和分类
1、目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
2、发展
LInux原生的管道
System V 进程间通信 多进程
POSIX进程间通信
3、分类
管道
1、匿名管道pipe
2、命名管道
System V IPC
1、System V 消息队列
2、System V 共享内存
3、System V 信号量
POSIX IPC
1、消息队列
2、共享内存
3、信号量
4、互斥量
5、条件变量
6、读写锁
二、管道
1、什么是管道
管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”,在生活中我们见过的下水道之类的都是管道
2、匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
3、实例代码测试
这个是用代码去创建子进程然后进行发送消息,通过管道进行发送数据,这里是利用一个随机数进行发送代码编号,为了防止一直找同一个进程,代码测试如下,文章末附上所有的代码。
三、代码
test2.cpp
#include <iostream> #include <vector> #include <cstdlib> #include <ctime> #include <cassert> #include <unistd.h> #include <sys/wait.h> #include <sys/types.h> #include "Task.hpp" #define PROCESS_NUM 5 using namespace std; int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞 { uint32_t command = 0; ssize_t s = read(waitFd, &command, sizeof(command)); if (s == 0) { quit = true; return -1; } assert(s == sizeof(uint32_t)); return command; } void sendAndWakeup(pid_t who, int fd, uint32_t command) { write(fd, &command, sizeof(command)); cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl; } int main() { // 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗?? load(); // pid: pipefd vector<pair<pid_t, int>> slots; // 先创建多个进程 for (int i = 0; i < PROCESS_NUM; i++) { // 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); assert(n == 0); (void)n; pid_t id = fork(); assert(id != -1); // 子进程我们让他进行读取 if (id == 0) { // 关闭写端 close(pipefd[1]); // child while (true) { // pipefd[0] // 等命令 bool quit = false; int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞 if (quit) break; // 执行对应的命令 if (command >= 0 && command < handlerSize()) { callbacks[command](); } else { cout << "非法command: " << command << endl; } } exit(1); } // father,进行写入,关闭读端 close(pipefd[0]); // pipefd[1] slots.push_back(pair<pid_t, int>(id, pipefd[1])); } // 父进程派发任务 srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机 while (true) { // 选择一个任务, 如果任务是从网络里面来的? int command = rand() % handlerSize(); // 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡 int choice = rand() % slots.size(); // 把任务给指定的进程 sendAndWakeup(slots[choice].first, slots[choice].second, command); sleep(1); } // 关闭fd, 所有的子进程都会退出 for (const auto &slot : slots) { close(slot.second); } // 回收所有的子进程信息 for (const auto &slot : slots) { waitpid(slot.first, nullptr, 0); } }
Task.hpp
#pragma once #include <iostream> #include <string> #include <vector> #include <unordered_map> #include <unistd.h> #include <functional> typedef std::function<void()> func; std::vector<func> callbacks; std::unordered_map<int, std::string> desc; void readMySQL() { std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl; } void execuleUrl() { std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl; } void cal() { std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl; } void save() { std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl; } void load() { desc.insert({callbacks.size(), "readMySQL: 读取数据库"}); callbacks.push_back(readMySQL); desc.insert({callbacks.size(), "execuleUrl: 进行url解析"}); callbacks.push_back(execuleUrl); desc.insert({callbacks.size(), "cal: 进行加密计算"}); callbacks.push_back(cal); desc.insert({callbacks.size(), "save: 进行数据的文件保存"}); callbacks.push_back(save); } void showHandler() { for(const auto &iter : desc ) { std::cout << iter.first << "\t" << iter.second << std::endl; } } int handlerSize() { return callbacks.size(); }