POSIX和SYSTEM的消息队列应该注意的问题

简介: 首先看看POSIX的代码: 1.posix_mq_server.c #include #include #include #include #define MQ_FILE "/mq_test"#define BUF_LEN 128 int main(){     mqd_t mqd;  ...

 

首先看看POSIX的代码:

1.posix_mq_server.c

#include <mqueue.h>
#include <sys/stat.h>
#include <string.h>
#include <stdio.h>
#define MQ_FILE "/mq_test"
#define BUF_LEN 128

int main()
{
     mqd_t mqd;
    char buf[BUF_LEN];
    int  por = 0;
    int ret = 0;
    struct mq_attr attr;
    attr.mq_flags = 0;
    attr.mq_maxmsg = 3;
    attr.mq_msgsize = 50;
    attr.mq_curmsgs= 0;
    mqd = mq_open(MQ_FILE, O_WRONLY,0666,&attr);
    if (-1 == mqd)
    {
        printf("mq_open error.\n");
        return -1;
    }

    do{
        buf[BUF_LEN-1]='\0';
        printf("MQ_MSG : ");
        scanf("%s", buf);
        if(buf[BUF_LEN-1]!= '\0')
        {
            continue;
        }
        printf("strlen:%d\nMQ_POR : ",strlen(buf));
        scanf("%d", &por);
        ret== mq_send(mqd, buf, strlen(buf)+1, por);
        if (ret != 0)
        {
            perror("mq_send error.\n");
        }
        memset(buf,'\0',BUF_LEN);
    }while(strcmp(buf, "quit"));

    mq_close(mqd);
    mq_unlink(MQ_FILE);

    return 0;
}
2.posix_mq_client.c


#include <mqueue.h>
#include <sys/stat.h>
#include <string.h>
#include <stdio.h>
#define MQ_FILE "/mq_test"
#define BUF_LEN 128

int main()
{
    mqd_t mqd;
    struct mq_attr attr;
    char buf[BUF_LEN + 1] = "quit";
    int cnt;
    int  por = 0;
    attr.mq_flags = 0;
    attr.mq_maxmsg = 128;
    attr.mq_msgsize = 128;
    attr.mq_curmsgs = 0;
    //mqd = mq_open(MQ_FILE, O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR, NULL);
    mqd = mq_open(MQ_FILE, O_RDONLY | O_CREAT, 0644, &attr);
    if (-1 == mqd)
    {
        printf("mq_open error.\n");
        return -1;
    }

    do{
        cnt = mq_receive(mqd, buf, BUF_LEN, &por);
        if (0 < cnt)
        {
            printf("mq receive : ");
            fflush(stdout);
            buf[cnt] = '\0';
            printf("%s  por:%d\n", buf,por);
        }

    }while(strcmp(buf, "quit")==0);

    printf("\n");
    mq_close(mqd);
    mq_unlink(MQ_FILE);

    return 0;
}

3.makefile

target:client  server
client: posix_mq_client.c
    gcc posix_mq_client.c -o client -lrt
server:posix_mq_server.c
    gcc posix_mq_server.c -o server -lrt

clean:
    rm -f client server
    rm -f *.o

运行make:

==[]==root@gaoke:~/code$./server 
MQ_MSG : fgsdfgsdfgsdfg
MQ_POR : 9
MQ_MSG : dfgsdfgsdfg
MQ_POR : 3
MQ_MSG : dfghsdfhgjghdj
MQ_POR : 6
MQ_MSG : sdfgdgfhgjh
MQ_POR : 2
MQ_MSG : dsfghgjghjkh
MQ_POR : 8
MQ_MSG : sdfgsdfgsdfgsd
MQ_POR : 5
MQ_MSG : 

==[]==root@gaoke:~/code$./client 
mq receive : fgsdfgsdfgsdfg  por:9
mq receive : dsfghgjghjkh  por:8
mq receive : dfghsdfhgjghdj  por:6
mq receive : sdfgsdfgsdfgsd  por:5
mq receive : dfgsdfgsdfg  por:3
mq receive : sdfgdgfhgjh  por:2
我们发现POSIX是严格按照优先级排序读出的,而且先读出作业优先级最高的作业。

好了再看看我们的SYSTEM是如何进行的,先写个简单的代码调试运行看看结果如何:

首先我先说明一下System V系统的消息对列对象结构:

01
02
03
04
05
06
07
08
09
10
11
12
13
struct msqid_ds {
     struct ipc_perm     msg_perm;  // 权限,跟共享内存一样
     struct msg      *msg_first; // 指向队列的第一条消息
     struct msg      *msg_last;  // 指向队列的最后一条消息
     msglen_t        msg_cbytes; // 当前队列所占字节数
     msgnum_t        msg_qnum;   // 当前队列的消息数
     msglen_t        msg_qbytes; // 队列允许的最大字节数
     pid_t           msg_lspid;  // 最后调用msgsnd的PID
     pid_t           msg_lrpid;  // 最后调用msgrcv的PID
     time_t          msg_stime;  // 最后调用msgsnd的时间
     time_t          msg_rtime;  // 最后调用msgrcv的时间
     time_t          msg_ctime;  // 最后调用msgctl的时间
}

使用其中一个IPC机制时,系统内核会维护一个ipc权限对象,用于设置读写权限

1
2
3
4
5
6
7
8
9
struct ipc_perm {
     uid_t   uid;    // owner’s user id
     gid_t   gid;        // owner’s group id
     uid_t   cuid;       // creator’s user id
     gid_t   cgid;       // creator’s group id
     mode_t  mode;   // 读写权限
     ulong_t seq;        // 序列号
     key_t   key;        // IPC key
其次我们还知道在Linux下,消息队列被创建在虚拟文件系统中。(其它实现可能也提供这样的特性,但细节可能不一样)此文件系统可以使用以下命令挂载(由超级用户):
# mkdir /dev/mqueue
# mount -t mqueue none /dev/mqueue

 

1.sys_msq_server.c

#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MQ_FILE "./mq_test"
#define BUF_LEN 128
struct msgbuf {
   long mtype;     /*  message type, must be > 0 */
   char mtext[256];  /*  message data */
};
int  main()
{
    struct msqid_ds info={0};
    struct msgbuf MSG={0};
    key_t key = ftok(MQ_FILE,10);
    int cnt =  msgget(key,IPC_CREAT|0666);//0:取消息队列标识符,若不存在则函数会报错IPC_CREAT:当msgflg&IPC_CREAT为真时,如果内核中不存在键值与key相等的消息队列,则新建一个消息队列;如果存在这样的消息队列,返回此消息队列的标识符IPC_CREAT|IPC_EXCL:如果内核中不存在键值与key相等的消息队列,则新建一个消息队列;如果存在这样的消息队列则报错
    if(cnt == -1)
    {
        perror("error!");
    }
    while(1){
        printf("enter the MSG:\n");
        scanf("%s",MSG.mtext);
        MSG.mtype = 1;
        //

        msgsnd(cnt,&MSG,strlen(MSG.mtext)+1,IPC_NOWAIT);// 最后一个参数:0:当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程
        msgctl(cnt,IPC_STAT,&info);//IPC_STAT:获得msgid的消息队列头数据到buf中IPC_SET:设置消息队列的属性,要设置的属性需先存储在buf中,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes

        printf("uid:%d, gid = %d, cuid = %d, cgid= %d\n" , info.msg_perm.uid,  info.msg_perm.gid,  info.msg_perm.cuid,  info.msg_perm.cgid  ) ;
        printf("read-write:%03o, cbytes = %lu, qnum = %lu, qbytes= %lu\n" , info.msg_perm.mode&0777, info.msg_cbytes, info.msg_qnum, info.msg_qbytes ) ;
        system("ipcs -q");
    }
}

 

2.sys_msq_client.c

#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MQ_FILE "./mq_test"
#define BUF_LEN 128
struct msgbuf {
   long mtype;     /*  message type, must be > 0 */
   char mtext[256];  /*  message data */
};
int  main()
{
    struct msqid_ds info={0};
    struct msgbuf MSG={0};
    key_t key = ftok(MQ_FILE,10);
    int cnt =  msgget(key,IPC_CREAT|0666);
    int size =0;
    if(cnt == -1)
    {
        perror("error!");
    }
    while(1){
        size = msgrcv(cnt,&MSG,256,1,IPC_NOWAIT);
        if(size > 0)
        {
               puts(MSG.mtext);
        msgctl(cnt,IPC_STAT,&info);
        printf("uid:%d, gid = %d, cuid = %d, cgid= %d\n" ,info.msg_perm.uid,  info.msg_perm.gid,  info.msg_perm.cuid,  info.msg_perm.cgid  ) ;
        printf("read-write:%03o, cbytes = %lu, qnum = %lu, qbytes= %lu\n" , info.msg_perm.mode&0777, info.msg_cbytes, info.msg_qnum, info.msg_qbytes ) ;
        }
    }
}
3.makefile

target:client  server
client: sys_msq_client.c
    gcc sys_msq_client.c -o client -lrt
server:sys_msq_server.c
    gcc sys_msq_server.c -o server -lrt

clean:
    rm -f client server
    rm -f *.o

然后make运行

==[]==root@gaoke:~/code$./server 
enter the MSG:
aaaa
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 5, qnum = 1, qbytes= 65536

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0xffffffff 0          root       666        5            1          

enter the MSG:
dddd
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 10, qnum = 2, qbytes= 65536

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0xffffffff 0          root       666        10           2          

enter the MSG:
fffff
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 16, qnum = 3, qbytes= 65536

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0xffffffff 0          root       666        16           3          

enter the MSG:
ggggg
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 22, qnum = 4, qbytes= 65536

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0xffffffff 0          root       666        22           4           

 ==[]==root@gaoke:~/code$./client 
asdfasdfas
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
asdfasdfasdf
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
ssss
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536

==[]==root@gaoke:~/code$./client 
aaaa
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 17, qnum = 3, qbytes= 65536

dddd
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 12, qnum = 2, qbytes= 65536
fffff
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 6, qnum = 1, qbytes= 65536
ggggg
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
由此我们还发现了什么呢?有没有发现SYSTEM的消息输入长度是固定的,然而POSIX的是可变长的。

 

SYSTEM的消息队列函数由msgget、msgctl、msgsnd、msgrcv四个函数组成。下面的表格列出了这四个函数的函数原型及其具体说明。

1.   msgget函数原型

msgget(得到消息队列标识符或创建一个消息队列对象)

所需头文件

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函数说明

得到消息队列标识符或创建一个消息队列对象并返回消息队列标识符

函数原型

int msgget(key_t key, int msgflg)

函数传入值

key

0(IPC_PRIVATE):会建立新的消息队列

大于0的32位整数:视参数msgflg来确定操作。通常要求此值来源于ftok返回的IPC键值

msgflg

0:取消息队列标识符,若不存在则函数会报错

IPC_CREAT:当msgflg&IPC_CREAT为真时,如果内核中不存在键值与key相等的消息队列,则新建一个消息队列;如果存在这样的消息队列,返回此消息队列的标识符

IPC_CREAT|IPC_EXCL:如果内核中不存在键值与key相等的消息队列,则新建一个消息队列;如果存在这样的消息队列则报错

函数返回值

成功:返回消息队列的标识符

出错:-1,错误原因存于error中

附加说明

上述msgflg参数为模式标志参数,使用时需要与IPC对象存取权限(如0600)进行|运算来确定消息队列的存取权限

错误代码

EACCES:指定的消息队列已存在,但调用进程没有权限访问它

EEXIST:key指定的消息队列已存在,而msgflg中同时指定IPC_CREAT和IPC_EXCL标志

ENOENT:key指定的消息队列不存在同时msgflg中没有指定IPC_CREAT标志

ENOMEM:需要建立消息队列,但内存不足

ENOSPC:需要建立消息队列,但已达到系统的限制

如果用msgget创建了一个新的消息队列对象时,则msqid_ds结构成员变量的值设置如下:

Ÿ        msg_qnum、msg_lspid、msg_lrpid、 msg_stime、msg_rtime设置为0。

Ÿ        msg_ctime设置为当前时间。

Ÿ        msg_qbytes设成系统的限制值。

Ÿ        msgflg的读写权限写入msg_perm.mode中。

Ÿ        msg_perm结构的uid和cuid成员被设置成当前进程的有效用户ID,gid和cuid成员被设置成当前进程的有效组ID。

2.   msgctl函数原型

msgctl (获取和设置消息队列的属性)

所需头文件

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函数说明

获取和设置消息队列的属性

函数原型

int msgctl(int msqid, int cmd, struct msqid_ds *buf)

函数传入值

msqid

消息队列标识符

cmd

 

IPC_STAT:获得msgid的消息队列头数据到buf中

IPC_SET:设置消息队列的属性,要设置的属性需先存储在buf中,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes

buf:消息队列管理结构体,请参见消息队列内核结构说明部分

函数返回值

成功:0

出错:-1,错误原因存于error中

错误代码

EACCESS:参数cmd为IPC_STAT,确无权限读取该消息队列

EFAULT:参数buf指向无效的内存地址

EIDRM:标识符为msqid的消息队列已被删除

EINVAL:无效的参数cmd或msqid

EPERM:参数cmd为IPC_SET或IPC_RMID,却无足够的权限执行

3.   msgsnd函数原型

msgsnd (将消息写入到消息队列)

所需头文件

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函数说明

将msgp消息写入到标识符为msqid的消息队列

函数原型

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)

函数传入值

msqid

消息队列标识符

msgp

发送给队列的消息。msgp可以是任何类型的结构体,但第一个字段必须为long类型,即表明此发送消息的类型,msgrcv根据此接收消息。msgp定义的参照格式如下:

    struct s_msg{ /*msgp定义的参照格式*/
     long type; /* 必须大于0,消息类型 */
           char mtext[256]; /*消息正文,可以是其他任何类型*/
    } msgp;

msgsz

要发送消息的大小,不含消息类型占用的4个字节,即mtext的长度

msgflg

0:当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列

IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回

IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程。

函数返回值

成功:0

出错:-1,错误原因存于error中

错误代码

EAGAIN:参数msgflg设为IPC_NOWAIT,而消息队列已满

EIDRM:标识符为msqid的消息队列已被删除

EACCESS:无权限写入消息队列

EFAULT:参数msgp指向无效的内存地址

EINTR:队列已满而处于等待情况下被信号中断

EINVAL:无效的参数msqid、msgsz或参数消息类型type小于0

   msgsnd()为阻塞函数,当消息队列容量满或消息个数满会阻塞。消息队列已被删除,则返回EIDRM错误;被信号中断返回E_INTR错误。

 如果设置IPC_NOWAIT消息队列满或个数满时会返回-1,并且置EAGAIN错误。

msgsnd()解除阻塞的条件有以下三个条件:

①    不满足消息队列满或个数满两个条件,即消息队列中有容纳该消息的空间。

②    msqid代表的消息队列被删除。

③    调用msgsnd函数的进程被信号中断。

4.   msgrcv函数原型

msgrcv (从消息队列读取消息)

所需头文件

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

函数说明

从标识符为msqid的消息队列读取消息并存于msgp中,读取后把此消息从消息队列中删除

函数原型

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,

                      int msgflg);

函数传入值

msqid

消息队列标识符

msgp

存放消息的结构体,结构体类型要与msgsnd函数发送的类型相同

msgsz

要接收消息的大小,不含消息类型占用的4个字节

msgtyp

0:接收第一个消息

>0:接收类型等于msgtyp的第一个消息

<0:接收类型等于或者小于msgtyp绝对值的第一个消息

msgflg

0: 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待

IPC_NOWAIT:如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG

IPC_EXCEPT:与msgtype配合使用返回队列中第一个类型不为msgtype的消息

IPC_NOERROR:如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部分将被丢弃

函数返回值

成功:实际读取到的消息数据长度

出错:-1,错误原因存于error中

错误代码

E2BIG:消息数据长度大于msgsz而msgflag没有设置IPC_NOERROR

EIDRM:标识符为msqid的消息队列已被删除

EACCESS:无权限读取该消息队列

EFAULT:参数msgp指向无效的内存地址

ENOMSG:参数msgflg设为IPC_NOWAIT,而消息队列中无消息可读

EINTR:等待读取队列内的消息情况下被信号中断

msgrcv()解除阻塞的条件有以下三个:

①    消息队列中有了满足条件的消息。

②    msqid代表的消息队列被删除。

③    调用msgrcv()的进程被信号中断。

 

消息队列使用程序范例

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
63 0
|
4月前
|
消息中间件 存储 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(上)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(上)
71 0
|
5月前
|
消息中间件 Linux
【Linux】System V 消息队列(不重要)
【Linux】System V 消息队列(不重要)
|
6月前
|
消息中间件 Linux API
Linux进程间通信(IPC) Linux消息队列:讲解POSIX消息队列在Linux系统进程间通信中的应用和实践
Linux进程间通信(IPC) Linux消息队列:讲解POSIX消息队列在Linux系统进程间通信中的应用和实践
191 1
Linux进程间通信(IPC) Linux消息队列:讲解POSIX消息队列在Linux系统进程间通信中的应用和实践
|
消息中间件 缓存 算法
【Linux】进程间通信——system V共享内存 | 消息队列 | 信号量
system V共享内存、system V消息队列和system V信号量的介绍。
|
消息中间件 安全 Linux
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(下)
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(下)
124 0
|
消息中间件 存储 Linux
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(上)
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(上)
114 0
|
消息中间件 存储 算法
【Linux】System V 共享内存、消息队列、信号量
【Linux】System V 共享内存、消息队列、信号量
166 0
|
消息中间件 安全 Linux
【Linux】system V 消息队列 | system V 信号量(简单赘述)
【Linux】system V 消息队列 | system V 信号量(简单赘述)
184 0
|
消息中间件 Windows
system V消息队列
1.消息队列1)消息队列提供了一个从进程向另外一个进程发送一块是数据的方法2)每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型不足之处:每个消息的最大长度是有限制的。MSGMAX每个消息队列的总的字节数也是有上限。
924 0