Linux进程间通信(IPC) Linux消息队列:讲解POSIX消息队列在Linux系统进程间通信中的应用和实践

简介: Linux进程间通信(IPC) Linux消息队列:讲解POSIX消息队列在Linux系统进程间通信中的应用和实践


  • 消息队列介绍

在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。

同样,在操作系统内核中,也实现了类似的功能,队列中存放的是“消息”。称之为消息队列,消息也可理解为数据。

主要用途是进程间通信(IPC),所谓通信,就是进行数据交互。

消息队列中的每条消息通常具有以下属性:

  • 一个无符号整数优先级(Posix)或一个长整型类型(System V)
  • 消息的数据部分长度(可以为0)
  • 数据本身(如果长度大于0的话.)

posix相关接口函数

Library interface  System call
mq_close(3) close(2)
mq_getattr(3) mq_getsetattr(2)
mq_notify(3) mq_notify(2)
mq_open(3) mq_open(2)
mq_receive(3) mq_timedreceive(2)
mq_send(3) mq_timedsend(2)
mq_setattr(3)   mq_getsetattr(2)
mq_timedreceive(3) mq_timedreceive(2)
mq_timedsend(3) mq_timedsend(2)
mq_unlink(3) mq_unlink(2)
  • Headers file
//Link with -lrt.
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <time.h>
  • Open/Close mqueue
 //打开/创建 消息队列,消息队列名称前面必须加上斜杆。
 mqd_t mq_open(const char *name, int oflag);
 mqd_t mq_open(const char *name, int oflag, mode_t mode,struct mq_attr *attr);
 
 
 //关闭消息队列
 int mq_close(mqd_t mqdes);
 //删除消息队列
 int mq_unlink(const char *name); //成功返回0,失败返回-1

Parameter Description:

name:表示消息队列的名字,它符合POSIX IPC的名字规则。

oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。

attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。

注意:每个消息队列都有一个保存其当前打开着的描述符数的引用计数器,所以使用mq_unlink删除类似删除文件的机制:当一个消息队列的引用计数大于0时,其name就能删除,但是该队列的析构要到最后一个mq_close发生时才能进行.

  • Get/Set mqueue Attributes
//获取/设置消息队列属性
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,struct mq_attr *oldattr);
struct mq_attr {
    longmq_flags;    /* Flags: 0 or O_NONBLOCK */
 
    longmq_maxmsg;   /* Max. # of messages onqueue,can only be initialized when mq_open is created*/
 
    longmq_msgsize;  /* Max. message size (bytes),can only be initialized when mq_open is created*/
 
    longmq_curmsgs;  /* # of messages currently inqueue */
 
}

Description:

mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。

其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。

mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。

mq_maxmsgmq_msgsize属性只能在创建消息队列时通过mq_open来设置。

mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。

Note:

  • attr.mq_maxmsg 不能超过文件 /proc/sys/fs/mqueue/msg_max 中的数值,我的机器上面是10。
  • attr.mq_msgsize不能超过 /proc/sys/fs/mqueue/msgsize_max 的数值。
  • Send/Receive messages through the message queue
//发送消息
int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned int msg_prio);
//接收消息,总是返回指定消息队列最高优先级的最早信息,而且该优先级能随着该消息的内容以及长度一同返回.
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,size_t msg_len, unsigned int *msg_prio);
 
int mq_timedsend(mqd_t mqdes, const char *msg_ptr,
                     size_t msg_len, unsigned int msg_prio,
                     const struct timespec *abs_timeout);
 
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
                          size_t msg_len, unsigned int *msg_prio,
                          const struct timespec *abs_timeout);

Parameter Description:

mqdes:消息队列描述符;

msg_ptr:指向消息体缓冲区的指针;

msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。

如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。

msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。

POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。

如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。

const struct timespec:设置消息发送/接收的超时时间。

abs_timeout指向一个结构,该结构指定了呼叫将被阻止的时间上限。

此上限是自1970年1月1日00:00:00 +0000(UTC)以来的绝对超时(以秒和纳秒为单位)

如果队列已满,默认情况下mq_send会阻塞,可以使用mq_timedsend函数设置阻塞超时时间。

如果队列为空,默认情况下mq_receive会阻塞,可以使用mq_timedreceive函数设置阻塞超时时间。

由于默认情况下mq_send和mq_receive是阻塞进行调用,也可以通过mq_setattr来设置为O_NONBLOCK.

  • Asynchronous notification of message queue
//建立或者删除消息到达通知事件
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
 
//sigevent 结构
#include <signal.h>
union sigval {          /* Data passed with notification */
  int  sival_int;         /* Integer value */
    void   *sival_ptr;      /* Pointer value */
};
 
struct sigevent {
  int sigev_notify; /* Notification method */
    int sigev_signo;  /* Notification signal */
    union sigval sigev_value;  /* Data passed with notification */
    void (*sigev_notify_function) (union sigval);/* Function used for thread notification (SIGEV_THREAD) */
    void *sigev_notify_attributes;/* Attributes for notification thread (SIGEV_THREAD) */
    pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */
};

Parameter Description:

  • 如果sevp参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。
  • 如果sevp参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。

sigev_notify :

SIGEV_NONE:空的提醒,事件发生时不做任何事情

SIGEV_SIGNAL:向进程发送sigev_signo中指定的信号,具体详细的状况参照上面的文档,这涉及到sigaction的使用

SIGEV_THREAD:通知进程在一个新的线程中启动sigev_notify_function函数,函数的实参是sigev_value,系统API自动启动一个线程,我们不用显式启动。

Note:

  • 任意时刻只有一个进程可以被注册为接收某个给定队列的通知。
  • 当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。
  • 当通知被发送给它的注册进程时,其注册被撤消。进程必须再次调用mq_notify以重新注册注意:重新注册要放在从消息队列读出消息之前而不是之后。

Posix消息队列容许 异步事件通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:

  • 产生一个信号,通过信号处理函数进行处理消息.
    struct sigevent sev = { 0 };
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = (void *)p; //线程入口函数
    sev.sigev_notify_attributes = NULL;//线程属性,根据需求设置
    sev.sigev_value.sival_ptr = (void *)mqdes;  // 传递给线程的参数
  • 创建一个线程来执行一个指定的函数,在新的线程中处理消息.
    struct sigevent sev = { 0 };
    signal(signo,p);  //设置信号处理函数
    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = signo ;  //信号值

  • 消息队列相关限制

mq_maxmsg:          队列中最大的消息数

mq_msgsize:          给定消息的最大字节数

MQ_OPEN_MAX:    一个进程能够同时拥有的打开着消息队列的最大数目

MQ_PRIO_MAX:     任意消息的最大优先级值+1

POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize

这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。

MQ_OPEN_MAXMQ_PRIO_MAX 定义在<unistd.h>中,可以在运行的时候听过调用sysconf函数获取.

ulimit -a |grep message
# POSIX message queues     (bytes, -q) 819200
# 限制大小为800KB,该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。
 
# POSIX消息队列默认的最大消息数和消息的最大大小分别为:
# mq_maxmsg = 10
# mq_msgsize = 8192
  • Posix消息队列与System V消息队列的区别
  1. 对Posix消息队列的读总是返回最高优先级的最早信息.
  2. 对System V消息队列的读取则可以返回任意指定优先级的消息.


  1. 当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或者启动一个线程;
  2. System V消息队列则不提供类似机制.


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2天前
|
Linux Shell
Linux系统
是对Linux系统进行管理的命令。对于Linux系统来说,无论是中央处理器、内存、磁盘驱动器、键盘、鼠标,还是用户等都是文件,Linux系统管理的命令是它正常运行的核心,与之前的DOS命令类似。linux命令在系统中有两种类型:内置Shell命令和Linux命令。Linux系统
|
1天前
|
Linux Shell
Linux系统
是对Linux系统进行管理的命令。对于Linux系统来说,无论是中央处理器、内存、磁盘驱动器、键盘、鼠标,还是用户等都是文件,Linux系统管理的命令是它正常运行的核心,与之前的DOS命令类似。linux命令在系统中有两种类型:内置Shell命令和Linux命令。
|
2天前
|
运维 监控 Shell
深入理解Linux系统下的Shell脚本编程
【10月更文挑战第24天】本文将深入浅出地介绍Linux系统中Shell脚本的基础知识和实用技巧,帮助读者从零开始学习编写Shell脚本。通过本文的学习,你将能够掌握Shell脚本的基本语法、变量使用、流程控制以及函数定义等核心概念,并学会如何将这些知识应用于实际问题解决中。文章还将展示几个实用的Shell脚本例子,以加深对知识点的理解和应用。无论你是运维人员还是软件开发者,这篇文章都将为你提供强大的Linux自动化工具。
|
1天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
34 0
手撸MQ消息队列——循环数组
|
3月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
132 1
|
4月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。