Linux系统编程-进程间通信(消息队列)

简介: 前面文章介绍了Linux下进程的创建,管理,陆续介绍了进程间通信的方式:管道、内存映射、共享内存等。这篇文章继续介绍Linux的进程间通信方式消息队列。

前面文章介绍了Linux下进程的创建,管理,陆续介绍了进程间通信的方式:管道、内存映射、共享内存等。这篇文章继续介绍Linux的进程间通信方式消息队列

image-20211216090722856

1. 消息队列介绍

消息队列通过名字字面意思理解就是队列排队-和平常超市买东西排队付款一样结构,消息队列与FIFO很相似,都是一个队列结构,都可以有多个进程往队列里面写信息,多个进程从队列中读取信息。但FIFO需要读、写的两端事先都打开,才能够开始信息传递工作。而消息队列可以事先往队列中写信息,需要时再打开读取信息。

注意事项:

  1. 消息队列属于顺序队列形式的结构,向队列里写的每一条消息,会追加到队列后面,读取一个就从队列里消除一个。
  2. 写函数不会阻塞,除非队列里存放的消息数量已经满了,才会导致写阻塞。
  3. 读函数,从队列里读取不到数据时,会阻塞。

查看当前系统所有的消息队列:

[root@wbyq 20181005]# ipcs -q

------ Message Queues --------
键值    消息队列ID                        使用的字节数量  队里现存的消息数量                
key        msqid      owner      perms     used-bytes   messages    
0x000004d3 0          root       666        65532        192         
0x00003044 32769      root       666        65424        564         
0x0a120534 65538      root       0          2352         21          
0x0a00000f 196611     root       0          65520        2730        
0xffffffff 163844       root       0          0            0  

查看消息队列的详细信息:

[root@wbyq 20181005]# ipcs -l

------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 4194303
max total shared memory (kbytes) = 1073741824
min seg size (bytes) = 1

------ Semaphore Limits --------
max number of arrays = 128
max semaphores per array = 250
max semaphores system wide = 32000
max ops per semop call = 32
semaphore max value = 32767

------ Messages: Limits --------
max queues system wide = 1736        【系统最多的消息队列数量(最多支持同时1736个消息队列)】
max size of message (bytes) = 65536     【单个消息的最大字节数】
default max size of queue (bytes) = 65536 【默认的单个队列的大小65536】

默认单个队列总字节大小为: 65535字节。
消息队列的最大字节数量和消息的最大条数加起来的总字节数不能超过65535字节。

System V IPC机制消息队列相关的函数接口: 这里先列出所有函数,下面会详细介绍每个函数的用法。

#include <sys / types.h>
#include <sys / ipc.h>
#include <sys / msg.h>
int msgget(key_t key, int msgflg);
int msgsnd(int msqid, struct msgbuf * msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, struct msgbuf * msgp, size_t msgsz, long msgtyp, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds * buf);

2. 消息队列相关函数介绍

2.1 msgget函数

函数原型:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);

功能
msgget用于创建和访问一个消息队列。
参数
(1) key:是唯一标识一个消息队列的关键字,如果为IPC_PRIVATE(值为0,用创建一个只有创建者进程才可以访问的消息队列),表示创建一个只由调用进程使用的消息队列,非0值的key(可以通过ftok函数获得)表示创建一个可以被多个进程共享的消息队列;
(2) msgflg:指明队列的访问权限和创建标志,创建标志的可选值为IPC_CREAT和IPC_EXC,如果单独指定IPC_CREAT,msgget要么返回新创建的消息队列id,要么返回具有相同key值的消息队列id;如果IPC_EXCL和IPC_CREAT同时指明,则要么创建新的消息队列,要么当队列存在时,调用失败并返回-1。

/*1. 创建消息队列*/
int msgid = msgget((key_t)1235,0666 | IPC_CREAT);

返回值
成功执行时,返回消息队列标识值(0也是成功的)。

失败返回-1,errno被设为以下的某个值。

EACCES:指定的消息队列已存在,但调用进程没有权限访问它,而且不拥有CAP_IPC_OWNER权能
EEXIST:key指定的消息队列已存在,而msgflg中同时指定IPC_CREAT和IPC_EXCL标志
ENOENT:key指定的消息队列不存在同时msgflg中不指定IPC_CREAT标志
ENOMEM:需要建立消息队列,但内存不足
ENOSPC:需要建立消息队列,但已达到系统的最大消息队列容量

2.2 msgsnd和msgrcy函数

原型:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);

功能

函数msgsnd和msgrcy用来将消息添加到消息队列中和从一个消息队列中获取信息。

参数

(1)msgid:指明消息队列的ID; 通常是msgget函数成功的返回值。

(2)msgbuf:是消息结构体,它的长度必须小于系统规定的上限,必须以一个长整型成员变量开始,接收函数将用这个成员变量来确定消息的类型。必须重写这个结构体,其中第一个参数类型不能改,其他可以自定义。

如下:

struct msgbuf {
    long mtype;         /* type of message */
    char mtext[1];       /* message text */
};

字段mtype是用户自己指定的消息类型(通常是1—5中的任意一个数值),该结构体第2个成员仅仅是一种说明性的结构,实际上用户可以使用任何类型的数据,就是消息内容;

(3)msgsz是消息体的大小,每个消息体最大不要超过4K; 不含消息类型占用的4个字节,即mtext的长度

(4)msgtyp有3种选项:


msgtyp == 0      接收队列中的第1个消息,不区分消息类型
msgtyp > 0        接收对列中的第1个类型等于msgtyp的消息
msgtyp < 0        接收其类型小于或等于msgtyp绝对值的第1个最低类型消息

(5)msgflg有3种选项:

0:当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列
IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回
IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程。

2.3 msgctl函数

原型:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

功能

msgctl是消息队列的控制函数,常用来删除消息队列。

参数

(1)msqid:由msgget返回的消息队列标识符。

(2)cmd:通常为IPC_RMID表示删除消息队列。

(3)参数buf通常为NULL。

通过命令查看系统消息信息

(1)ipcs -q 命令查看系统的消息队列

(2)ipcs -m查看系统的共享内存

(3)ipcs -s 查看系统的信号量集。

image-20211216085707736

3. 案例代码: 消息队列示例1

下面两个程序分别编译,依次运行,不分先后顺序,就可以看到效果了。

3.1 发送消息

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#define BUFFER 255
struct msgtype {    //重新定义该结构体
    long mtype;     //第一个参数类型不变
    char buf[BUFFER];
};

int main(int argc,char **argv)
{
    if(argc!=3)
    {
        printf("./app <消息> <消息类型-整数>\n");
        return 0;
    }
    /*1. 创建消息队列*/
    int msgid = msgget((key_t)1235,0666 | IPC_CREAT); 
    /*2. 向消息队列发送消息*/
    struct msgtype msg;
    memset(&msg,0,sizeof(struct msgtype));
    msg.mtype = atoi(argv[2]);                      //给结构体的成员赋值
    strncpy(msg.buf,argv[1],BUFFER);
    msgsnd(msgid,&msg,sizeof(struct msgtype),0);  
    return 0;
}

3.2 读取消息

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <sys/msg.h>
#define BUFFER 255
struct msgtype {
    long mtype;
    char buf[BUFFER];
};

int main(int argc,char **argv)
{
    if(argc!=2)
    {
        printf("./app <消息类型-整数>\n");
        return 0;
    }
    /*1. 创建消息队列-如果存在就打开消息队列*/
    int msgid = msgget((key_t)1235, 0666 | IPC_CREAT); //获得消息队列
    struct msgtype msg;
    memset(&msg,0,sizeof(struct msgtype));
    while(1)
    {
        /*2. 从消息队列里读取指定消息类型*/
        msgrcv(msgid,&msg,sizeof(struct msgtype),atoi(argv[1]),0);
        printf("读取的消息: %s\n", msg.buf);
    }
    return 0;
}

4. 案例代码: 消息队列基本使用

下面两个例子,一个例子用于创建队列,并向队列里写数据,另一个例子从队列里读取数据。

4.1 向队列写入消息

程序运行需要传入两个额外的参数。一个是消息的类型,一个是具体的内容。

./app 1 hello
./app 2 123456789

示例代码:

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>

struct msgbuf
{
    long mtype;       /* message type, must be > 0 */
    char mtext[1024];    /* message data */
};

int main(int argc,char **argv)
{
    if(argc!=3)
    {
        printf("./app <消息类型> <消息内容>\n");
        return 0;
    }
    /*1. 创建消息队列*/
    int msqid=msgget(123456,0666|IPC_CREAT);
    /*2. 向消息队列里加入数据*/
    struct msgbuf msg_buf;
    msg_buf.mtype=atoi(argv[1]); //消息类型  1、2、3、4、5
    strcpy(msg_buf.mtext,argv[2]); //消息内容
    msgsnd(msqid,&msg_buf,sizeof(struct msgbuf),0);
    printf("消息发送成功:%s,%s\n",argv[1],argv[2]);
    return 0;
}

4.2 从队列读取消息

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>

struct msgbuf
{
    long mtype;       /* message type, must be > 0 */
    char mtext[1024];    /* message data */
};

int main(int argc,char **argv)
{
    if(argc!=2)
    {
        printf("./app <消息类型>\n");
        return 0;
    }
    /*1. 创建消息队列*/
    int msqid=msgget(123456,0666|IPC_CREAT);
    /*2. 从消息队列里取出数据*/
    struct msgbuf msg_buf;
    msgrcv(msqid,&msg_buf,sizeof(struct msgbuf),atoi(argv[1]),0);
    printf("消息读取成功:%d,%s\n",msg_buf.mtype,msg_buf.mtext);
    return 0;
}
目录
相关文章
|
3月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
184 67
|
2月前
|
Web App开发 Linux 程序员
获取和理解Linux进程以及其PID的基础知识。
总的来说,理解Linux进程及其PID需要我们明白,进程就如同汽车,负责执行任务,而PID则是独特的车牌号,为我们提供了管理的便利。知道这个,我们就可以更好地理解和操作Linux系统,甚至通过对进程的有效管理,让系统运行得更加顺畅。
76 16
|
2月前
|
Unix Linux
对于Linux的进程概念以及进程状态的理解和解析
现在,我们已经了解了Linux进程的基础知识和进程状态的理解了。这就像我们理解了城市中行人的行走和行为模式!希望这个形象的例子能帮助我们更好地理解这个重要的概念,并在实际应用中发挥作用。
72 20
|
1月前
|
监控 Shell Linux
Linux进程控制(详细讲解)
进程等待是系统通过调用特定的接口(如waitwaitpid)来实现的。来进行对子进程状态检测与回收的功能。
44 0
|
1月前
|
存储 负载均衡 算法
Linux2.6内核进程调度队列
本篇文章是Linux进程系列中的最后一篇文章,本来是想放在上一篇文章的结尾的,但是想了想还是单独写一篇文章吧,虽然说这部分内容是比较难的,所有一般来说是简单的提及带过的,但是为了让大家对进程有更深的理解与认识,还是看了一些别人的文章,然后学习了学习,然后对此做了总结,尽可能详细的介绍明白。最后推荐一篇文章Linux的进程优先级 NI 和 PR - 简书。
39 0
|
1月前
|
存储 Linux Shell
Linux进程概念-详细版(二)
在Linux进程概念-详细版(一)中我们解释了什么是进程,以及进程的各种状态,已经对进程有了一定的认识,那么这篇文章将会继续补全上篇文章剩余没有说到的,进程优先级,环境变量,程序地址空间,进程地址空间,以及调度队列。
35 0
|
1月前
|
Linux 调度 C语言
Linux进程概念-详细版(一)
子进程与父进程代码共享,其子进程直接用父进程的代码,其自己本身无代码,所以子进程无法改动代码,平时所说的修改是修改的数据。为什么要创建子进程:为了让其父子进程执行不同的代码块。子进程的数据相对于父进程是会进行写时拷贝(COW)。
41 0
|
12月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
12月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。