一、消息队列的原理
消息队列是Linux的一种通信机制,这种通信机制传递的数据具有某种结构,而不是简单的字节流。
在Linux
内核我们可以创建一个队列结构,然后我们可以将我们需要发送和读取的数据插入这个队列里面,多个不同的进程可以通过相同的key
值找到相同的队列。
对于消息队列来说:无论发送消息的进程还是接收消息的进程,都需要在进程空间中用消息缓冲区来暂存消息,然后向消息队列写入或读取数据时也按照结构体的方式来进行写入和读取的!对于进程来说消息缓冲区的结构定义一般如下:
由于结构体中有一个mtype
类型,这个字段可以帮助我们区分是哪一个进程写入的,消息队列里面可以让多个不同的进程写入数据,多个不同的进程读取数据,因此消息队列是全双工通信,可读可写。
消息队列的内核结构
- 消息队列的本质其实是一个内核提供的链表,内核基于这个链表,实现了一个数据结构。
- 向消息队列中写数据,实际上是向这个数据结构中插入一个新结点;从消息队列读数据,实际上是从这个数据结构中删除一个结点。
- 和管道一样,每个消息的最大长度是有上限的(MSGMAX),每个消息队列的总字节数也是有上限的(MSGMNB),系统上的消息队列总数也是有上限的(MSGMNI)
- 消息队列是一个全双工通信,可读可写。
- 消息队列的生命周期是随内核的,即进程退出以后消息队列不会消失!
二、消息队列的使用
和共享内存一样,消息队列的使用也涉及很多的系统调用,而且它们的调用接口都是类似的,下面我们开始进行讲解。
1、msgget函数
此函数用于帮我们创建消息队列,创建完毕以后会给我们返回一个消息队列的标识符。
- 参数:
- 与共享内存一样,是一个
key
值,可以通过ftok
函数进行获取 - 是一个标志位,主要有三个标志:
IPC_CREAT
和IPC_EXCL
及mode_flags
其含义与共享内存一致。
- 返回值:调用成功,返回一个和参数
key
相关联的消息队列的标识符,调用失败就返回-1
,错误码被设置。
2、msgctl函数
此函数的功能很强大,里面有许多的标志位,可以完成许多不同的工作,这个函数主要用来控制消息队列。
- 参数
- 来自于
msgget
函数得到的消息队列标识符。 - 是标志位,里面有许多标志,我们经常使用的是这两个:
IPC_RMID
,IPC_STAT
- 一个
struct msqid_ds
类型的指针,在标志位中设置了IPC_STAT
,指针所指向的变量里面就能拿到相关的内核信息,如果不关心内核信息可以设置为nullptr
- 返回值:一般来说,成功返回是
0
,错误返回结果是-1
。
3、msgsnd函数
此函数可以将我们要通信的消息放入消息队列里面,类似与Linux
文件操作中的write
函数。
- 参数
- 来自于
msgget
函数得到的消息队列标识符。 - 要写入的消息的指针,由于消息队列支持多个进程进行写入,在向消息队列里面写数据时,用户自己要组织一个结构体,然后将这个结构体对象当成一条消息进行写入。实际中对于此参数的结构体常常这样定义:
- 发送的消息正文的字节数,注意这里的是指正文内容
mtext
里面数据的字节数。 - 标志位,
IPC_NOWAIT
消息没有发送完成函数也会立即返回,0
:直到发送完成函数才返回。
- 返回值:成功返回是
0
,错误返回结果是-1
。
4、msgrcv函数
这个函数的作用可以帮助我们从消息队列里面取出数据,类似与Linux
文件操作中的read
函数。
- 参数:
- 来自于
msgget
函数得到的消息队列标识符。 - 读取到的数据要放到哪里,这里还是填我们自定义的结构体对象。
- 要读取的正文字节数。
- 消息队列里面的消息的区分类型
mtype
,选择你想要读取的类型的数据 - 标准位,
IPC_NOWAIT
,非阻塞等待,若没有消息,进程会立即返回-1
。0
,阻塞等待。
- 返回值:
如果读取成功就返回读取到的字节数,如果读取失败就返回-1
,错误码被设置。
5、代码实例
command.hpp
#ifndef __COMMAND_HPP #define __COMMAND_HPP #include <iostream> #include <string> #include <cstdlib> #include <cstring> #include <cerrno> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> const std::string pathname = "./"; const int proj_id = 10; struct msgqbuf { long mtype; // 该字段用于区分是哪一个进程写的数据 , message type, must be > 0 char mtext[]; // 该字段才是正文内容, 这里可以给变长数组,也可以给一个确定大小的数组 }; #endif
msgserver.cpp
#include "command.hpp" int main() { // 1.生成key值 key_t key = ftok(pathname.c_str(), proj_id); if (key == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; return -1; } // 2.创建消息队列 int msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0664); if (msqid == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; return -1; } std::cout << "消息队列创建成功" << std::endl; // 3.进行进程间通信 // 读取消息 msgqbuf* pmb_r = (msgqbuf*)malloc(sizeof(msgqbuf) + sizeof(char) * 25); pmb_r->mtype = 1; int readnum = msgrcv(msqid, pmb_r, sizeof(char) * 25, pmb_r->mtype, 0); if (readnum == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; } else if (readnum > 0) { std::cout << "读取到了" << readnum << "字节" << std::endl; std::cout << "消息的类型: " << pmb_r->mtype << " 消息的内容: " << pmb_r->mtext << std::endl; } // 4.删除消息队列 int err = msgctl(msqid, IPC_RMID, nullptr); if (err == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; } free(pmb_r); return 0; }
msgclient.cpp
#include "command.hpp" int main() { // 1.生成key值 key_t key = ftok(pathname.c_str(), proj_id); if (key == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; return -1; } // 2.创建消息队列 int msqid = msgget(key, IPC_CREAT); if (msqid == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; return -1; } // 3.进行进程间通信 // 3.1向消息队列发送消息 msgqbuf* pmb_w = (msgqbuf*)malloc(sizeof(msgqbuf) + sizeof(char) * 25); if (!pmb_w) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; } // 设置消息的类型与数据 pmb_w->mtype = 1; std::string s = "hello linux\n"; int i = 0; for (auto& e : s) { pmb_w->mtext[i++] = e; } // 发送消息 int err = msgsnd(msqid, pmb_w, strlen(pmb_w->mtext), 0); if (err == -1) { std::cerr << "错误码 : " << errno << " " << strerror(errno) << std::endl; } free(pmb_w); return 0; }
要先运行msgserver
创建消息队列,然后运行msgclient
进行通信。
运行结果: