RT-Thread快速入门-消息队列

简介: RT-Thread快速入门-消息队列

上一篇介绍了消息邮箱,本篇文章介绍线程(任务)间通信的另一种方式——消息队列。

消息队列在实际项目中应用较多,建议初学者应该熟练掌握。

掌握了 RT-Thread 消息队列的原理和操作方法,如果再学习其他款 RTOS,会感觉很轻松。

1消息队列的工作机制

1. 理解消息队列

线程或中断服务例程可以将一条或多条消息放入消息队列中。

一个或多个线程也可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息先传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出原则 (FIFO)。

如下图所示

 

2. 消息队列控制块

消息队列控制块是 RT-Thread 系统管理消息队列的一种数据结构,由结构体 struct rt_messagequeue 表示。另外 rt_mq_t 表示消息队列的句柄,即指向消息队列控制块的指针。

消息队列控制块的数据结构定义如下:

struct rt_messagequeue
{
    struct rt_ipc_object parent;           /* 继承自 ipc_object 类 */
    void                *msg_pool;         /* 指向存放消息的缓冲区的指针 */
    rt_uint16_t          msg_size;         /* 每个消息的长度 */
    rt_uint16_t          max_msgs;         /* 消息队列最大能容纳的消息数 */
    rt_uint16_t          entry;            /* 消息队列中已有的消息数 */
    void                *msg_queue_head;  /* 消息链表头 */
    void                *msg_queue_tail;  /* 消息链表尾 */
    void                *msg_queue_free;  /* 空闲消息链表 */
    rt_list_t            suspend_sender_thread;  /* 发送线程的挂起等待队列 */
};
typedef struct rt_messagequeue *rt_mq_t;

结构体定义中,继承关系一目了然,不再赘述。rt_messagequeue 对象从 rt_ipc_object 中派生,由 IPC 容器所管理。

2消息队列的操作函数

RT-Thread 提供了多种管理消息队列的接口函数。包括:创建消息队列 - 发送消息 - 接收消息 - 删除消息队列。如下图所示:

 

对于初学者来说,掌握其中常用的函数即可。本文重点介绍消息队列常用的函数接口。

实际项目中,使用消息队列的流程为:创建消息队列 - 发送消息 - 接收消息。我们就重点介绍一下对应的操作函数。

1. 创建消息队列

在 RT-Thread 中,同其他内核对象一样。创建消息队列也有两种方式:(1)动态创建(2)静态初始化。

动态创建一个消息队列的函数接口如下,调用此函数时,内核动态创建一个消息队列控制块。然后再分配一块内存空间,用于存放消息,这块内存的大小为:消息队列个数* [消息大小 + 消息头大小]。最后初始化消息队列以及消息队列控制块。

rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag)

参数 name 为消息队列名称;msg_size 为队列中一条消息的长度,单位为字节;max_msgs 为消息队列的最大个数;flag 为消息队列的等待方式。

创建成功,返回消息队列的句柄;创建失败,则返回 RT_NULL

静态方式创建消息队列需要两步:

  • 定义一个消息队列控制块以及一段存放消息的缓冲区
  • 初始化消息队列控制块

消息队列控制块初始化函数如下:

rt_err_t rt_mq_init(rt_mq_t mq, const char* name,
                    void *msgpool, rt_size_t msg_size,
                    rt_size_t pool_size, rt_uint8_t flag);

函数的参数解释如下表:

参数 描述
mq 消息队列控制块的指针
name 消息队列的名称
msgpool 存放消息的缓冲的指针
msg_size 一条消息的最大长度,单位为字节
pool_size 存放消息的缓冲区大小
flag 创建消息队列标志

初始化消息队列函数返回 RT_EOK

创建或初始化完成消息队列后,所有消息块都挂在空闲消息链表上,消息队列为空。

创建消息队列的标志变量取值有两种:

  • RT_IPC_FLAG_FIFO,等待消息队列的线程按照先进先出的方式进行排列。
  • RT_IPC_FLAG_PRIO,等待消息队列的线程按照优先级的方式进行排列。

2. 发送消息

RT-Thread 提供的发送消息接口函数有两种:一种是无等待超时接口,一种是有等待超时。

线程或者中断服务程序都可以给消息队列发送消息,发送消息的函数接口如下,此函数没有等待超时参数。

rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)

参数 mq 为消息队列对象的句柄;buffer 为存放消息缓冲区的指针;size 为消息大小。

发送成功,函数返回 RT_EOK;消息队列已满,返回 -RT_EFULL

发送的消息长度大于消息队列中消息块的最大长度,则返回 -RT_ERROR

等待方式发送消息的函数接口如下,这个函数有等待超时参数:

rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout)

此函数的参数 timeout 为发送等待超时时间,单位为系统时钟节拍。其他参数与 rt_mq_send() 相同。

如果消息队列已经满了,发送线程会根据设定的 timeout 参数等待消息队列中因为收取消息而空出空间。若超时时间到达依然没有空出空间,则发送线程将会被唤醒并返回错误码。

返回 RT_EOK 表示发送成功;返回 -RT_ETIMEOUT 表示超时;返回 -RT_ERROR 表示发送失败。

注意:在中断服务例程中发送邮件时,应该采用无等待延时的方式发送,直接使用 rt_mq_send() 或者等待超时设定为 0 的函数rt_mq_send_wait()

3. 接收消息

线程接收消息的函数接口如下,

rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer,
                    rt_size_t  size, rt_int32_t timeout)

参数 mq 为消息队列对象的句柄;buffer 为消息内容;size 为消息大小;timeout 为超时时间。

接收消息时,需要指定消息队列的句柄,以及一块用于存储消息的缓冲区,接收到的消息内容将被复制到该缓冲区里。还需指定等待消息的超时时间。

当消息队列中为空时,接收消息的线程会根据设定的超时时间,挂起在消息队列的等待线程队列上,或直接返回。

3实战演练

多说无益,实践出真知。我们来举个例子,学习一下如何使用消息队列。

动态创建两个线程和一个消息队列,一个线程往消息队列中发送消息,一个线程从消息队列中接收消息。

代码如下:

#include <rtthread.h>
#define THREAD_PRIORITY 8
#define THREAD_TIMESLICE 5
/* 消息队列句柄 */
rt_mq_t mq_handle;
/* 线程 1 入口 */
static void thread1_entry(void *parameter)
{
    char buf = 0;
    rt_uint8_t cnt = 0;
    while (1)
    {       
        /* 从消息队列中收取消息 */
        if (rt_mq_recv(mq_handle, &buf, sizeof(buf), RT_WAITING_FOREVER) == RT_EOK)
        {
            rt_kprintf("thread1: recv msg , the content: %c\n", buf);
            if (cnt == 19)
            {
                break;
            }
            cnt++;          
        }
        rt_thread_mdelay(1);
    }
}
/* 线程 2 入口 */
static void thread2_entry(void *parameter)
{
    int result;
    char buf = 'A';
    rt_uint8_t cnt = 0;
    while (1)
    {
        rt_kprintf("thread2: send message - %c\n", buf);
        /* 向消息队列发送消息 */
        result = rt_mq_send(mq_handle, &buf, 1);
        if(result != RT_EOK)
        {
            rt_kprintf("rt_mq_send ERR\n");
        }       
        buf++;
        cnt++;
        if(cnt >= 20)
        {
            rt_kprintf("message queue stop send, thread2 quit\n");
            break;
        }
        /* 延时 50ms */
        rt_thread_mdelay(500);
    }   
}
int main()
{
    /* 线程控制块指针 */
    rt_thread_t thread1 = RT_NULL;
    rt_thread_t thread2 = RT_NULL;  
    /* 创建一个邮箱 */
    mq_handle = rt_mq_create("mq", 1, 2048, RT_IPC_FLAG_FIFO);
    if (mq_handle == RT_NULL)
    {
        rt_kprintf("create msg queue failed.\n");
        return -1;
    }
    /* 动态创建线程1 */
    thread1 = rt_thread_create("thread1", thread1_entry, RT_NULL,
                    1024, THREAD_PRIORITY - 1, THREAD_TIMESLICE);
    if(thread1 != RT_NULL)
    {
        /* 启动线程 */
        rt_thread_startup(thread1);
    }
    /* 动态创建线程2 */
    thread2 = rt_thread_create("thread2", thread2_entry, RT_NULL,
                    1024, THREAD_PRIORITY, THREAD_TIMESLICE);
    if(thread2 != RT_NULL)
    {
        /* 启动线程 */
        rt_thread_startup(thread2);
    }   
}

编译执行结果如下

 

该例程演示了消息队列如何使用。线程 1 从消息队列中收取消息;线程 2 定时给消息队列发送消息,一共发送了 20 条消息。

### 其他操作函数

对于 RT-Thread 消息队列操作来说,还有几个函数没有介绍。可以简单了解一下。

1. 删除动态创建的消息队列

删除由 rt_mq_create() 函数创建的消息队列,可以调用如下函数:

rt_err_t rt_mq_delete(rt_mq_t mq)

调用此函数,可以释放消息队列控制块占用的内存资源以及消息缓冲区占用的内存。在删除一个消息队列对象时,应该确保该消息队列不再被使用。

在删除前会唤醒所有挂起在该消息队列上的线程,然后释放消息队列对象占用的内存块。

2. 脱离静态创建的消息队列

删除 rt_mq_init() 初始化的消息队列,可以用如下函数:

rt_err_t rt_mq_detach(rt_mq_t mq)

调用此函数时,首先会唤醒所有挂起在该消息队列中,线程等待队列上的线程,然后将该消息队列从内核对象管理器中脱离。

3.发送紧急消息

RT-Thread 中,提供了一种发送紧急消息的函数接口,其过程与发送消息几乎一样。其函数接口如下:

rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size);

在发送紧急消息时,从空闲消息链表上取下来的消息块不是挂到消息队列的队尾,而是挂到队首,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
82 0
|
消息中间件 存储 调度
RT-Thread记录(七、IPC机制之邮箱、消息队列)
讲完了线程同步的机制,我们要开始线程通讯的学习,线程通讯中的邮箱消息队列也属于 RT-Thread 的IPC机制
524 0
RT-Thread记录(七、IPC机制之邮箱、消息队列)
|
消息中间件 大数据 Java
大数据场景下的消息队列:Kafka3.0快速入门
Kafka是一个分布式的基于发布/订阅模式的消息队列,同时它又是一个分布式的事件流平台。既可作为消息队列,又可作为数据管道、流分析的应用。目前Kafka的最大应用还是消息队列。 市面上主流的消息队列有RabbitMQ,ActiveMQ、Kafka等等,其中RabbitMQ,ActiveMQ这些主要是Java应用中的队列,而Kafka主要在大数据场景下使用。 消息队列主要应用场景有如下几种:削峰、限流、解耦、异步通信等。
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
58 0
手撸MQ消息队列——循环数组
|
5月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
197 1
|
6月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。