system V消息队列

简介: 1.消息队列1)消息队列提供了一个从进程向另外一个进程发送一块是数据的方法2)每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型不足之处:每个消息的最大长度是有限制的。MSGMAX每个消息队列的总的字节数也是有上限。

1.消息队列
1)消息队列提供了一个从进程向另外一个进程发送一块是数据的方法
2)每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型
不足之处:
每个消息的最大长度是有限制的。MSGMAX
每个消息队列的总的字节数也是有上限。MSGMNB
系统上消息队列的总数也有一个上限。MSGMNI
可以这样查看这三个限制:

xcy@xcy-virtual-machine:~$ cat /proc/sys/kernel/msgmax
8192
xcy@xcy-virtual-machine:~$ cat /proc/sys/kernel/msgmnb
16384
xcy@xcy-virtual-machine:~$ cat /proc/sys/kernel/msgmni
32000
xcy@xcy-virtual-machine:~$

2.IPC对象数据结构
内核为每个IPC对象维护了一个数据结构:

           struct ipc_perm {
               key_t          __key;       /* Key supplied to msgget(2) */
               uid_t          uid;         /* Effective UID of owner */
               gid_t          gid;         /* Effective GID of owner */
               uid_t          cuid;        /* Effective UID of creator */
               gid_t          cgid;        /* Effective GID of creator */
               unsigned short mode;        /* Permissions */
               unsigned short __seq;       /* Sequence number */
           };

下面是消息队列的数据结构:

           struct msqid_ds {
               struct ipc_perm msg_perm;     /* Ownership and permissions */
               time_t          msg_stime;    /* Time of last msgsnd(2) */
               time_t          msg_rtime;    /* Time of last msgrcv(2) */
               time_t          msg_ctime;    /* Time of last change */
               unsigned long   __msg_cbytes; /* Current number of bytes in
                                                queue (nonstandard) */
               msgqnum_t       msg_qnum;     /* Current number of messages
                                                in queue */
               msglen_t        msg_qbytes;   /* Maximum number of bytes
                                                allowed in queue */ // 这个就是MSGMNB
               pid_t           msg_lspid;    /* PID of last msgsnd(2) */
               pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
           };

3.如何查看IPC对象
可以看到每个IPC对象都有一个key还有一个id
命令ipcs

如何删除:
ipcrm -Q <key>
ipcrm -q <id>
例如:

xcy@xcy-virtual-machine:~$ ipcrm -Q 0x4d2
xcy@xcy-virtual-machine:~$

 

4.如何操作消息队列

主要有下面几个函数:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgfl_g); 
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgfl_g);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgfl_g);

先补充一个自己的头文件comm.h

#ifndef __COMM_H__
#define __COMM_H__


#include<errno.h>
#include<stdlib.h>
#define ERR_EXIT(m) \
    do \
    { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)


#endif // __COMM_H__

下面单独介绍:
4.1 msgget:
int msgget(key_t key, int msgfl_g); 
功能:用来创建和访问一个消息队列
参数:key:某个消息队列的名字(比如1234)。还可以是IPC_PRIVATE,表示私有的,这样创建出来的消息队列只能用于亲缘进程通信,它的key是0.
msgfl_g:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
返回值:成功返回非负整数,是该消息队列的标识码,失败返回-1.
下面表示创建的大致过程:

 

开始 ->
if(key == IPC_PRIVATE)
{
    if(系统表格满了)    
    {
        // 出错返回
    }
    else
    {
        // 创建成功,返回标识符
    }
}
else // 不是私有的
{
    if(key已经存在)    
    {
        if(IPC_CREAT和IPC_EXCL都设置了)
        {
            // 出错返回
        }
        else
        {
            if(访问权限允许)
            {
                // 成功打开消息队列,返回标识符
            }
            else
            {
                // 出错返回
            }
        }
    }
    else   // key 不存在
    {
        if(IPC_CREAT设置了)
        {
            if(系统表格满了)    
              {
                // 出错返回
            }
            else
            {
                // 创建成功,返回标识符
            }
        }
        else // 不存在就无法打开
        {
            // 出错返回
        }
    }
}
View Code

 

实例:

#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>

#include"comm.h"
int main()
{
    //int msgid = msgget(1234,0666 | IPC_CREAT | IPC_EXCL);// 已存在则失败
    int msgid = msgget(1234,0666 | IPC_CREAT);
    //int msgid = msgget(IPC_PRIVATE,0666 | IPC_CREAT);
    if(msgid < 0)
        ERR_EXIT("msgget");
    printf("msgget success\n");
    printf("msgid = %d\n", msgid);

    return 0;
}
View Code

4.2 msgctl
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
作用:消息队列控制函数
返回值:成功返回0,失败返回-1
参数:msgid:由msgget返回的id
cmd:有三个值可选
IPC_STAT:获取消息队列的状态,把msqid_ds结构中的数据设置为消息队列的当前关联值
IPC_SET:在进程有足够权限的条件下,把消息队列的当前关联值设置为msqid_ds数据结构中给出的值
IPC_RMID:删除消息队列。(此时buf可置为null)
buf:输入参数,根据第二个参数来设置,具体用法参考例子
例子:

 

#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>

#include"comm.h"

void stattest(int msgid)
{
    struct msqid_ds buf;

    int ret = msgctl(msgid, IPC_STAT, &buf);
    if(ret < 0)
        ERR_EXIT("msgctl");
    printf("mode = %o\n", buf.msg_perm.mode);
    printf("bytes = %ld\n", buf.__msg_cbytes);
    printf("number = %d\n", (int)buf.msg_qnum);
    printf("msgmnb = %d\n", (int)buf.msg_qbytes);
}

void rmtest(int msgid)
{
    int ret = msgctl(msgid, IPC_RMID, NULL);
    if(ret < 0)
        ERR_EXIT("msgctl");

    printf("remove msg success\n");
}
void settest(int msgid)
{
    struct msqid_ds buf;

    int ret = msgctl(msgid, IPC_STAT, &buf);
    if(ret < 0)
        ERR_EXIT("msgctl stat");
    sscanf("600", "%o", (unsigned int*)&buf.msg_perm.mode);
    ret = msgctl(msgid, IPC_SET, &buf);
    if(ret < 0)
        ERR_EXIT("msgctl set");
}


int main()
{
    int msgid = msgget(1234,0);
    if(msgid < 0)
        ERR_EXIT("msgget");
    printf("msgget success\n");
    printf("msgid = %d\n", msgid);
    //stattest(msgid);
    //rmtest(msgid);
    settest(msgid);
    return 0;
}
View Code

 

运行,这里仅仅展示stat的例子:

xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_stat // 不存在会出错
msgget: No such file or directory
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_get  // 先创建
msgget success
msgid = 229376
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_stat
msgget success
msgid = 229376
mode = 666
bytes = 0
number = 0
msgmnb = 16384
xcy@xcy-virtual-machine:~/test/IPC_MSG$

4.3 msgsnd

int msgsnd(int msqid, const void *msgp,size_t msgsz, int msgfl_g);
作用:把一条消息添加到消息队列中去返回值:成功返回0,失败返回-1
参数:msgid:由msgget返回的idmsgp:一个指针,指向准备发送的消息
msgsz:是msgp指向的消息长度。这个长度不包含保存消息类型那个long int的长整型

msgfl_g:控制位,控制当前消息队列满或到达系统上限时将要发生的事情。
         0表示等待;msgfl_g =IPC_NOWAIT。表示队列满不等待,返回EAGAIN 

下面是消息的封装(参考形式):

struct msgbuf{
        longmtype; // 必须为long型,接收者函数通过这个长整数确定消息的类型        

  char mtext[1]; // 这里存放具体的消息内容,必须小于MSGMAX
};

例子:

 

#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>

#include"comm.h"

struct msgbuf{
    long mtype;
    char mtext[1];
};

int main(int argc, char *argv[])
{
    if(argc != 3)
    {
        printf("Usage:<%s> <type> <len>\n", argv[0]);
        return -1;
    }
    int type = atoi(argv[1]); // 表示发送的类型
    int len = atoi(argv[2]); // 表示发送的数据大小

    int msgid = msgget(1234,0);
    if(msgid < 0)
        ERR_EXIT("msgget");
    printf("msgget success\n");
    printf("msgid = %d\n", msgid);

    struct msgbuf *buf = (struct msgbuf*)malloc(sizeof(buf));
    buf->mtype = type;
    int ret = msgsnd(msgid, (void*)buf, len, 0);// 满了就等待
    //int ret = msgsnd(msgid, (void*)buf, len, IPC_NOWAIT);// 满了就会失败
    if(ret < 0)
        ERR_EXIT("msgsnd");
    return 0;
}
View Code

 

4.4 msgrcv
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgfl_g);作用:从一个消息队列中接收消息
返回值:成功返回实际接收到放到接收缓冲区里去的字符个数,失败返回-1参数:msgid:由msgget返回的id
msgp:一个指针,指向准备接收的消息msgsz:是msgp指向的消息长度,不包含消息类型的长度。
msgtype:它可以实现接收优先级的简单形式msgfl_g:控制着队列中没有相应类型的消息可工接收时将要发生的事情。

下面重点来分析msgtye:

=0:表示返回队列里的第一条消息

>0:返回队列第一条类型等于msgtype的消息。

<0:返回队列第一条类型小于等于msgtype绝对值的消息。

 

关于小于0,man手册里面的东西:
* If  msgtyp  is  less than 0, then the first message in the queue with the lowest type less than or equal to the absolute  value  of  msgtypwill be read.
我的理解是:第一条小于msgtype绝对值的消息将会被读取。实际的测试情况有点不符和,实际情况返回的是小于msgtype绝对值中类型最小的那一个。
比如:发送给消息队列的顺序是 4 5 4 1 78 3 8(这些数字表示类型),获取的是-10。结果却收到的是类型为1的消息。并且假如发送了多个1,收到的会是第一个类型为1的消息。

补充:

msgfl_g=IPC_NOWAIT:队列没有可读消息不等待,返回ENOMSG错误
msgfl_g=MSG_NOERROR:消息大小超过msgsz时被截断
msgtype>0且msgfl_g=MSG_EXCEPT:接收类型不等于msgtype的第一条消息

例子:

#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<unistd.h>

#include"comm.h"

struct msgbuf{
    long mtype;
    char mtext[1];
};

#define MSGMAX 8192

int main(int argc, char *argv[])
{
    int flag = 0;
    int type = 0;
    int opt;
    while(1)
    {
        /* 接受命令行参数。形式如下: ./a.out -n -t 3    
            -n 表示不等待
            -t 3 :表示接收类型为3的消息。
        */
        opt = getopt(argc, argv, "nt:"); // 这个n表示接受n参数(但是-n后面不能接参数),t后面的冒号表示t后面可以接参数。具体用法网上再查一下。
        if(opt == '?')
            exit(EXIT_FAILURE);
        if(opt == -1)
            break;
        switch(opt)
        {
        case 'n':
            flag |= IPC_NOWAIT;
            break;
        case 't':
            type = atoi(optarg);
            break;
        default:
            break;
        }
    }
    int msgid = msgget(1234,0);
    if(msgid < 0)
        ERR_EXIT("msgget");
    printf("msgget success\n");
    printf("msgid = %d\n", msgid);

    struct msgbuf *buf = (struct msgbuf*)malloc(sizeof(long) + MSGMAX);
    buf->mtype = type;
    int ret = msgrcv(msgid, buf, MSGMAX, type, flag);
    if(ret < 0)
        ERR_EXIT("msgsnd");
    printf("recv msg, type:%ld len:%d\n", buf->mtype, ret); // 打印接受到的类型和长度

    return 0;
}
View Code

运行,联合send一起运行:
先是发送:

xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send
Usage:<./msg_send> <type> <len>
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send 5 50
msgget success
msgid = 229376
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send 4 40
msgget success
msgid = 229376
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send 3 30
msgget success
msgid = 229376
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send 2 30
msgget success
msgid = 229376
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send 2 40
msgget success
msgid = 229376
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_send 1 10
msgget success
msgid = 229376

发送了6个消息。类型分别是5 -> 4 -> 3 -> 2 -> 2 -> 1

再来看接收:

xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_recv -n -3  // 这里参数错误了
./msg_recv: invalid option -- '3'
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_recv -t -3 // -3.表示接收队列中绝对值小于3的类型,并且是最小的
msgget success
msgid = 229376
recv msg, type:1 len:10
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_recv -t -3
msgget success
msgid = 229376
recv msg, type:2 len:30
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_recv -t -3
msgget success
msgid = 229376
recv msg, type:2 len:40
xcy@xcy-virtual-machine:~/test/IPC_MSG$ ./msg_recv -n -t 6 // 这里表示不等待,所以会出错。
msgget success
msgid = 229376
msgsnd: No message of desired type
xcy@xcy-virtual-machine:~/test/IPC_MSG$

 

目录
相关文章
|
4月前
|
消息中间件 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
67 0
|
4月前
|
消息中间件 存储 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(上)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(上)
76 0
|
5月前
|
消息中间件 Linux
【Linux】System V 消息队列(不重要)
【Linux】System V 消息队列(不重要)
|
消息中间件 缓存 算法
【Linux】进程间通信——system V共享内存 | 消息队列 | 信号量
system V共享内存、system V消息队列和system V信号量的介绍。
|
消息中间件 安全 Linux
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(下)
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(下)
126 0
|
消息中间件 存储 Linux
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(上)
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(上)
119 0
|
消息中间件 存储 算法
【Linux】System V 共享内存、消息队列、信号量
【Linux】System V 共享内存、消息队列、信号量
172 0
|
消息中间件 安全 Linux
【Linux】system V 消息队列 | system V 信号量(简单赘述)
【Linux】system V 消息队列 | system V 信号量(简单赘述)
186 0
|
消息中间件 Linux
Linux IPC实践(4) --System V消息队列(1)
消息队列概述    消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法(仅局限于本机);    每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值.
910 0
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。