System V 消息队列—复用消息

简介:

  消息队列中的消息结构可以由我们自由定义,具备较强的灵活性。通过消息结构可以共享一个队列,进行消息复用。通常定义一个类似如下的消息结构:

复制代码
#define MSGMAXDAT     1024
struct mymsg
{
    long msg_len;   //消息长度
    long msg_type; //消息类型
    long msg_data[MSGMAXDATA]; //消息内容
};
复制代码

 消息结构相关联的类型字段(msg_type)提供了两个特性:

(1)标识消息,使得多个进程在单个队列上复用消息。

(2)用作优先级字段,允许接收者以不同于先进先出的某个顺序读出各个消息。

例子1:每个应用一个队列,可以在多个客户和单个服务器之间复用消息。使用一个消息队列进行通信,由消息类型标识消息是从客户到服务器,还是服务器到客户。通信模型如下:

按照通信模型编写程序如下:

公共头文件svmsg.h

复制代码
 1 #ifndef  SVMSG_H
 2 #define  SVMSG_H
 3 #include <stdio.h>
 4 #include <stdlib.h>
 5 #include <string.h>
 6 #include <unistd.h>
 7 #include <sys/types.h>
 8 #include <sys/ipc.h>
 9 #include <sys/msg.h>
10 #include <errno.h>
11 
12 #define MSG_R 0400 /* read permission */
13 #define MSG_W 0200 /* write permission */
14 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)
15 #define MQ_KEY  1234L
16 #define MSGMAX  1024
17 //消息结构
18 struct mymesg
19 {
20     long mesg_len;
21     long mesg_type;
22     char mesg_data[MSGMAX];
23 };
24 #endif
复制代码

客户端程序sysv_client.c

复制代码
 1 #include "svmsg.h"
 2 void client(int ,int);
 3 
 4 int main(int argc,char *argv[])
 5 {
 6     int     msqid;
 7     if((msqid = msgget(MQ_KEY,0)) == -1)
 8     {
 9         perror("megget()");
10         exit(-1);
11     }
12     client(msqid,msqid);
13     exit(0);
14 }
15 
16 void client(int readfd,int writefd)
17 {
18     size_t len;
19     ssize_t n;
20     char *ptr;
21     struct mymesg mesg;
22     printf("Send request to server.\n");
23     //set pid to message
24     snprintf(mesg.mesg_data,MSGMAX,"%ld",(long)getpid());
25     len = strlen(mesg.mesg_data);
26     mesg.mesg_data[len] = ' '; //blank
27     ptr = mesg.mesg_data+len+1;
28     printf("Enter filename: ");
29     fgets(ptr,MSGMAX-len,stdin);
30     len = strlen(mesg.mesg_data);
31     if(mesg.mesg_data[len-1] == '\n')
32         len--;
33     mesg.mesg_len = len;
34     mesg.mesg_type = 1;
35     printf("mesg_data is :%s len=%ld\n",mesg.mesg_data, mesg.mesg_len);
36     if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
37     {
38         perror("msgsnd() error");
39         exit(-1);
40     }
41     //read from IPC,write to standard output
42     mesg.mesg_type = getpid();
43     while( (n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0))>0)
44     {
45         write(STDOUT_FILENO,mesg.mesg_data,n);
46         putchar('\n');
47     }
48     if(n == 0 )
49     {
50         printf("Read file from server is completed.\n");
51     }
52     if(n == -1)
53     {
54         perror("msgrcv() error");
55         exit(-1);
56     }
57 }
复制代码

服务器程序sysv_server.c

复制代码
 1 #include "svmsg.h"
 2 void server(int ,int);
 3 int main(int argc,char *argv[])
 4 {
 5     int     msqid;
 6     if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1)
 7     {
 8         perror("megget()");
 9         exit(-1);
10     }
11     server(msqid,msqid);
12     exit(0);
13 }
14 
15 void server(int readfd,int writefd)
16 {
17     FILE            *fp;
18     char            *ptr;
19     pid_t           pid;
20     ssize_t         n;
21     ssize_t         len;
22     struct mymesg   mesg;
23     printf("Waiting for client......\n");
24     for(; ;)
25     {
26         mesg.mesg_type = 1;
27         if((n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0)) == 0)
28         {
29             printf("pathname missing.\n");
30             continue;
31         }
32         mesg.mesg_data[n] = '\0';
33         printf("Received message from client is: %s\n",mesg.mesg_data);
34         if ((ptr = strchr(mesg.mesg_data,' ')) == NULL)
35         {
36             printf("bogus request: %s\n",mesg.mesg_data);
37             continue;
38         }
39         *ptr++ = 0;
40         pid = atoi(mesg.mesg_data);
41         mesg.mesg_type = pid;
42         //open fiel and read data
43         if((fp = fopen(ptr,"r")) == NULL)
44         {
45             printf("open file failed.sent msg to client\n");
46             snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno));
47             mesg.mesg_len = strlen(ptr);
48             memmove(mesg.mesg_data,ptr,mesg.mesg_len);
49             if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
50             {
51                 perror("msgsnd() error");
52                 exit(-1);
53             }
54         }
55         else
56         {
57             printf("open file successed.sent file to client\n");
58             while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL)
59             {
60                 mesg.mesg_len = strlen(mesg.mesg_data);
61                 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
62                 {
63                     perror("msgsnd() error");
64                     exit(-1);
65                 }
66             }
67             fclose(fp);
68         }
69         printf("send compelted.\n");
70         mesg.mesg_len = 0;
71         if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)
72         {
73             perror("msgsnd() error");
74             exit(-1);
75         }
76     }
77 }
复制代码

程序测试结果如下所示:

 例子2:每个客户一个队列,将例子1改成所有用户用一个共同的消息队列向服务器发送消息,给每个客户分配一个消息队列,使得服务器对每个客户进行应答。通信模型如下:

以并发服务器模型编写这个程序,服务器给每个客户fork一个子进程进行处理。程序如下:

公共头文件svmsg.h和svmsg.c:

复制代码
 1 //svmsg.h file
 2 #ifndef  SVMSG_H
 3 #define  SVMSG_H
 4 #include <stdio.h>
 5 #include <stdlib.h>
 6 #include <string.h>
 7 #include <unistd.h>
 8 #include <signal.h>
 9 #include <sys/types.h>
10 #include <sys/ipc.h>
11 #include <sys/msg.h>
12 #include <errno.h>
13 
14 #define MSG_R 0400 /* read permission */
15 #define MSG_W 0200 /* write permission */
16 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)
17 #define MQ_KEY  1234L
18 #define MSGMAX  1024
19 //message structure
20 struct mymesg
21 {
22     long mesg_len;
23     long mesg_type;
24     char mesg_data[MSGMAX];
25 };
26 
27 ssize_t mesg_send(int id,struct mymesg *mptr);
28 ssize_t mesg_recv(int id,struct mymesg *mptr);
29 
30 void    Mesg_send(int id,struct mymesg *mptr);
31 ssize_t Mesg_recv(int id,struct mymesg *mptr);
32 #endif
复制代码
复制代码
 1 //svmsg.c file
 2 #include "svmsg.h"
 3 
 4 ssize_t mesg_send(int id,struct mymesg *mptr)
 5 {
 6     return (msgsnd(id,&(mptr->mesg_type),mptr->mesg_len,0));
 7 }
 8 
 9 ssize_t mesg_recv(int id,struct mymesg *mptr)
10 {
11     ssize_t n;
12     n = msgrcv(id,&(mptr->mesg_type),MSGMAX,mptr->mesg_type,0);
13     mptr->mesg_len = n;
14     return n;
15 }
16 
17 void Mesg_send(int id,struct mymesg *mptr)
18 {
19     if(mesg_send(id,mptr) == -1)
20     {
21         perror("mesg_send() error");
22         exit(-1);
23     }
24 }
25 ssize_t Mesg_recv(int id,struct mymesg *mptr)
26 {
27     ssize_t n;
28     do
29     {
30         n = mesg_recv(id,mptr);
31     }while(n==-1 && errno == EINTR);
32     if(n == -1)
33     {
34         perror("mesg_recv() error");
35         exit(-1);
36     }
37     return n;
38 }
复制代码

客户端程序如下:

复制代码
 1 #include "svmsg.h"
 2 
 3 void client(int ,int);
 4 
 5 int main(int argc,char *argv[])
 6 {
 7     int    readid,writeid;
 8     if((writeid = msgget(MQ_KEY,0)) == -1)
 9     {
10         perror("megget()");
11         exit(-1);
12     }
13     if((readid = msgget(IPC_PRIVATE,SVMSG_MODE | IPC_CREAT)) == -1)
14     {
15         perror("megget()");
16         exit(-1);
17     }
18     client(readid,writeid);
19     msgctl(readid,IPC_RMID,NULL);
20     exit(0);
21 }
22 
23 void client(int readid,int writeid)
24 {
25     size_t len;
26     ssize_t n;
27     char *ptr;
28     struct mymesg mesg;
29     printf("Send request to server.\n");
30     //set pid to message
31     snprintf(mesg.mesg_data,MSGMAX,"%d",readid);
32     len = strlen(mesg.mesg_data);
33     mesg.mesg_data[len] = ' '; //blank
34     ptr = mesg.mesg_data+len+1;
35     printf("Enter filename: ");
36     fgets(ptr,MSGMAX-len,stdin);
37     len = strlen(mesg.mesg_data);
38     if(mesg.mesg_data[len-1] == '\n')
39         len--;
40     mesg.mesg_len = len;
41     mesg.mesg_type = 1;
42     printf("mesg_data is :%s\n",mesg.mesg_data);
43     Mesg_send(writeid,&mesg);
44     printf("Send messge to server successed.\n");
45     //read from IPC,write to standard output
46     while( (n = Mesg_recv(readid,&mesg))>0)
47     {
48         write(STDOUT_FILENO,mesg.mesg_data,n);
49         putchar('\n');
50     }
51     if(n == 0 )
52     {
53         printf("Read file from server is completed.\n");
54     }
55 }
复制代码

服务器程序如下:

复制代码
 1 #include "svmsg.h"
 2 
 3 void server(int ,int);
 4 void sig_child(int signo);
 5 
 6 int main(int argc,char *argv[])
 7 {
 8     int     msqid;
 9     if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1)
10     {
11         perror("megget()");
12         exit(-1);
13     }
14     server(msqid,msqid);
15     exit(0);
16 }
17 
18 void server(int readid,int writeid)
19 {
20     FILE            *fp;
21     char            *ptr;
22     pid_t           pid;
23     ssize_t         n;
24     ssize_t         len;
25     struct mymesg   mesg;
26     signal(SIGCHLD,sig_child);
27     printf("Waiting for client......\n");
28     for(; ;)
29     {
30         mesg.mesg_type = 1;
31         if((n = Mesg_recv(readid,&mesg)) == 0)
32         {
33             printf("pathname missing.\n");
34             continue;
35         }
36         mesg.mesg_data[n] = '\0';
37         printf("Received message from client is: %s\n",mesg.mesg_data);
38         if ((ptr = strchr(mesg.mesg_data,' ')) == NULL)
39         {
40             printf("bogus request: %s\n",mesg.mesg_data);
41             continue;
42         }
43         *ptr++ = 0;
44         writeid = atoi(mesg.mesg_data);
45         if(fork() == 0)
46         {
47             //open fiel and read data
48             if((fp = fopen(ptr,"r")) == NULL)
49             {
50                 printf("open file failed.sent msg to client\n");
51                 snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno));
52                 mesg.mesg_len = strlen(ptr);
53                 memmove(mesg.mesg_data,ptr,mesg.mesg_len);
54                 Mesg_send(writeid,&mesg);
55             }
56             else
57             {
58                 printf("open file successed.sent file to client\n");
59                 while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL)
60                 {
61                     mesg.mesg_len = strlen(mesg.mesg_data);
62                     Mesg_send(writeid,&mesg);
63                 }
64                 fclose(fp);
65             }
66             printf("send compelted.\n");
67             mesg.mesg_len = 0;
68             Mesg_send(writeid,&mesg);
69         }
70     }
71 }
72 
73 void sig_child(int signo)
74 {
75     pid_t   pid;
76     int     stat;
77     while ((pid = waitpid(-1,&stat,WNOHANG)) > 0);
78     return ;
79 }
复制代码

程序测试结果如下:

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(下)
82 0
|
6月前
|
消息中间件 存储 Linux
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(上)
【Linux】进程间通信——system V(共享内存 | 消息队列 | 信号量)(上)
95 0
|
7月前
|
消息中间件 Linux
【Linux】System V 消息队列(不重要)
【Linux】System V 消息队列(不重要)
|
消息中间件 缓存 算法
【Linux】进程间通信——system V共享内存 | 消息队列 | 信号量
system V共享内存、system V消息队列和system V信号量的介绍。
|
消息中间件 安全 Linux
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(下)
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(下)
139 0
|
消息中间件 存储 Linux
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(上)
Linux之进程间通信——system V(共享内存、消息队列、信号量等)(上)
123 0
|
消息中间件 存储 算法
【Linux】System V 共享内存、消息队列、信号量
【Linux】System V 共享内存、消息队列、信号量
187 0
|
消息中间件 安全 Linux
【Linux】system V 消息队列 | system V 信号量(简单赘述)
【Linux】system V 消息队列 | system V 信号量(简单赘述)
198 0
|
消息中间件
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
658 1