每个进程的用户地址空间都是独立的,一般而言是不能互相访问的,但内核空间是每个进程都共享的,所以进程之间要通信必须通过内核。
进程间通信 IPC( inter-process communication)分为:
- 同一主机进程间通信
- Unix 通信方式:匿名管道、有名管道、信号
- system V 通信方式:消息队列、共享内存、信号量
- 不同主机进程间通信:Socket
# system V # 查看 ipc 信息 ipcs [options] -l '查看各个 IPC 的限制 # 移除 ipc 资源 ipcrm [options] -m shmid '根据 shmid 删除 -M shmid '根据键值删除
1、管道
管道本质就是内核缓冲,其主要特点为:
- 管道的通信方式是半双工的。即只能从管道的写端将数据写入到内核管道缓冲区,再从管道的读端将该数据读出。若要实现全双工通信,必须创建两条管道。
- 管道是基于字节流来通信的,先进先出,数据没有边界。多次写管道,数据粘在一起。
- 管道关闭读端,写管道,收到
SIGPIPE
信号;管道关闭写端,读管道,返回 0。
管道根据其存在的方式分为
- 匿名管道:只存在于内存,只能在有亲缘关系之间的进程进行通信,生命周期随进程的结束而结束
- 有名管道:存在于文件系统,可以在非亲缘关系的进程间通信,是一种管道类型的设备文件,生命周期随内核。
1.1、匿名管道
匿名管道 pipe 只能在有亲缘关系(父子、兄弟)之间的进程进行通信。
创建匿名管道
/* 返回值:成功返回 0,失败返回 -1 参数 - pepefd: 整型数组,保存管道的两端,fd[1]是管道的写端 fd,fd[0]是管道的读端 fd */ int pipe(int pepefd[2]);
单个进程的管道没有任何价值。通过 fork 创建子进程,内核管道缓冲区采用 dup
机制进行拷贝的,而用户态数据则是拷贝了一份副本,这样两个进程可以通过各自的 fd 和读取同一个管道文件实现跨进程通信。
// dup 机制:复制文件描述符 // 选择最小可用的 fd 指向 oldfd 所指向的文件对象 int dup(int oldfd); // 自定义 newfd 指向 oldfd 指向的文件对象。若 newfd 已经被使用,则先关闭,后使用 int dup2(int oldfd, int newfd);
同时为避免父子进程同时写入读取管道造成混乱,通常的做法是:父子进程只保留一端通信(相反)。例如:父进程保留写端,关闭读端;子进程保留读端,关闭写端。
此外,对于管道命令 A | B
,将前一个命令A
的输出,作为后一个命令B
的输入。在 shell 里面执行 A | B
命令的时候,A 进程和 B 进程都是 shell 创建出来的子进程,A 和 B 之间不存在父子关系,它俩的父进程都是 shell。
测试1:管道关闭读端,写管道,收到 SIGPIPE
信号,进程退出。使用echo $?
查看进程的退出码是 141
// 管道关闭读端,写管道 #include <func.h> int main(int argc, char **argv) { int fds[2]; int ret = 0; ret = pipe(fds); ERROR_CHECK(ret, -1, "pipe"); if(fork()){ // 父进程关闭读端,写管道 close(fds[0]); // 让出cpu,等待子进程关闭读端 sleep(1); ret = write(fds[1], "hello", 5); printf("ret = %d\n", ret); ERROR_CHECK(ret, -1, "write"); wait(NULL); } else { close(fds[0]); } return 0; }
测试2:管道关闭写端,读管道,那么 read 会变成非阻塞的,返回 0
// 管道关闭写端,读管道 #include <func.h> int main(int argc, char **argv) { int fds[2]; int ret = 0; ret = pipe(fds); ERROR_CHECK(ret, -1, "pipe"); if(fork()){ close(fds[1]); wait(NULL); } else { close(fds[1]); char buf[64] = {0}; ret = read(fds[0], buf, sizeof(buf)); printf("buf = %s, ret =%d \n", buf, ret); } return 0; }
1.2、有名管道
有名管道 FIFO 可以在非亲缘关系的进程间通信,是一种类型为管道的设备文件,不会随着进程结束而消失。注意:管道文件不能用于存储数据,不能用命令打开管道。
创建和删除管道文件
/* @ brief: 创建有名管道 @ pathname: 创建的命令管道 @ mode: 权限 @ return: 成功返回 0,失败返回 -1 */ int mkfifo(const char *pathname, mode_t mode) /* @ brief: 删除文件 @ path: 文件路径 @ return: 成功返回 0,失败返回 -1 */ int unlink(const char *path);
例:有名管道实现的简易即时通信
创建两个管道文件
mkfifo 1.pipe mkfifo 2.pipe
代码实现
// ./chat1 1.pipe 2.pipe #include <func.h> int main(int argc, char *argv[]) { int fdr = open(argv[1], O_RDONLY); int fdw = open(argv[2], O_WRONLY); puts("chat1"); char buf[4096] = {0}; while(1){ // 等待键盘输入 memset(buf ,0, sizeof(buf)); read(STDIN_FILENO, buf, sizeof(buf)); // 发送消息到管道 write(fdw, buf, strlen(buf)); memset(buf, 0, sizeof(buf)); // 从管道读取消息 read(fdr, buf, sizeof(buf)); puts(buf); } return 0; } // ./chat2 1.pipe 2.pipe #include <func.h> int main(int argc, char *argv[]) { int fdw = open(argv[1], O_WRONLY); int fdr = open(argv[2], O_RDONLY); puts("chat2"); char buf[4096] = {0}; while(1){ // 从管道读取消息 memset(buf,0,sizeof(buf)); read(fdr,buf,sizeof(buf)); puts(buf); // 等待键盘输入 memset(buf,0,sizeof(buf)); read(STDIN_FILENO,buf,sizeof(buf)); // 发送消息到管道 write(fdw,buf,strlen(buf)); } return 0; }
2、信号
信号是进程间通信机制中一种异步通信机制。内核与进程在任意时刻交互。
信号的处理方式有
- 默认处理
- 忽略信号
- 捕捉信号,并自定义处理
关于信号的更多内容,见我的博客 linux_信号
3、共享内存
每个进程都有自己独立的虚拟内存空间,不同进程的虚拟内存映射到不同的物理内存中。不同进程间的通信,都要发生用户态与内核态之间的数据拷贝。A 进程将用户态数据拷贝到内核缓冲区,B 进程从内核缓冲区将数据拷贝到用户态。
共享内存的机制,就是拿出一块虚拟地址空间来,映射到相同的物理内存中。这样 A 进程和 B 进程不需要互相拷贝数据,直接就可以看到共享的数据,提高了进程间通信的效率。
注意:共享内存一旦创建后,就会一直存在,不会随进程的结束而消失,直到使用命令删除,或者重启系统。后续的消息队列也是
3.1、共享内存接口
system V 版本的共享内存创建方法
- 生成 key 值:
ftok
- 创建共享内存:
shmget
- 将共享内存映射到本进程的地址空间:
shmat
3.1.1、生成 key 值
内核使用一个非负整数 key 来区分不同的共享内存区域,不同的进程可以使用同一个 key 值来区分不同的共享内存区域。key 可以手动指定,也可以使用 ftok
接口生成。ftok
是用文件的索引节点号和用户指定的值生成 key 值。
/* 返回值: 成功返回一个key(8位的整型值),失败返回-1。 参数 - pathname: 路径所指向的文件(文件夹)必须真实存在且可访问的文件 - proj_id: 用户指定的值 */ key_t ftok(const char *pathname, int proj_id);
3.1.2、创建共享内存
/* 返回值:成功返回共享内存id, 失败返回-1 参数 - key:key 值,可以使用 ftok 函数生成,也可以自己指定。 - size:创建共享内存的大小 - shmflg:权限,IPC_CREAT|0666; */ int shmget(key_t key, size_t size, int shmflg)
私有共享内存:当 key 值为全 0 的时候,是私有的共享内存。只能在有亲缘关系间的进程之间使用。私有方式的共享内存不受 key 约束,并且每次执行都会生成新的一块共享内存
// shmget 生成共享内存时,key 指定 IPC_PRIVATE 或 0 int shmid = shmget(IPC_PRIVATE, 1024, IPC_CREAT|0666);
3.1.3、创建共享内存映射
/* shmat:at-attach 返回值:成功返回指向该共享内存的指针,失败是返回(void*)-1, 参数 - shmid:共享内存的id - shmaddr:填NULL,表示让内核决定一个合适的位置 - shmflg:权限,暂时无用,填0; */ void *shmat(int shmid, const void *shmaddr, int shmflg);
3.1.4、解除共享内存映射
/* shmdt:dt-detach 返回值:成功返回0,失败返回(void *)-1 参数 - shmaddr: shmat的返回指针 */ int shmdt(const void *shmaddr)
3.1.5、修改共享内存属性
shmctl: ctl-control
/* 返回值:成功返回0,失败返回-1 参数 - shmid:共享内存的id - cmd:IPC_STAT 获取共享内存的信息; IPC_SET 设置共享内存信息; IPC_RMID 标记删除该共享内存 - buf:是一个结构体,保存共享内存的相关信息 */ int shmctl(int shmid, int cmd, struct shmid_ds *buf)
标记删除:该共享内存正在使用(连接数不为0),该共享内存不会立即被删除,先将其键值置0,。当该共享内存不在被使用的时候,才会真正的删除。
// 获取共享内存的信息 struct shmid_ds stat; int ret = shmctl(shmid, IPC_STAT, &stat); // 修改共享内存段信息 stat.shm_perm.mode = 0666; ret = shmctl(shmid, IPC_SET, &stat); // 删除共享内存 int ret = shmctl(shmid, IPC_RMID, NULL);
3.2、例:共享内存
写进程
#include <func.h> int main(int argc,char*argv[]) { int ret = 0; // ftok 生成 key 值 key_t key = ftok(".", 1); ERROR_CHECK(key, -1, "ftok"); // 创建共享内存 int shmid = shmget(key, 1024, IPC_CREAT|0666); ERROR_CHECK(shmid, -1, "shmget"); // 将共享内存映射到本进程的地址空间 char *p = (char *)shmat(shmid, NULL, 0); ERROR_CHECK(p, (void*)-1, "shmat"); strcpy(p, "How are you"); // 解除映射 ret = shmdt(p); ERROR_CHECK(ret, -1, "shmdt"); return 0; }
读进程
#include <func.h> int main(int argc,char*argv[]) { int ret = 0; // ftok 生成 key 值 key_t key = ftok(".", 1); ERROR_CHECK(key, -1, "ftok"); // 创建共享内存 int shmid = shmget(key, 1024, IPC_CREAT|0666); ERROR_CHECK(shmid, -1, "shmget"); // 将共享内存映射到本进程的地址空间 char *p = (char *)shmat(shmid, NULL, 0); ERROR_CHECK(p, (void*)-1, "shmat"); puts(p); // 解除映射 ret = shmdt(p); ERROR_CHECK(ret, -1, "shmdt"); // 删除映射 ret = shmctl(shmid, IPC_RMID, NULL); ERROR_CHECK(ret, -1, "shmctl"); // ipcs 查看状态,结束后共享内存删除 while(1); return 0; }
4、信号量
信号量一种用于进程间同步的 IPC,不用于缓存数据。
信号量是一个整型计数器,表示有多少个共享资源可以共享使用。控制信号量的方式有两种原子操作
- p 操作:获取资源,信号量数值减 1。若减 1 后,信号量数值等于 0,则阻塞进程。
- v 操作:归还资源,信号量数值加 1。若加 1 后,信号量数值大于 0,唤醒进程。
信号量的分类
- 互斥信号量(二进制信号量):只有 0 | 1 两种状态的信号量,用于互斥。
- 整型信号量(计数信号量):具有多个资源的信号量,用于同步。
下文以 system V 版本的信号量为例,进行说明。
4.1、信号量的接口
4.1.1、创建信号量
使用接口 semget
创建信号量集合的时候,如果不是采用私有的方式创建信号量集合,那么多个进程传入同一个key来重复使用 semget 时,必须是不能修改信号量集合的大小的。
/* 返回值:成功返回信号量的 id、失败返回 -1 参数 - key: key 值 - nsems: 信号量的个数, - semflg: IPC_CREAT|0600; */ int semget(key_t key, int nsems, int semflg);
4.1.2、控制信号量
/* 返回值:失败返回 -1 参数 - semid:信号量 id - semnum:某个信号量在信号量集合中的索引 union semun{ int val; //val for SETVAL struct semid_ds *buf; //buffer for IPC_STAT,IPC_SET unsigned short *arry; //Array for GETALL,SETALL,0所有元素 } - cmd GETVAL:获取信号量所代表资源的数量,semctl 返回该值 SETVAL:设置信号量所代表的资源数量,semctl 成功返回0 // 创建一个 unsigned short 的数组,数组中的值是各个信号量值的初始值 GETALL:获取所有信号量的各自所代表的资源数量 SETALL:设置所有信号量所代表的资源数量 IPC_STAT:获取信号量集合的状态 IPC_SET:设置信号量集合的状态 IPC_RMID: 删除信号量 */ int semctl(int semid, int semnum, int cmd, ...)
4.1.3、信号量的操作
/* 返回值:成功返回0, 失败-1 参数 - semid: 信号量id - sops: 操作信号的结构体,用户自己声明 struct sembuf { unsigned short sem_num; // 要操作的信号量编号 short sem_op; // 信号量的操作 short sem_flg; // 标志,填0. SEM_UNDO,避免死锁。 }; - nsops: 结构体的数量 */ int semop(int semid, struct sembuf *sops, size_t nsops)
4.2、例:生产者与消费者问题
生产者生产商品,消耗货架;消费者消费商品,归还货架。
#include <head.h> int main(int argc, char **argv) { int ret = 0; // 创建信号量集合 int semid = semget(3000, 2, IPC_CREAT|0666); ERROR_CHECK(semid, -1, "semget"); // 创建 unsigned short 数组来 SETALL unsigned short arr[2] = {0, 5}; // 初始:商品数量 0,货架数量 5 semctl(semid, 0, SETALL, arr); // 0 表示对全部信号量操作 // 信号量的操作函数,数组一次实现两个原子操作 struct sembuf sop[2]; memset(sop, 0, sizeof(sop)); // 父进程:生产者,生产商品,消耗货架 if(fork()){ sop[0].sem_num = 0; // 对信号量 0 操作,即商品 sop[0].sem_op = 1; // 生产商品 sop[0].sem_flg = 0; sop[1].sem_num = 1; // 对信号量 1 操作,即货架 sop[1].sem_op = -1; // 消耗货架 sop[1].sem_flg = 0; while(1){ // 信号量操作 ret = semop(semid, sop, 2); // GETVAL 直接返回信号代表的资源数量 printf("生产者:商品的数量 = %d, 货架的数量 = %d\n", semctl(semid, 0, GETVAL), semctl(semid, 1, GETVAL)); sleep(1); } } // 子进程:消费者,消耗商品,释放货架 else{ sop[0].sem_num = 0; sop[0].sem_op = -1; sop[0].sem_flg = 0; sop[1].sem_num = 1; sop[1].sem_op = 1; sop[1].sem_flg = 0; while(1){ ret = semop(semid, sop, 2); printf("消费者:商品的数量 = %d, 货架的数量 = %d\n", semctl(semid, 0, GETVAL), semctl(semid, 1, GETVAL)); sleep(2); } } return 0; }
5、消息队列
消息队列 MQ 是一种多进程间通信的机制。区别于管道的流式结构,消息队列是一个消息链表,先进先出,并且数据之间是有边界的。通信双方约定好的消息体的数据类型,每个消息体都是固定大小的存储块,如果进程从消息队列中读取了消息体,内核将该消息体删除。
消息队列的使用场景
- 异步处理:消息放入队列但不立即处理,快速返回,减少等待,实现并发处理
- 流量控制(削峰):隔离网关和后端服务,消息队列能够顶住访问压力,后端不会崩溃。
- 系统解耦:独立的扩展和修改队列两边的处理过程
- 缓冲:解决生产和消费消息的处理速度不一致的问题
- 高可用:数据持久化到磁盘,同时提供备份(冗余存储)
这里仅以本地的 system V 版本的消息队列为例说明。
5.1、消息队列接口
5.1.1、创建消息队列
/* 返回值:成功返回消息队列id,失败返回-1 参数 - 参数1;key值 - 参数2:IPC_CREAT|0666 IPC_EXCL */ int msgget(key_t key, int msgflg);
5.1.2、发送消息
/* 返回值:成功返回0, 失败返回-1, 参数 - 参数1:消息队列id - 参数2:该结构体重新声明,第二个结构体成员改成自己需要使用的大小 struct msgbuf { long mtype; // 消息类型 char mtext[1]; // 消息内容 }; - 参数3:发送数据的长度 - 参数4:标志位,填0; */ int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
5.1.3、读取消息
/* 返回值:成功返回接收的字节数,失败返回-1 参数 - msqid: 消息队列 id - msgp: 接收的信息保存在该结构体中,需重新声明 - msgsz: 最多接收的数据量 - msgtyp: 指定接收哪个类型的数据,0表示无差别接收,负数取绝对值 - msgflg: 标志位,填0。IPC_NOWAIT不阻塞,立即响应 */ ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
5.1.4、控制队列
/* 返回值:失败返回-1 参数 - 参数1:消息队列id - 参数2:CMD。IPC_RMID 删除消息队列 ... */ int msgctl(int msqid, int cmd, ...);
5.2、例:消息队列
msgsnd.c 发送数据
#include <func.h> // 重新声明 msgbuf 结构体,字符数组按自己的需要修改 typedef struct Mymsgbuf{ long mtype; char mtext[10]; } msgbuf_t; int main(int argc,char*argv[]) { int ret = 0; int msgid = msgget(1000, IPC_CREAT | 0666); msgbuf_t mbuf; memset(&mbuf, 0, sizeof(mbuf)); mbuf.mtype = atoi(argv[1]); strcpy(mbuf.mtext, argv[2]); ret = msgsnd(msgid, &mbuf, strlen(mbuf.mtext), 0); return 0; }
msgrcv_nowait.c 接收数据
#include <func.h> typedef struct Mymsgbuf{ long mtype; char mtext[10]; } msgbuf_t; int main(int argc,char*argv[]) { int ret = 0; int msgid = msgget(1000, IPC_CREAT | 0666); msgbuf_t mbuf; memset(&mbuf, 0, sizeof(mbuf)); long type = atoi(argv[1]); ret = msgrcv(msgid, &mbuf, sizeof(mbuf.mtext), type, IPC_NOWAIT); puts(mbuf.mtext); return 0; }
msgclose.c 删除消息队列
#include <func.h> int main(int argc,char*argv[]) { int ret = 0; int msgid = msgget(1000, IPC_CREAT | 0666); ret = msgctl(msgid, IPC_RMID, NULL); return 0; }
6、socket
Socket 不仅可用于跨网络与不同的主机进程间通信,还可以用于本地主机进程间通信。
/* 返回值:成功返回套接口描述符,失败返回-1 参数 - domain:通信域,ipv4: AF_INET, ipv6: AF_INET6, 本地: 、AF_LOCAL/AF_UNIX - type:服务类型,tcp:SOCK_STREAM 数据流, udp:SOCK_DGRAM 数据报,SOCK_RAW 原始套接字 - pro‐tocol:指定socket使用的协议编号,通常为0 */ int socket(int domain, int type, int protocal)
6.1、socket 网络模型
6.1.1、tcp 流程
服务端
socket - bind - listen - accept while(1) {- recv - send -} - close
客户端
socket - connect - send - recv - close
6.1.2、udp 流程
服务端
服务端必须要先 recvfrom,然后 sendto。udp 非面向连接的,所以最开始不知道对端的 socket,所以先接收对方发来的而数据,并且记录对端的socket
socket - bind while(1) {- recvfrom - sendto -} - close
客户端
socket - sendto - recvfrom - close
6.2、socket API
更多关于 socket 网络编程的相关内容,参考我的博客
socket
创建套接字。
int socket(int domain, int type, int pro‐tocol); /* 返回值:成功返回套接口描述符,失败返回-1 参数 - 参数1 domain:通信域,ipv4: AF_INET ipv6: AF_INET6 - 参数2 type:服务类型,tcp:SOCK_STREAM udp:SOCK_DGRAM - 参数3 pro‐tocol:指定socket使用的协议编号,通常为0 */
bind
绑定本地的 ip 地址和端口号。
/* 返回值:成功返回0,失败返回-1 参数 - sockfd:socket函数的返回值, - addr:将addr指向的socket地址分配给sockfd,使用前强制转换成sockaddr - addrlen:该socket地址的长度 */ int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
listen
创建监听队列存放待处理的客户的连接。
/* 返回值:成功返回0,失败返回-1并设置errno 参数 - sockfd:被监听的sockfd - backlog:同时监听的最大连接数,集内核监听的全连接队列的长度。 内核2.2前: 半连接队列(SYN_RCVD)+ 全连接队列(ESTABLISHED); 内核2.2后和 Mac:全连接队列的数量 */ int listen(int sockfd, int backlog)
accept
从全连接队列里面取出一个节点并为其分配一个 socket。accept 只是从监听队列中取出连接,而不论连接处于何种状态,更不关心任何网络状况的变化。
/* 返回值:成功返回一个新 sockfd 用于与客户端通信,原来的 sockfd 还可以继续监听其他客户端连接请求,失败返回-1。 参数 - sockfd:监听socket,即处于LISTEN状态的socket - addr:获取被接受连接的远端的socket地址 - addrlen:结构体长度的变量指针 */ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
connect
主动与服务器建立连接
/* 返回值:成功返回0,失败返回-1 参数 - sockfd:socket系统调用返回 1 个 socket - addr:服务器监听的 socket 地址 - addrlen:这个地址长度的大小 */ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
close
关闭该连接对应的 fd,将 fd 对应的引用计数减1。当引用计数为0,关闭连接。
/* 返回值:成功返回0,失败返回-1 参数 - fd:待关闭的 socket */ int close(int fd)
shutdown:立即关闭连接
/* 返回值:成功返回0,失败返回-1 参数 - sockfd:被监听的sockfd - how:SHUT_RD,SHUT_WR,SHUT_RDWR,分别对应关闭读端、写端、双端 */ int shutdown(int sockfd, int how);
recv
用于 tcp,从对端接收数据,阻塞性函数
- 将内核缓冲区中的数据拷贝到用户缓冲区
- 同时移走内核缓冲区中的数据
/* 返回值:成功返回实际接收到的字节数;返回0,对端连接断开;返回-1,错误并设置errno 参数 - sockfd - buf:缓冲区的位置 - len:缓冲区的大小 - flags:标志位,0为默认操作 */ ssize_t recv(int sockfd, void *buf, size_t len, int flags)
send
用于 tcp,向对端发送数据
/* 返回值:成功返回成功发送的字节数,失败返回-1。 参数 - sockfd - buf:缓冲区的位置 - len:缓冲区的大小 - flags:标志位,0为默认操作 */ ssize_t send(int sockfd, const void *buf, size_t len, int flags)
注:read | write 和 recv | send 区别
linux所有的设备都可以看成是一个文件,因此可用 read | write 来读写 socket 数据。
- 若 flags = 0,两者无区别。
- 若 flags != 0,可以是下面组合
MSG_DONTROUTE // 不查找路由表 MSG_OOB // 接受或发送带外数据 MSG_PEEK // 查看数据,并不从系统缓冲区移走数据 MSG_WAITALL // 等待任何数据
recvfrom
用于udp,从对端接收数据,并且保存对端的ip和端口
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) /* 返回值:成功返回读到的字节数,失败返回-1 参数: - 参数1-4:同tcp - 参数5 src_addr:发送端的socket地址 - 参数6 addrlen:指定该地址的长度 */
sendto
向指定的 ip 和端口的对端发送数据
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) /* 返回值:成功返回成功发送的字节数,失败返回-1 参数 - 参数1-4:同tcp - 参数5:接收端的socket地址 - 参数6:指定该地址的长度 */