Linux IPC实践(5) --System V消息队列(2)

简介: 消息发送/接收APImsgsnd函数int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);参数   ms...

消息发送/接收API

msgsnd函数

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

参数

   msgid: 由msgget函数返回的消息队列标识码, 也可以是通过ipcs命令查询出来的一个已经存在的消息队列的ID号

   msgp:是一个指针,指针指向准备发送的消息,

   msgsz:是msgp指向的消息长度, 注意:这个长度不含保存消息类型的那个long int长整型

   msgflg:控制着当前消息队列满到达系统上限时将要发生的事情,如果msgflg = IPC_NOWAIT表示队列满不等待,返回EAGAIN错误。

消息结构在两方面受到制约: (1)它必须小于系统规定的上限值(MSGMAX); (2)必须以一个long int长整数开始,接收者函数将利用这个长整数确定消息的类型;

//消息结构参考形式如下:
struct msgbuf
{
    long mtype;       /* message type, must be > 0 */
    char mtext[1];    /* message data, 可以设定为更多的字节数 */
};
/**示例1: 
测试1: 发送消息的最大长度为8192字节, 一旦超过这个值, 则msgsnd出错, 提示 Invalid argument错误;
测试2: 消息队列所能够接收的最大字节数16384字节, 一旦超过这个长度, 如果msgflg为0(阻塞模式), 则进程会一直阻塞下去, 直到有进程来将消息取走; 而如果msgflg为IPC_NOWAIT模式, 则一个字节也不会写入消息队列, 直接出错返回;
**/
int main(int argc, char *argv[])
{
    if (argc != 3)
        err_quit("Usage: ./main <type> <length>");

    int type = atoi(argv[1]);
    int len = atoi(argv[2]);

    int msgid = msgget(0x255, 0666|IPC_CREAT);
    if (msgid == -1)
        err_exit("msgget error");

    struct msgbuf *buf;
    buf = (struct msgbuf *)malloc(len + sizeof(msgbuf::mtype));
    buf->mtype = type;
    if (msgsnd(msgid, buf, len, IPC_NOWAIT) == -1)
        err_exit("msgsnd error");
}

msgrcv函数

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

参数

   msgid: 由msgget函数返回的消息队列标识码

   msgp:是一个指针,指针指向准备接收的消息;

   msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型

   msgtype:它可以实现接收优先级的简单形式(见下图)

   msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事(见下图)

返回值:

   成功->返回实际放到接收缓冲区里去的字节数(注意: 此处并不包含msgbuf中的mtype的长度[man-page: msgrcv() returns the number of bytes actually copied into the mtext array.]);

   失败->返回-1;

 

msgtyp参数

msgtyp=0

返回队列第一条信息

msgtyp>0

返回队列第一条类型等于msgtype的消息

msgtyp<0

返回队列第一条类型小于等于(<=)msgtype绝对值的消息,并且是满足条件的消息类型最小的消息(按照类型进行排序的顺序进行接收消息)

 

msgflg参数

msgflg=IPC_NOWAIT

队列没有可读消息不等待,返回ENOMSG错误。

msgflg=MSG_NOERROR

消息大小超过msgsz(msgrcv 函数的第三个参数)时被截断, 并且不会报错

msgtyp>0且msgflg=MSG_EXCEPT

接收类型不等于msgtype的第一条消息

/** 示例2: 消息接收(配合示例1中程序使用)
说明: 	-t [number], 指定接收消息的类型, 类型为number的值
-n ,指定以IPC_NOWAIT模式接收消息
**/
int main(int argc, char *argv[])
{
    /** 解析参数 **/
    int type = 0;
    int flag = 0;
    int opt;
    while ((opt = getopt(argc, argv, "nt:")) != -1)
    {
        switch (opt)
        {
        case 'n':   // 指定IPC_NOWAIT选项
            flag |= IPC_NOWAIT;
            break;
        case 't':   // 指定接收的类型, 如果为0的话,说明是按照顺序接收
            type = atoi(optarg);
            break;
        default:
            exit(EXIT_FAILURE);
        }
    }

    int msgid = msgget(0x255, 0);
    if (msgid == -1)
        err_exit("msgget error");

    const int MSGMAX = 8192;    //指定一条消息的最大长度
    struct msgbuf *buf;
    buf = (struct msgbuf *)malloc(MSGMAX + sizeof(buf->mtype));

    ssize_t nrcv;
    if ((nrcv = msgrcv(msgid, buf, MSGMAX, type, flag)) == -1)
        err_exit("msgrcv error");
    cout << "recv " << nrcv << " bytes, type = " << buf->mtype << endl;
}

/** 综合示例: msgsnd/msgrcv, 消息发送/接收实践 **/
//1. 消息发送
int main()
{
    int msgid = msgget(0x1234,0666|IPC_CREAT);
    if (msgid == -1)
        err_exit("msgget error");

    struct msgBuf myBuffer;
    for (int i = 0; i < 128; ++i)
    {
        myBuffer.mtype = i+1;
        sprintf(myBuffer.mtext,"Hello, My number is %d",i+1);
        if (msgsnd(msgid,&myBuffer,strlen(myBuffer.mtext),IPC_NOWAIT) == -1)
            err_exit("msgsnd error");
    }
}
//2. 消息接收:从队首不断的取数据
int main(int argc, char *argv[])
{
    int msgid = msgget(0x1234, 0);
    if (msgid == -1)
        err_exit("msgget error");

    struct msgBuf buf;
    ssize_t nrcv;
    while ((nrcv = msgrcv(msgid, &buf, sizeof(buf.mtext), 0, IPC_NOWAIT)) > 0)
    {
        cout << "recv " << nrcv << " bytes, type: " << buf.mtype
        << ", message: " << buf.mtext << endl;
    }
}

[附]-getopt函数的用法

#include <unistd.h>
int getopt(int argc, char * const argv[],
                  const char *optstring);

extern char *optarg;
extern int optind, opterr, optopt;
//示例: 解析 ./main -n -t 3 中的参数选项
int main(int argc, char *argv[])
{
    while (true)
    {
        int opt = getopt(argc, argv, "nt:");
        if (opt == '?')
            exit(EXIT_FAILURE);
        else if (opt == -1)
            break;

        switch (opt)
        {
        case 'n':
            cout << "-n" << endl;
            break;
        case 't':
            int n = atoi(optarg);
            cout << "-t " << n << endl;
            break;
        }
    }
}

目录
相关文章
|
消息中间件 存储 监控
活动实践 | 快速体验云消息队列RocketMQ版
本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
|
10月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
3411 57
|
消息中间件 监控 数据挖掘
【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
316 95
|
消息中间件 Linux
Linux中的System V通信标准--共享内存、消息队列以及信号量
希望本文能帮助您更好地理解和应用System V IPC机制,构建高效的Linux应用程序。
504 48
|
消息中间件 对象存储
轻量消息队列(原 MNS)订阅 OSS 事件实践
使用轻量消息队列订阅OSS事件,实时处理文件变动,赢取ins风U型枕(限量500个)。访问活动页面,完成实操并上传截图即可参与领奖。活动时间:即日起至2025年2月28日16:00。奖品数量有限,先到先得,快来报名吧!
270 2
|
消息中间件 Linux
Linux:进程间通信(共享内存详细讲解以及小项目使用和相关指令、消息队列、信号量)
通过上述讲解和代码示例,您可以理解和实现Linux系统中的进程间通信机制,包括共享内存、消息队列和信号量。这些机制在实际开发中非常重要,能够提高系统的并发处理能力和数据通信效率。希望本文能为您的学习和开发提供实用的指导和帮助。
932 20
|
Linux
【Linux】System V信号量详解以及semget()、semctl()和semop()函数讲解
System V信号量的概念及其在Linux中的使用,包括 `semget()`、`semctl()`和 `semop()`函数的具体使用方法。通过实际代码示例,演示了如何创建、初始化和使用信号量进行进程间同步。掌握这些知识,可以有效解决多进程编程中的同步问题,提高程序的可靠性和稳定性。
861 19
|
消息中间件 Java 开发工具
【实践】快速学会使用云消息队列RabbitMQ版
本次分享的主题是快速学会使用云消息队列RabbitMQ版的实践。内容包括:如何创建和配置RabbitMQ实例,如Vhost、Exchange、Queue等;如何通过阿里云控制台管理静态用户名密码和AccessKey;以及如何使用RabbitMQ开源客户端进行消息生产和消费测试。最后介绍了实验资源的回收步骤,确保资源合理利用。通过详细的操作指南,帮助用户快速上手并掌握RabbitMQ的使用方法。
1129 10
|
缓存 监控 网络协议
Linux操作系统的内核优化与实践####
本文旨在探讨Linux操作系统内核的优化策略与实际应用案例,深入分析内核参数调优、编译选项配置及实时性能监控的方法。通过具体实例讲解如何根据不同应用场景调整内核设置,以提升系统性能和稳定性,为系统管理员和技术爱好者提供实用的优化指南。 ####