前言
Linux 进程间通信是操作系统中非常重要的一个概念,它允许不同进程之间进行数据交换和协作,从而实现更高效、更复杂的应用程序。在计算机科学领域,进程间通信是一个广泛研究的主题,也是操作系统设计中的核心问题之一。
本文将介绍 Linux 中常见的几种进程间通信方式,包括管道、命名管道、共享内存等。我们将深入探讨每种方式的工作原理、优缺点以及适用场景,帮助读者更好地理解和运用这些技术。
无论你是初学者还是资深开发人员,本文都将为你提供有价值的知识和实用的技能。让我们开始吧!
理解进程通信
进程间通信的必要性
单进程无法使用并发能力,更加无法实现多进程协同
有的是为了传输数据、同步执行流、消息通知等,这些都需要多进程协同才能实现
它不是目的,而是手段
进程间通信的技术背景
1.进程是具有独立性的。虚拟地址空间+页表 保证进程运行的独立性
2.因为他的独立性导致通信成本会比较高
进程间通信的本质理解
1.进程间通信的前提是让不同的进程看到同一块“内存”(特定的结构组织)
2.同一块“内存”不能隶属于任何一个进程,而应该强调共享
进程间通信标准和分类
进程间通信方式的一些标准和分类
- 管道
- 匿名管道pipe
- 命名管道
- System V IPC 主要做的单机通信
- System V 消息队列
- System V 共享内存
- System V 信号量
- POSIX IPC 主要做的网络通信
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
标准在我们使用者看来都是接口上的理解
管道
什么是管道
比如天然气、石油等管道,它只能单向输出内容,它能传输资源
现在在计算机中最重要的资源是数据,数据是互联网时代的“石油”
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
- 管道由系统提供
管道的原理
管道通信背后是进程之间通过管道进行通信
举例:
1.父进程以读写的方式打开一个文件
2.fork()创建子进程
3.双方各自关闭自己不需要的文件描述符
- 此时子进程会自己拷贝父进程的文件描述符表
- 此时子进程的也会指向父进程以读写的方式打开的那个文件
- 此时就有两个进程共享一个文件了
- 假设使父进程写入,子进程读取
- 这个数据流就是一个“管道”
它不会把数据存到磁盘上,都在内存中
用fork来共享管道原理
站在文件描述符角度-深度理解管道
匿名管道
#include <unistd.h> 功能:创建一无名管道 原型 int pipe(int fd[2]); 参数 fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端 返回值:成功返回0,失败返回错误代码
一个小demo来理解管道
#include <iostream> #include <string> #include <cstdio> #include <cstring> #include <assert.h> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> using namespace std; // 为什么不定义全局buffer来进行通信呢?? 因为有写时拷贝的存在,无法更改通信! int main() { // 1. 创建管道 int pipefd[2] = {0}; // pipefd[0(嘴巴,读书)]: 读端 , pipefd[1(钢笔,写)]: 写端 int n = pipe(pipefd); assert(n != -1); // debug assert, release assert (void)n;//避免n只被定义没有被使用导致大量报红 #ifdef DEBUG cout << "pipefd[0]: " << pipefd[0] << endl; // 3 cout << "pipefd[1]: " << pipefd[1] << endl; // 4 #endif // 2. 创建子进程 pid_t id = fork(); assert(id != -1); if (id == 0) { //子进程 - 读 // 3. 构建单向通信的信道,父进程写入,子进程读取 // 3.1 关闭子进程不需要的fd close(pipefd[1]); char buffer[1024 * 8]; while (true) { // sleep(20); // 写入的一方,fd没有关闭,如果有数据,就读,没有数据就等 // 写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾! ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1); //读取pipefd[0]文件描述符,到buffer中,读取sizeof(buffer) - 1个 //最后一个位置自己设置\0 if (s > 0) { buffer[s] = 0; cout << "child get a message[" << getpid() << "] Father# " << buffer << endl; } else if(s == 0) { cout << "writer quit(father), me quit!!!" << endl; break; } } // close(pipefd[0]); exit(0); } //父进程 - 写 // 3. 构建单向通信的信道 // 3.1 关闭父进程不需要的fd close(pipefd[0]); string message = "我是父进程,我正在给你发消息"; int count = 0; char send_buffer[1024 * 8]; while (true) { // 3.2 构建一个变化的字符串 snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d", message.c_str(), getpid(), count++); // 3.3 写入 write(pipefd[1], send_buffer, strlen(send_buffer)); // 3.4 故意sleep sleep(1); cout << count << endl; if (count == 5){ cout << "writer quit(father)" << endl; break; } } close(pipefd[1]); pid_t ret = waitpid(id, nullptr, 0); cout << "id : " << id << " ret: " << ret <<endl; assert(ret > 0); (void)ret; return 0; }
管道特点
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 此时子进程会继承父进程的文件描述符表
- 管道提供流式服务
- 它是buffer里面有多少就读多少,可能一次读取你多次写入的内容
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 管道是基于文件的,文件的生命周期是随进程的,那么管道的生命周期也是随进程的
- 一般而言,内核会对管道操作进行同步与互斥
- 管道能让进程间协同,它具有访问控制,因为它会有等待,当写入写满时还没读取,就会等待读取,否则再写入会覆盖buffer中的内容
- 同步和互斥以后再讲
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
- 半双工不是说只能读或只能写,它的含义是暂时只用这个功能
管道读写规则
- 当没有数据可读时
- O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
- O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
- 当管道满的时候
- O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
- 如果所有管道写端对应的文件描述符被关闭,则read返回0,标识读到了文件结尾
- 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
- 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
- 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
匿名管道实现一个小的进程池demo
ProcessPool.cc
#include <iostream> #include <vector> #include <cstdlib> #include <ctime> #include <cassert> #include <unistd.h> #include <sys/wait.h> #include <sys/types.h> #include "Task.hpp" #define PROCESS_NUM 5 //进程个数 using namespace std; int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞 { uint32_t command = 0; ssize_t s = read(waitFd, &command, sizeof(command)); if (s == 0) { quit = true; return -1; } assert(s == sizeof(uint32_t)); return command; } void sendAndWakeup(pid_t who, int fd, uint32_t command) { write(fd, &command, sizeof(command)); cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl; } int main() { // 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗?? load(); // pid: pipefd vector<pair<pid_t, int>> slots; // 先创建多个进程 for (int i = 0; i < PROCESS_NUM; i++) { // 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); assert(n == 0); (void)n; pid_t id = fork(); assert(id != -1); // 子进程我们让他进行读取 if (id == 0) { // 关闭写端 close(pipefd[1]); // child while (true) { // pipefd[0] // 等命令 bool quit = false; int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞 if (quit) break; // 执行对应的命令 if (command >= 0 && command < handlerSize()) { callbacks[command](); } else { cout << "非法command: " << command << endl; } } exit(1); } // father,进行写入,关闭读端 close(pipefd[0]); // pipefd[1] slots.push_back(pair<pid_t, int>(id, pipefd[1])); } // 父进程派发任务 srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机 while (true) { // 选择一个任务, 如果任务是从网络里面来的? int command = rand() % handlerSize(); // 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡 int choice = rand() % slots.size(); // 把任务给指定的进程 sendAndWakeup(slots[choice].first, slots[choice].second, command); sleep(1); // int select; // int command; // cout << "############################################" << endl; // cout << "# 1. show funcitons 2.send command #" << endl; // cout << "############################################" << endl; // cout << "Please Select> "; // cin >> select; // if (select == 1) // showHandler(); // else if (select == 2) // { // cout << "Enter Your Command> "; // // 选择任务 // cin >> command; // // 选择进程 // int choice = rand() % slots.size(); // // 把任务给指定的进程 // sendAndWakeup(slots[choice].first, slots[choice].second, command); // } // else // { // } } // 关闭fd, 所有的子进程都会退出 for (const auto &slot : slots) { close(slot.second); } // 回收所有的子进程信息 for (const auto &slot : slots) { waitpid(slot.first, nullptr, 0); } }
Task.hpp
#pragma once #include <iostream> #include <string> #include <vector> #include <unordered_map> #include <unistd.h> #include <functional> typedef std::function<void()> func; std::vector<func> callbacks; std::unordered_map<int, std::string> desc; void readMySQL() { std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl; } void execuleUrl() { std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl; } void cal() { std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl; } void save() { std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl; } void load() { desc.insert({callbacks.size(), "readMySQL: 读取数据库"}); callbacks.push_back(readMySQL); desc.insert({callbacks.size(), "execuleUrl: 进行url解析"}); callbacks.push_back(execuleUrl); desc.insert({callbacks.size(), "cal: 进行加密计算"}); callbacks.push_back(cal); desc.insert({callbacks.size(), "save: 进行数据的文件保存"}); callbacks.push_back(save); } void showHandler() { for(const auto &iter : desc ) { std::cout << iter.first << "\t" << iter.second << std::endl; } } int handlerSize() { return callbacks.size(); }
makefile
ProcessPool:ProcessPool.cc g++ -o $@ $^ -std=c++11 #-DDEBUG .PHONY:clean clean: rm -f ProcessPool
命名管道
- 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
- 命名管道是一种特殊类型的文件
- 文件在系统中的路径具有唯一性,双方进程可以通过命名管道文件的路径看到同一份资源,这样就可以和匿名管道实现同样的效果
创建一个命名管道
命名管道可以从命令行上创建
命令行方法是使用下面这个命令:
mkfifo filename
命名管道也可以从程序里创建
相关函数有:
int mkfifo(const char *filename,mode_t mode);
创建命名管道:
int main(int argc, char *argv[]) { mkfifo("p2", 0644); return 0; }
匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
- O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
- O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
命名管道实现server&client通信
server.cc
#include "comm.hpp" #include <sys/wait.h> static void getMessage(int fd) { char buffer[SIZE]; while (true) { memset(buffer, '\0', sizeof(buffer)); ssize_t s = read(fd, buffer, sizeof(buffer) - 1); if (s > 0) { cout <<"[" << getpid() << "] "<< "client say> " << buffer << endl; } else if (s == 0) { // end of file cerr <<"[" << getpid() << "] " << "read end of file, clien quit, server quit too!" << endl; break; } else { // read error perror("read"); break; } } } int main() { // 1. 创建管道文件 if (mkfifo(ipcPath.c_str(), MODE) < 0) { perror("mkfifo"); exit(1); } Log("创建管道文件成功", Debug) << " step 1" << endl; // 2. 正常的文件操作 int fd = open(ipcPath.c_str(), O_RDONLY); if (fd < 0) { perror("open"); exit(2); } Log("打开管道文件成功", Debug) << " step 2" << endl; int nums = 3; for (int i = 0; i < nums; i++) { pid_t id = fork(); if (id == 0) { // 3. 编写正常的通信代码了 getMessage(fd); exit(1); } } for(int i = 0; i < nums; i++) { waitpid(-1, nullptr, 0); } // 4. 关闭文件 close(fd); Log("关闭管道文件成功", Debug) << " step 3" << endl; unlink(ipcPath.c_str()); // 通信完毕,就删除文件 Log("删除管道文件成功", Debug) << " step 4" << endl; return 0; }
client.cc
#include "comm.hpp" int main() { // 1. 获取管道文件 int fd = open(ipcPath.c_str(), O_WRONLY); if(fd < 0) { perror("open"); exit(1); } // 2. ipc过程 string buffer; while(true) { cout << "Please Enter Message Line :> "; std::getline(std::cin, buffer); write(fd, buffer.c_str(), buffer.size()); } // 3. 关闭 close(fd); return 0; }
comm.hpp
#ifndef _COMM_H_ #define _COMM_H_ #include <iostream> #include <string> #include <cstdio> #include <cstring> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include "Log.hpp" using namespace std; #define MODE 0666 #define SIZE 128 string ipcPath = "./fifo.ipc"; #endif
Log.hpp
#ifndef _LOG_H_ #define _LOG_H_ #include <iostream> #include <ctime> #define Debug 0 #define Notice 1 #define Warning 2 #define Error 3 const std::string msg[] = { "Debug", "Notice", "Warning", "Error" }; std::ostream &Log(std::string message, int level) { std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message; return std::cout; } #endif
makefile
.PHONY:all all:client mutiServer client:client.cxx g++ -o $@ $^ -std=c++11 mutiServer:server.cxx g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f client mutiServer
system V 共享内存
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
查看共享内存资源的脚本命令:while : ; do ipcs -m; sleep 1; done
- key:这个共享内存段名字,key的作用是作为标识,让通信的对方进程找到我创建的共享内存
- shmid: 共享内存标识
- perms:共享内存的权限
- bytes:共享内存的大小
- nottch:共享内存上连接的进程数
共享内存示意图
共享内存的原理
它的本质上是一块内存被映射给了两个进程,这两个进程从而能够实现进程通信,前面讲的管道也是类似的原理,所以也能用来通信,但它们的本质上来说都是共享同一个文件。
共享内存是操作系统提供的,操作系统当然要管理共享内存,所以当我们申请共享内存的时候,得到不仅仅是共享内存块,还有管理共享内存的内核数据结构。
共享内存数据结构
struct shmid_ds { struct ipc_perm shm_perm; /* operation perms */ int shm_segsz; /* size of segment (bytes) */ __kernel_time_t shm_atime; /* last attach time */ __kernel_time_t shm_dtime; /* last detach time */ __kernel_time_t shm_ctime; /* last change time */ __kernel_ipc_pid_t shm_cpid; /* pid of creator */ __kernel_ipc_pid_t shm_lpid; /* pid of last operator */ unsigned short shm_nattch; /* no. of current attaches */ unsigned short shm_unused; /* compatibility */ void *shm_unused2; /* ditto - used by DIPC */ void *shm_unused3; /* unused */ };
共享内存函数
shmget函数
功能:用来创建共享内存 原型 int shmget(key_t key, size_t size, int shmflg); 参数 key:这个共享内存段名字,key的作用是作为标识,让通信的对方进程找到我创建的共享内存, 它是多少不重要,但是必须要在系统中唯一即可。 怎么取得唯一的key? 需要使用ftok函数来生成一个唯一的key #include <sys/ipc.h> key_t ftok(const char *pathname, int proj_id); size:共享内存大小 shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的 返回值:成功返回一个非负整数,即该共享内存段的标识码shmid,类似曾经的fd;失败返回-1 shmflg常用两个选项: 1.IPC_CREAT:单独使用时,如果创建共享内存,如果底层已经存在,则获取后返回, 如果不存在,就创建后返回。 2.IPC_EXCL:单独使用没有意义 3.IPC_CREAT和IPC_EXCL一起使用:底层不存在就创建,底层存在出错返回, 也就是说返回成功一定是一个全新的shm
shmid vs key
1.只有创建的时候用key,大部分情况用户访问共享内存都用的是shmid
2.key可以标识共享内存在系统层面的唯一性,shmid可以标识共享内存在用户层面上的唯一性
2.当进程运行结束后,共享内存依旧存在,system V IPC资源的生命周期随内核
3.想要删除可以手动删除,命令是ipcrm -m <shmid>,也可以代码删除,使用函数shmctl
shmat函数
功能:将共享内存段连接到进程地址空间 原型 void *shmat(int shmid, const void *shmaddr, int shmflg); 参数 shmid: 共享内存标识 shmaddr:指定连接的地址 shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY 返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1
说明:
shmaddr为NULL,核心自动选择一个地址 shmaddr不为NULL且shmflg无SHM_RND标记,则以shmaddr为连接地址。 shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会自动向下调整为SHMLBA的整数倍。 公式:shmaddr -(shmaddr % SHMLBA) shmflg=SHM_RDONLY,表示连接操作用来只读共享内存
shmdt函数
功能:将共享内存段与当前进程脱离 原型 int shmdt(const void *shmaddr); 参数 shmaddr: 由shmat所返回的指针 返回值:成功返回0;失败返回-1 注意:将共享内存段与当前进程脱离不等于删除共享内存段
shmctl函数
功能:用于控制共享内存 原型 int shmctl(int shmid, int cmd, struct shmid_ds *buf); 参数 shmid:由shmget返回的共享内存标识码 cmd:将要采取的动作(有三个可取值) buf:指向一个保存着共享内存的模式状态和访问权限的数据结构 返回值:成功返回0;失败返回-1
当为IPC_RMID选项时,即使有进程连接了共享内存,它也会删除
共享内存的要点
为什么通过共享内存传输的速度最快?
如上图所示,当我们使用其他的方式传输数据、信号的时候,会通过内核的处理再转给另一端,这之间会经历多次拷贝。
比如使用键盘写入,然后打印在显示器上,会经历4次拷贝如果直接使用共享内存的方式来进行相同的操作就省去了内核的处理,少了两次拷贝。
基于对共享内存的理解
为了让进程间通信,需要让不同的进程之间看到同一份资源,在此之前讲的所有的通信方式本质都是优先解决一个问题:让不同的进程之间看到同一份资源
但是使用共享内存来让不同的进程之间看到同一份资源时会带来一些时序问题,造成数据不一致的问题,比如写端只写了一半就被读取可能会造成语义相反等情况。
一些概念
- 我们把多个进程(执行流)看到的公共的一份资源叫临界资源
- 我们把自己的进程访问临界资源的代码叫做临界区
- 为了更好地进行临界区的保护,可以让多执行流在任何一个时刻都只能有一个进程进入临界区 也就是互斥
- 多个执行流互相运行的时候互相干扰,主要是我们不加保护的访问了同样的资源(临界资源),在非临界区多个执行流是互相不能影响的
- 原子性:要么不做,要么做完,没有中间状态就称之为原子性