Linux IPC实践(7) --Posix消息队列

简介: 1. 创建/获取一个消息队列#include /* For O_* constants */#include /* For mode constan...

1. 创建/获取一个消息队列

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);	//专用于打开一个消息队列
mqd_t mq_open(const char *name, int oflag, mode_t mode,
              struct mq_attr *attr);

参数:

   name:  消息队列名字;

   oflag: 与open函数类型, 可以是O_RDONLY, O_WRONLY, O_RDWR, 还可以按位或上O_CREAT, O_EXCL, O_NONBLOCK.

   mode: 如果oflag指定了O_CREAT, 需要指定mode参数;

   attr: 指定消息队列的属性;

返回值:

   成功: 返回消息队列文件描述符;

   失败: 返回-1;


注意-Posix IPC名字限制:

   1. 必须以”/”开头, 并且后面不能还有”/”, 形如:/file-name;

   2. 名字长度不能超过NAME_MAX

   3. 链接时:Link with -lrt.

/** System V 消息队列

通过msgget来创建/打开消息队列

int msgget(key_t key, int msgflg);

**/

 

2. 关闭一个消息队列

int mq_close(mqd_t mqdes);
/** System V 消息队列没有类似的该函数调用**/

3. 删除一个消息队列

int mq_unlink(const char *name);
/** System V 消息队列
通过msgctl函数, 并将cmd指定为IPC_RMID来实现
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
**/
//示例
int main()
{
    mqd_t mqid = mq_open("/abc", O_CREAT|O_RDONLY, 0666, NULL);
    if (mqid == -1)
        err_exit("mq_open error");
    cout << "mq_open success" << endl;
    mq_close(mqid);
    mq_unlink("/abc");
    cout << "unlink success" << endl;
}

4. 获取/设置消息队列属性

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
                        struct mq_attr *oldattr);

参数:

   newattr: 需要设置的属性

   oldattr: 原来的属性

//struct mq_attr结构体说明
struct mq_attr
{
    long mq_flags;       /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg;      /* Max. # of messages on queue: 消息队列能够保存的消息数 */
    long mq_msgsize;     /* Max. message size (bytes): 消息的最大长度 */
    long mq_curmsgs;     /* # of messages currently in queue: 消息队列当前保存的消息数 */
};

/** System V 消息队列

通过msgctl函数, 并将cmd指定为IPC_STAT/IPC_SET来实现

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

**/

/** 示例: 获取消息队列的属性
**/
int main(int argc,char **argv)
{
    mqd_t mqid = mq_open("/test", O_RDONLY|O_CREAT, 0666, NULL);
    if (mqid == -1)
        err_exit("mq_open error");

    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");
    cout << "Max messages on queue: " << attr.mq_maxmsg << endl;
    cout << "Max message size: " << attr.mq_msgsize << endl;
    cout << "current messages: " << attr.mq_curmsgs << endl;

    mq_close(mqid);
    return 0;
}

5. 发送消息

int mq_send(mqd_t mqdes, const char *msg_ptr,
           size_t msg_len, unsigned msg_prio);

参数:

   msg_ptr: 指向需要发送的消息的指针

   msg_len: 消息长度

   msg_prio: 消息的优先级

/** System V 消息队列

通过msgsnd函数来实现消息发送

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

**/

/** 示例: 向消息队列中发送消息, prio需要从命令行参数中读取 **/
struct Student
{
    char name[36];
    int age;
};
int main(int argc,char **argv)
{
    if (argc != 2)
        err_quit("./send <prio>");

    mqd_t mqid = mq_open("/test", O_WRONLY|O_CREAT, 0666, NULL);
    if (mqid == -1)
        err_exit("mq_open error");

    struct Student stu = {"xiaofang", 23};
    unsigned prio = atoi(argv[1]);
    if (mq_send(mqid, (const char *)&stu, sizeof(stu), prio) == -1)
        err_exit("mq_send error");

    mq_close(mqid);
    return 0;
}

6. 从消息队列中读取消息

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
                       size_t msg_len, unsigned *msg_prio);

参数:

  msg_len: 读取的消息的长度, 注意: 此值一定要等于mq_attr::mq_msgsize的值, 该值可以通过mq_getattr获取, 但一般是8192字节     [this must be greater than the mq_msgsize attribute of the queue (see mq_getattr(3)).]

  msg_prio: 保存获取的消息的优先级

返回值:

  成功: 返回读取的消息的字节数

  失败: 返回-1

  注意: 读取的永远是消息队列中优先级最高的最早的消息, 如果消息队列为, 如果不指定为非阻塞模式, 则mq_receive会阻塞;

/** System V 消息队列

通过msgrcv函数来实现消息发送的

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

**/

/** 示例: 从消息队列中获取消息 **/
int main(int argc,char **argv)
{
    mqd_t mqid = mq_open("/test", O_RDONLY);
    if (mqid == -1)
        err_exit("mq_open error");

    struct Student buf;
    int nrcv;
    unsigned prio;
    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");

    if ((nrcv = mq_receive(mqid, (char *)&buf, attr.mq_msgsize, &prio)) == -1)
        err_exit("mq_receive error");

    cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
         << buf.name << ", age: " << buf.age << endl;

    mq_close(mqid);
    return 0;
}

7. 建立/删除消息到达通知事件

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

参数sevp:

   NULL: 表示撤销已注册通知;

   非空: 表示当消息到达且消息队列当前为空, 那么将得到通知;

通知方式:

   1. 产生一个信号, 需要自己绑定

   2. 创建一个线程, 执行指定的函数

注意: 这种注册的方式只是在消息队列从空到非空时才产生消息通知事件, 而且这种注册方式是一次性的!

//sigevent结构体
struct sigevent
{
    int          sigev_notify; /* Notification method */
    int          sigev_signo;  /* Notification signal */
    union sigval sigev_value;  /* Data passed with notification */
    void       (*sigev_notify_function) (union sigval);  /* Function used for thread notification (SIGEV_THREAD) */
    void        *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */
    pid_t        sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */
};
union sigval            /* Data passed with notification */
{
    int     sival_int;         /* Integer value */
    void   *sival_ptr;         /* Pointer value */
};

sigev_notify代表通知的方式: 一般常用两种取值:SIGEV_SIGNAL, 以信号方式通知; SIGEV_THREAD, 以线程方式通知

如果以信号方式通知: 则需要设定一下两个参数:

   sigev_signo: 信号的代码

   sigev_value: 信号的附加数据(实时信号)

如果以线程方式通知: 则需要设定以下两个参数:

   sigev_notify_function

   sigev_notify_attributes

/** Posix IPC所特有的功能, System V没有 **/

/**示例: 将下面程序多运行几遍, 尤其是当消息队列”从空->非空”, 多次”从空->非空”, 当消息队列不空时运行该程序时, 观察该程序的状态;
**/
mqd_t mqid;
long size;
void sigHandlerForUSR1(int signo)
{
    //将数据的读取转移到对信号SIGUSR1的响应函数中来
    struct Student buf;
    int nrcv;
    unsigned prio;
    if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
        err_exit("mq_receive error");

    cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
         << buf.name << ", age: " << buf.age << endl;
}

int main(int argc,char **argv)
{
    // 安装信号响应函数
    if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
        err_exit("signal error");

    mqid = mq_open("/test", O_RDONLY);
    if (mqid == -1)
        err_exit("mq_open error");

    // 获取消息的最大长度
    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");
    size = attr.mq_msgsize;

    // 注册消息到达通知事件
    struct sigevent event;
    event.sigev_notify = SIGEV_SIGNAL;  //指定以信号方式通知
    event.sigev_signo = SIGUSR1;        //指定以SIGUSR1通知
    if (mq_notify(mqid, &event) == -1)
        err_exit("mq_notify error");

    //死循环, 等待信号到来
    while (true)
        pause();

    mq_close(mqid);
    return 0;
}
/** 示例:多次注册notify, 这样就能过多次接收消息, 但是还是不能从队列非空的时候进行接收, 将程序改造如下:
**/
mqd_t mqid;
long size;
struct sigevent event;
void sigHandlerForUSR1(int signo)
{
    // 注意: 是在消息被读走之前进行注册,
    // 不然该程序就感应不到消息队列"从空->非空"的一个过程变化了
    if (mq_notify(mqid, &event) == -1)
        err_exit("mq_notify error");

    //将数据的读取转移到对信号SIGUSR1的响应函数中来
    struct Student buf;
    int nrcv;
    unsigned prio;
    if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
        err_exit("mq_receive error");

    cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
         << buf.name << ", age: " << buf.age << endl;
}

int main(int argc,char **argv)
{
    // 安装信号响应函数
    if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
        err_exit("signal error");

    mqid = mq_open("/test", O_RDONLY);
    if (mqid == -1)
        err_exit("mq_open error");

    // 获取消息的最大长度
    struct mq_attr attr;
    if (mq_getattr(mqid, &attr) == -1)
        err_exit("mq_getattr error");
    size = attr.mq_msgsize;

    // 注册消息到达通知事件
    event.sigev_notify = SIGEV_SIGNAL;  //指定以信号方式通知
    event.sigev_signo = SIGUSR1;        //指定以SIGUSR1通知
    if (mq_notify(mqid, &event) == -1)
        err_exit("mq_notify error");

    //死循环, 等待信号到来
    while (true)
        pause();

    mq_close(mqid);
    return 0;
}

mq_notify 注意点总结:

   1. 任何时刻只能有一个进程可以被注册为接收某个给定队列的通知;

   2. 当有一个消息到达某个先前为空的队列, 而且已有一个进程被注册为接收该队列的通知时, 只有没有任何线程阻塞在该队列的mq_receive调用的前提下, 通知才会发出;

   3. 当通知被发送给它的注册进程时, 该进程的注册被撤销. 进程必须再次调用mq_notify以重新注册(如果需要的话),但是要注意: 重新注册要放在从消息队列读出消息之前而不是之后(如同示例程序);

 

附-查看已经成功创建的Posix消息队列

#其存在与一个虚拟文件系统中, 需要将其挂载到系统中才能查看

  Mounting the message queue filesystem On Linux, message queues are created in a virtual filesystem.  

(Other implementations may also  provide such a feature, but the details are likely to differ.)  This 

file system can be mounted (by the superuser, 注意是使用root用户才能成功) using the following commands:

mkdir /dev/mqueue

mount -t mqueue none /dev/mqueue

还可以使用cat查看该消息队列的状态, rm删除:

cat /dev/mqueue/abc 

rm abc

还可umount该文件系统

umount /dev/mqueue

 

附-Makefile

.PHONY: clean all
CC = g++
CPPFLAGS = -Wall -g
BIN = main
SOURCES = $(BIN.=.cpp)
all: $(BIN)

%.o: %.c
	$(CC) $(CPPFLAGS) -c $^ -o $@
main: main.o
	$(CC) $(CPPFLAGS) $^ -lrt -o $@

clean:
	-rm -rf $(BIN) *.o bin/ obj/ core

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Linux
Linux中的System V通信标准--共享内存、消息队列以及信号量
希望本文能帮助您更好地理解和应用System V IPC机制,构建高效的Linux应用程序。
479 48
|
消息中间件 Linux
Linux:进程间通信(共享内存详细讲解以及小项目使用和相关指令、消息队列、信号量)
通过上述讲解和代码示例,您可以理解和实现Linux系统中的进程间通信机制,包括共享内存、消息队列和信号量。这些机制在实际开发中非常重要,能够提高系统的并发处理能力和数据通信效率。希望本文能为您的学习和开发提供实用的指导和帮助。
901 20
|
消息中间件 存储 Linux
|
消息中间件 Linux API
Linux c/c++之IPC进程间通信
这篇文章详细介绍了Linux下C/C++进程间通信(IPC)的三种主要技术:共享内存、消息队列和信号量,包括它们的编程模型、API函数原型、优势与缺点,并通过示例代码展示了它们的创建、使用和管理方法。
400 0
Linux c/c++之IPC进程间通信
|
消息中间件 Linux 开发者
Linux进程间通信秘籍:管道、消息队列、信号量,一文让你彻底解锁!
【8月更文挑战第25天】本文概述了Linux系统中常用的五种进程间通信(IPC)模式:管道、消息队列、信号量、共享内存与套接字。通过示例代码展示了每种模式的应用场景。了解这些IPC机制及其特点有助于开发者根据具体需求选择合适的通信方式,促进多进程间的高效协作。
671 3
|
消息中间件 物联网 Linux
Linux怎么安装czmq(物联网消息通讯轻量级消息队列)
Linux怎么安装czmq(物联网消息通讯轻量级消息队列)
170 8
|
开发者 API Windows
从怀旧到革新:看WinForms如何在保持向后兼容性的前提下,借助.NET新平台的力量实现自我进化与应用现代化,让经典桌面应用焕发第二春——我们的WinForms应用转型之路深度剖析
【8月更文挑战第31天】在Windows桌面应用开发中,Windows Forms(WinForms)依然是许多开发者的首选。尽管.NET Framework已演进至.NET 5 及更高版本,WinForms 仍作为核心组件保留,支持现有代码库的同时引入新特性。开发者可将项目迁移至.NET Core,享受性能提升和跨平台能力。迁移时需注意API变更,确保应用平稳过渡。通过自定义样式或第三方控件库,还可增强视觉效果。结合.NET新功能,WinForms 应用不仅能延续既有投资,还能焕发新生。 示例代码展示了如何在.NET Core中创建包含按钮和标签的基本窗口,实现简单的用户交互。
378 0
|
消息中间件 运维 监控
Linux命令ipcs详解:IPC对象的全面洞察
`ipcs`命令详解:Linux下用于洞察IPC(消息队列、信号量、共享内存)对象的工具。它列出系统中的IPC资源,显示详细信息,如ID、所有者、权限等。参数如`-m`、`-q`、`-s`分别显示共享内存、消息队列和信号量信息。结合`-l`或`-c`可调整输出格式。定期检查IPC状态有助于系统管理和性能优化。需注意权限和谨慎操作。
|
消息中间件 Linux 数据处理
Linux命令ipcrm详解:轻松管理IPC对象
`ipcrm`是Linux下用于删除IPC(进程间通信)对象的命令,如消息队列、共享内存和信号量。它通过指定对象ID或键值进行操作,如`-m ID`删除共享内存,`-q ID`删除消息队列,`-s ID`删除信号量。使用时需注意确认对象未被使用,以免影响系统运行。结合`ipcs`命令检查对象详情,并可定期清理不再需要的IPC对象以优化系统资源。
|
消息中间件 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
175 0