消息队列中的消息结构可以由我们自由定义,具备较强的灵活性。通过消息结构可以共享一个队列,进行消息复用。通常定义一个类似如下的消息结构:
/* Capacity, in bytes, of the payload carried by one message. */
#define MSGMAXDATA 1024

/*
 * Application-defined message layout.
 *
 * The type field lets several processes multiplex one queue and can also be
 * used as a priority so that messages are read in non-FIFO order.
 */
struct mymsg {
    long msg_len;               /* message length */
    long msg_type;              /* message type */
    char msg_data[MSGMAXDATA];  /* message content */
};
与消息结构相关联的类型字段(msg_type)提供了两个特性:
(1)标识消息,使得多个进程在单个队列上复用消息。
(2)用作优先级字段,允许接收者以不同于先进先出的某个顺序读出各个消息。
例子1:每个应用一个队列,可以在多个客户和单个服务器之间复用消息。使用一个消息队列进行通信,由消息类型标识消息是从客户到服务器,还是服务器到客户。通信模型如下:
按照通信模型编写程序如下:
公共头文件svmsg.h
/*
 * svmsg.h - definitions shared by the message-queue client and server.
 */
#ifndef SVMSG_H
#define SVMSG_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

/* Only define the permission bits if <sys/msg.h> has not already done so. */
#ifndef MSG_R
#define MSG_R 0400 /* read permission */
#endif
#ifndef MSG_W
#define MSG_W 0200 /* write permission */
#endif

/* Queue mode: read/write for the owner, read for group and others (0644). */
#define SVMSG_MODE (MSG_R | MSG_W | (MSG_R >> 3) | (MSG_R >> 6))

#define MQ_KEY 1234L /* key shared by client and server to locate the queue */
#define MSGMAX 1024  /* capacity of mesg_data in bytes */

/*
 * Message structure.
 *
 * Callers pass the address of mesg_type to msgsnd()/msgrcv(), so mesg_type
 * must stay immediately before mesg_data; mesg_len is local bookkeeping and
 * is not handed to those calls as part of the message.
 */
struct mymesg
{
    long mesg_len;          /* number of valid bytes in mesg_data */
    long mesg_type;         /* message type */
    char mesg_data[MSGMAX]; /* message payload */
};

#endif
客户端程序sysv_client.c
1 #include "svmsg.h" 2 void client(int ,int); 3 4 int main(int argc,char *argv[]) 5 { 6 int msqid; 7 if((msqid = msgget(MQ_KEY,0)) == -1) 8 { 9 perror("megget()"); 10 exit(-1); 11 } 12 client(msqid,msqid); 13 exit(0); 14 } 15 16 void client(int readfd,int writefd) 17 { 18 size_t len; 19 ssize_t n; 20 char *ptr; 21 struct mymesg mesg; 22 printf("Send request to server.\n"); 23 //set pid to message 24 snprintf(mesg.mesg_data,MSGMAX,"%ld",(long)getpid()); 25 len = strlen(mesg.mesg_data); 26 mesg.mesg_data[len] = ' '; //blank 27 ptr = mesg.mesg_data+len+1; 28 printf("Enter filename: "); 29 fgets(ptr,MSGMAX-len,stdin); 30 len = strlen(mesg.mesg_data); 31 if(mesg.mesg_data[len-1] == '\n') 32 len--; 33 mesg.mesg_len = len; 34 mesg.mesg_type = 1; 35 printf("mesg_data is :%s len=%ld\n",mesg.mesg_data, mesg.mesg_len); 36 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1) 37 { 38 perror("msgsnd() error"); 39 exit(-1); 40 } 41 //read from IPC,write to standard output 42 mesg.mesg_type = getpid(); 43 while( (n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0))>0) 44 { 45 write(STDOUT_FILENO,mesg.mesg_data,n); 46 putchar('\n'); 47 } 48 if(n == 0 ) 49 { 50 printf("Read file from server is completed.\n"); 51 } 52 if(n == -1) 53 { 54 perror("msgrcv() error"); 55 exit(-1); 56 } 57 }
服务器程序sysv_server.c
1 #include "svmsg.h" 2 void server(int ,int); 3 int main(int argc,char *argv[]) 4 { 5 int msqid; 6 if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1) 7 { 8 perror("megget()"); 9 exit(-1); 10 } 11 server(msqid,msqid); 12 exit(0); 13 } 14 15 void server(int readfd,int writefd) 16 { 17 FILE *fp; 18 char *ptr; 19 pid_t pid; 20 ssize_t n; 21 ssize_t len; 22 struct mymesg mesg; 23 printf("Waiting for client......\n"); 24 for(; ;) 25 { 26 mesg.mesg_type = 1; 27 if((n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0)) == 0) 28 { 29 printf("pathname missing.\n"); 30 continue; 31 } 32 mesg.mesg_data[n] = '\0'; 33 printf("Received message from client is: %s\n",mesg.mesg_data); 34 if ((ptr = strchr(mesg.mesg_data,' ')) == NULL) 35 { 36 printf("bogus request: %s\n",mesg.mesg_data); 37 continue; 38 } 39 *ptr++ = 0; 40 pid = atoi(mesg.mesg_data); 41 mesg.mesg_type = pid; 42 //open fiel and read data 43 if((fp = fopen(ptr,"r")) == NULL) 44 { 45 printf("open file failed.sent msg to client\n"); 46 snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno)); 47 mesg.mesg_len = strlen(ptr); 48 memmove(mesg.mesg_data,ptr,mesg.mesg_len); 49 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1) 50 { 51 perror("msgsnd() error"); 52 exit(-1); 53 } 54 } 55 else 56 { 57 printf("open file successed.sent file to client\n"); 58 while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL) 59 { 60 mesg.mesg_len = strlen(mesg.mesg_data); 61 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1) 62 { 63 perror("msgsnd() error"); 64 exit(-1); 65 } 66 } 67 fclose(fp); 68 } 69 printf("send compelted.\n"); 70 mesg.mesg_len = 0; 71 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1) 72 { 73 perror("msgsnd() error"); 74 exit(-1); 75 } 76 } 77 }
程序测试结果如下所示:
例子2:每个客户一个队列。在例子1的基础上修改:所有客户仍通过一个共同的(众所周知的)消息队列向服务器发送请求,但每个客户各自创建一个私有消息队列,服务器通过该私有队列对相应的客户进行应答。通信模型如下:
以并发服务器模型编写这个程序,服务器给每个客户fork一个子进程进行处理。程序如下:
公共头文件svmsg.h和公共源文件svmsg.c:
//svmsg.h file
/*
 * Definitions and helper prototypes shared by the client and the
 * concurrent server of the one-queue-per-client example.
 */
#ifndef SVMSG_H
#define SVMSG_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h> /* waitpid(), used by the server's SIGCHLD handler */
#include <errno.h>

/* Only define the permission bits if <sys/msg.h> has not already done so. */
#ifndef MSG_R
#define MSG_R 0400 /* read permission */
#endif
#ifndef MSG_W
#define MSG_W 0200 /* write permission */
#endif

/* Queue mode: read/write for the owner, read for group and others (0644). */
#define SVMSG_MODE (MSG_R | MSG_W | (MSG_R >> 3) | (MSG_R >> 6))

#define MQ_KEY 1234L /* key of the queue requests are sent to */
#define MSGMAX 1024  /* capacity of mesg_data in bytes */

/*
 * Message structure.
 *
 * The helpers pass the address of mesg_type to msgsnd()/msgrcv(), so
 * mesg_type must stay immediately before mesg_data.
 */
struct mymesg
{
    long mesg_len;          /* number of valid bytes in mesg_data */
    long mesg_type;         /* message type */
    char mesg_data[MSGMAX]; /* message payload */
};

/* Thin wrappers: return the result of msgsnd()/msgrcv() (-1 on error). */
ssize_t mesg_send(int id, struct mymesg *mptr);
ssize_t mesg_recv(int id, struct mymesg *mptr);

/* Checked wrappers: report the error and exit(-1) on failure. */
void Mesg_send(int id, struct mymesg *mptr);
ssize_t Mesg_recv(int id, struct mymesg *mptr);

#endif
1 //svmsg.c file 2 #include "svmsg.h" 3 4 ssize_t mesg_send(int id,struct mymesg *mptr) 5 { 6 return (msgsnd(id,&(mptr->mesg_type),mptr->mesg_len,0)); 7 } 8 9 ssize_t mesg_recv(int id,struct mymesg *mptr) 10 { 11 ssize_t n; 12 n = msgrcv(id,&(mptr->mesg_type),MSGMAX,mptr->mesg_type,0); 13 mptr->mesg_len = n; 14 return n; 15 } 16 17 void Mesg_send(int id,struct mymesg *mptr) 18 { 19 if(mesg_send(id,mptr) == -1) 20 { 21 perror("mesg_send() error"); 22 exit(-1); 23 } 24 } 25 ssize_t Mesg_recv(int id,struct mymesg *mptr) 26 { 27 ssize_t n; 28 do 29 { 30 n = mesg_recv(id,mptr); 31 }while(n==-1 && errno == EINTR); 32 if(n == -1) 33 { 34 perror("mesg_recv() error"); 35 exit(-1); 36 } 37 return n; 38 }
客户端程序如下:
1 #include "svmsg.h" 2 3 void client(int ,int); 4 5 int main(int argc,char *argv[]) 6 { 7 int readid,writeid; 8 if((writeid = msgget(MQ_KEY,0)) == -1) 9 { 10 perror("megget()"); 11 exit(-1); 12 } 13 if((readid = msgget(IPC_PRIVATE,SVMSG_MODE | IPC_CREAT)) == -1) 14 { 15 perror("megget()"); 16 exit(-1); 17 } 18 client(readid,writeid); 19 msgctl(readid,IPC_RMID,NULL); 20 exit(0); 21 } 22 23 void client(int readid,int writeid) 24 { 25 size_t len; 26 ssize_t n; 27 char *ptr; 28 struct mymesg mesg; 29 printf("Send request to server.\n"); 30 //set pid to message 31 snprintf(mesg.mesg_data,MSGMAX,"%d",readid); 32 len = strlen(mesg.mesg_data); 33 mesg.mesg_data[len] = ' '; //blank 34 ptr = mesg.mesg_data+len+1; 35 printf("Enter filename: "); 36 fgets(ptr,MSGMAX-len,stdin); 37 len = strlen(mesg.mesg_data); 38 if(mesg.mesg_data[len-1] == '\n') 39 len--; 40 mesg.mesg_len = len; 41 mesg.mesg_type = 1; 42 printf("mesg_data is :%s\n",mesg.mesg_data); 43 Mesg_send(writeid,&mesg); 44 printf("Send messge to server successed.\n"); 45 //read from IPC,write to standard output 46 while( (n = Mesg_recv(readid,&mesg))>0) 47 { 48 write(STDOUT_FILENO,mesg.mesg_data,n); 49 putchar('\n'); 50 } 51 if(n == 0 ) 52 { 53 printf("Read file from server is completed.\n"); 54 } 55 }
服务器程序如下:
1 #include "svmsg.h" 2 3 void server(int ,int); 4 void sig_child(int signo); 5 6 int main(int argc,char *argv[]) 7 { 8 int msqid; 9 if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1) 10 { 11 perror("megget()"); 12 exit(-1); 13 } 14 server(msqid,msqid); 15 exit(0); 16 } 17 18 void server(int readid,int writeid) 19 { 20 FILE *fp; 21 char *ptr; 22 pid_t pid; 23 ssize_t n; 24 ssize_t len; 25 struct mymesg mesg; 26 signal(SIGCHLD,sig_child); 27 printf("Waiting for client......\n"); 28 for(; ;) 29 { 30 mesg.mesg_type = 1; 31 if((n = Mesg_recv(readid,&mesg)) == 0) 32 { 33 printf("pathname missing.\n"); 34 continue; 35 } 36 mesg.mesg_data[n] = '\0'; 37 printf("Received message from client is: %s\n",mesg.mesg_data); 38 if ((ptr = strchr(mesg.mesg_data,' ')) == NULL) 39 { 40 printf("bogus request: %s\n",mesg.mesg_data); 41 continue; 42 } 43 *ptr++ = 0; 44 writeid = atoi(mesg.mesg_data); 45 if(fork() == 0) 46 { 47 //open fiel and read data 48 if((fp = fopen(ptr,"r")) == NULL) 49 { 50 printf("open file failed.sent msg to client\n"); 51 snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno)); 52 mesg.mesg_len = strlen(ptr); 53 memmove(mesg.mesg_data,ptr,mesg.mesg_len); 54 Mesg_send(writeid,&mesg); 55 } 56 else 57 { 58 printf("open file successed.sent file to client\n"); 59 while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL) 60 { 61 mesg.mesg_len = strlen(mesg.mesg_data); 62 Mesg_send(writeid,&mesg); 63 } 64 fclose(fp); 65 } 66 printf("send compelted.\n"); 67 mesg.mesg_len = 0; 68 Mesg_send(writeid,&mesg); 69 } 70 } 71 } 72 73 void sig_child(int signo) 74 { 75 pid_t pid; 76 int stat; 77 while ((pid = waitpid(-1,&stat,WNOHANG)) > 0); 78 return ; 79 }
程序测试结果如下: