轻量级消息队列的实现

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 轻量级消息队列的实现

[ 序 言 ]
    消息队列大家一定都不陌生,在RTOS、Linux等操作系统上都是必不可少的,作用也很简单,可以有效的防止丢包的发生,比如说,串口短时间内快速的接收一帧帧的消息进来到任务中去解析处理,当解析处理的时候再来一帧消息的时候,这时CPU正在解析上一帧数据,当前的这帧数据可能就会丢失,而消息队列就能很好的避免这个问题,收到消息时先把接收到的消息放到队列中,在任务中从队列获取数据,如果解析过程中再来一帧数据,这帧数据会先存放在消息队列中,当队列中的上一帧数据解析完成后,任务会从队列中的下一条数据开始解析处理,以此循环,直到队列中的消息解析处理完毕。

   
最近项目上使用的MCU片上资源比较有限,且无法跑RTOS,而通讯部分的消息,模组会有时连发几帧数据,且间隔最小2ms,不用队列就很难保证不丢包,因此准备自己实现一个轻量级的消息队列,在裸机或者片上资源有限的情况下也能完美运行的队列,于是就诞生了LiteQueue

[ 初 识 队 列 ]
   
从上面的工作原理上来看就已经能基本分析出队列的模样了,数据是一条条存进去的,从队列尾写入,队列头读出,那肯定是有头尾指针分别指向写入的位置和读出的位置,为了灵活的配置队列的大小,还需要配置队列中最多能存储几帧数据即几个列表项,每一项有多大的空间去保存我们接收到的数据,所以我们的队列结构体长这个样子:

/*
                 *tail: Move this pointer when writing data
                     item_num_1     item_num_2                 item_num_x
 ---------------    -----------    -----------                -----------
|   LiteQueue   |  | item_size |  | item_size |    ......    | item_size |
 ---------------    -----------    -----------                -----------  
                 *head: Move the pointer when reading data
LiteQueue : Structure describing queue parameters.
item_num_x: Number of items.
item_size : The size of each list item set when creating the queue, unit: bytes, used to store data received in the queue.
*/
typedef struct 
{
    uint8_t   *head;
    uint8_t   *tail;
    size_t    item_num;
    size_t    item_size;
}LiteQueue, *pLiteQueue;

 还需要考虑一种情况,就是多线程访问时不能同时读或写,要互斥访问。因此还需要加一个读写锁,优化之后的队列就是这样的:

/*
                 *tail: Move this pointer when writing data
                     item_num_1     item_num_2                 item_num_x
 ---------------    -----------    -----------                -----------
|   LiteQueue   |  | item_size |  | item_size |    ......    | item_size |
 ---------------    -----------    -----------                -----------  
                 *head: Move the pointer when reading data
LiteQueue : Structure describing queue parameters.
item_num_x: Number of items.
item_size : The size of each list item set when creating the queue, unit: bytes, used to store data received in the queue.
*/
typedef struct 
{
    volatile uint8_t  queue_write_lock;
    volatile uint8_t  queue_read_lock;
    uint8_t   *head;
    uint8_t   *tail;
    size_t    item_num;
    size_t    item_size;
}LiteQueue, *pLiteQueue;

[ 接 口 ]  
   
确定了队列结构体的模样,就可以根据需求确认下队列的接口定义,这也比较简单,使用队列前必定先要创建队列,并确定创建队列的大小,其次是读写队列的接口,以及判断队列是否为空、清空队列、删除队列,这些基本就足够使用了,因此围绕这些接口来实现下API的定义吧!

extern LiteQueue *LiteQueue_Create(size_t item_num, size_t item_size);   
extern LiteQueue_Status Write_To_LiteQueue(LiteQueue *queue, uint8_t *buff);  
extern LiteQueue_Status Read_From_LiteQueue(LiteQueue *queue, uint8_t *buff);                                            
extern LiteQueue_Status isLiteQueue_Empty(LiteQueue *queue);
extern LiteQueue_Status LiteQueue_Clear(LiteQueue *queue);
extern LiteQueue_Status LiteQueue_Delete(LiteQueue *queue);

   队列的状态就用一个枚举类型表示如下:

typedef enum
{
    LITE_QUEUE_IDLE = 0,
    LITE_QUEUE_BUSY,
    LITE_QUEUE_ERR,
    LITE_QUEUE_OK,
    LITE_QUEUE_EMPTY,
    LITE_QUEUE_NONEMPTY,
}LiteQueue_Status;

 还有一个队列锁不要忘记了,锁比较简单,就使用宏定义的方式实现如下:

typedef enum
{
    LITE_QUEUE_UNLOCK = 0,
    LITE_QUEUE_LOCK,
}LiteQueueLock;
#define  LITE_QUEUE_WRITE_LOCK(__QUEUE__)      do{                                                          \
                                                    if((__QUEUE__)->queue_write_lock == LITE_QUEUE_LOCK)    \
                                                    {                                                       \
                                                        return LITE_QUEUE_BUSY;                             \
                                                    }                                                       \
                                                    else                                                    \
                                                    {                                                       \
                                                        (__QUEUE__)->queue_write_lock = LITE_QUEUE_LOCK;    \
                                                    }                                                       \
                                                 }while(0)
#define  LITE_QUEUE_WRITE_UNLOCK(__QUEUE__)    do{                                                          \
                                                    (__QUEUE__)->queue_write_lock = LITE_QUEUE_UNLOCK;      \
                                                 }while(0)
#define  LITE_QUEUE_READ_LOCK(__QUEUE__)       do{                                                          \
                                                    if((__QUEUE__)->queue_read_lock == LITE_QUEUE_LOCK)     \
                                                    {                                                       \
                                                        return LITE_QUEUE_BUSY;                             \
                                                    }                                                       \
                                                    else                                                    \
                                                    {                                                       \
                                                        (__QUEUE__)->queue_read_lock = LITE_QUEUE_LOCK;     \
                                                    }                                                       \
                                                 }while(0)
#define  LITE_QUEUE_READ_UNLOCK(__QUEUE__)     do{                                                          \
                                                    (__QUEUE__)->queue_read_lock = LITE_QUEUE_UNLOCK;       \
                                                 }while(0)

[ 创 建 队 列 ]

/*
* @ brief : Create message queue.
* @ param : {size_t     } item_num : The number of list items in the queue.
            {size_t     } item_size: The size of each list item, unit: bytes.
* @ return: {LiteQueue *} queue    : Message queue handle pointer.
* @ author: bagy.
* @ note  : Create a queue and initialize the queue items to 0, with the head and tail pointers pointing to the starting position of the list items.
*/
LiteQueue *LiteQueue_Create(size_t item_num, size_t item_size)
{
    if((item_num < 1) || (item_size < 1))
        return NULL;
    LiteQueue *queue = (LiteQueue *)malloc(sizeof(LiteQueue) + item_num * item_size);
    if(queue == NULL)
    {
        log_i("LiteQueue malloc failed.\r\n");
        return NULL;
    }
    memset((uint8_t *)queue, 0, sizeof(LiteQueue) + item_num * item_size);
    queue->head = (uint8_t *)((uint8_t *)queue + sizeof(LiteQueue));
    queue->tail = queue->head;
    queue->item_num = item_num;
    queue->item_size = item_size;
    queue->queue_read_lock = LITE_QUEUE_UNLOCK;
    queue->queue_write_lock = LITE_QUEUE_UNLOCK;
    return queue;
}

   注释写的已经很清楚啦,主要是创建一个队列,使用形参传入列表项的个数以及每个列表项的大小,单位是字节。后面写入的时候把接收到的每一帧数据都依次放入列表项中。



[ 写 入 数 据 到 队 列 中 ]

/*
* @ brief : Write data to the queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
            {uint8_t        *} buff : Data to be written to the queue.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Writing data when the queue is full will automatically overwrite the first frame of data.
*/
LiteQueue_Status Write_To_LiteQueue(LiteQueue *queue, uint8_t *buff)
{
    if((queue == NULL) || (buff == NULL))
        return LITE_QUEUE_ERR;
    LITE_QUEUE_WRITE_LOCK(queue);
    memcpy(queue->tail, buff, queue->item_size);
    if(queue->tail == (uint8_t *)queue + sizeof(LiteQueue) + (queue->item_num - 1) * queue->item_size)
        queue->tail = (uint8_t *)queue + sizeof(LiteQueue);
    else 
        queue->tail += queue->item_size;
    LITE_QUEUE_WRITE_UNLOCK(queue);
    return LITE_QUEUE_OK;
}

   这里有一个要注意的是,我这里设计是当队列满了之后,如果队列里面的数据都还没被读出,即没有空闲的位置可以放新的数据,就会覆盖第一个入队的数据写入。这种是比较极端的情况,一般只要队列的大小设置的合理,解析程序没有死循环或者异常,队列基本是不会满的。


[ 从 队 列 中 读 出 数 据 ]

/*
* @ brief : Read data from queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
            {uint8_t        *} buff : Data to be read from the queue.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Read data starting from the position of the head pointer and save it to the buff.
*/
LiteQueue_Status Read_From_LiteQueue(LiteQueue *queue, uint8_t *buff)
{
    if((queue == NULL) || (buff == NULL) || (isLiteQueue_Empty(queue) == LITE_QUEUE_EMPTY))
        return LITE_QUEUE_ERR;
    LITE_QUEUE_READ_LOCK(queue);
    memcpy(buff, queue->head, queue->item_size);  
    if(queue->head == (uint8_t *)queue + sizeof(LiteQueue) + (queue->item_num - 1) * queue->item_size)
        queue->head = (uint8_t *)queue + sizeof(LiteQueue);
    else 
        queue->head += queue->item_size;
    LITE_QUEUE_READ_UNLOCK(queue);
    return LITE_QUEUE_OK;  
}

[ 判 断 队 列 是 否 为 空 ]

/*
* @ brief : Determine whether the queue is empty.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the head and tail pointers are the same. If they are the same, 
            it means there is no data in the queue, otherwise it means there is still data that has not been read out.
*/
inline LiteQueue_Status isLiteQueue_Empty(LiteQueue *queue)
{
    if(queue == NULL)
        return LITE_QUEUE_ERR;
    if(queue->head == queue->tail)
        return LITE_QUEUE_EMPTY;
    else
        return LITE_QUEUE_NONEMPTY;
}

   这个接口是最简单的,只要判断头尾指针是否相同即可,相同则表示所有的数据都被读出了即队列为空


[ 清 空 队 列 ]

/*
* @ brief : Clear the message queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the head and tail pointers are the same. If they are the same, 
            it means there is no data in the queue, otherwise it means there is still data that has not been read out.
*/
LiteQueue_Status LiteQueue_Clear(LiteQueue *queue)
{   
    if(queue == NULL)
        return LITE_QUEUE_ERR;
    queue->head = (uint8_t *)((uint8_t *)queue + sizeof(LiteQueue));
    queue->tail = queue->head;
    memset(queue->head, 0, queue->item_num * queue->item_size);
    return LITE_QUEUE_OK;
}


[ 删 除 队 列 ]

/*
* @ brief : Clear the message queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the head and tail pointers are the same. If they are the same, 
            it means there is no data in the queue, otherwise it means there is still data that has not been read out.
*/
LiteQueue_Status LiteQueue_Delete(LiteQueue *queue)
{
    if(queue == NULL)
        return LITE_QUEUE_ERR;    
    memset((uint8_t *)queue, 0, sizeof(LiteQueue) + queue->item_num * queue->item_size);
    free(queue);
    queue = NULL;
    return LITE_QUEUE_OK;
}


[ 测 试 用 例 ]

   所有的接口都完成啦,就剩下写测试用例,测试用例我也贴心的提供了,所以你还不点赞、评论加转发?

/*
* @ brief : Print the contents of each list item in the queue.
* @ param : {LiteQueue *} queue: Message queue handle pointer.
* @ return: None.
* @ author: bagy.
* @ note  : None.
*/
static void PrintLiteQueue(LiteQueue *queue)
{
    if(queue == NULL)
        return;
    for(int i = 0; i < queue->item_num; i++)
    {
        log_i("[item_num:%d] ", i);
        for(int n = 0; n < queue->item_size; n++)
        {
            log_i("%d ", *((uint8_t *)queue + sizeof(LiteQueue) + i * queue->item_size + n));
        }
        log_i("\n");
    }
}
/*
* @ brief : Print the data in buff.
* @ param : {LiteQueue *} queue: Message queue handle pointer.
* @ return: None.
* @ author: bagy.
* @ note  : Used to observe buff data changes and test to verify the correctness of written or read data.
*/
static void PrintBuff(uint8_t *buff, size_t len)
{  
    if((buff == NULL) || (len < 1))
        return;
    log_i("Read buff<<<:");
    for(size_t i = 0; i < len; i++)
    {
        log_i("%d ", buff[i]);
    }
    log_i("\n\n");
}
int main(void)
{
    uint8_t writebuff[10] = {0};
    uint8_t readbuff[10]  = {0};
    /* Create message queue, 4 list items, each list item has 10 bytes of memory space */
    pLiteQueue msgQueue = LiteQueue_Create(4, 10);
    PrintLiteQueue(msgQueue);
    printf("\n");
    /* Simulate writing and reading to the queue 6 times, and observe the data in the queue by printing */
    for(int i = 0; i < 6; i++)
    {
        /* Simulate data, change the writebuff data and write it to the queue */
        for(int n = 0; n < msgQueue->item_size; n++)
            writebuff[n] = (i * msgQueue->item_size + n) % 256;
        /* Data is written to the queue */
        Write_To_LiteQueue(msgQueue, writebuff);
        PrintLiteQueue(msgQueue);
        /* Read data from queue */
        Read_From_LiteQueue(msgQueue, readbuff);
        PrintBuff(readbuff, sizeof(readbuff));
    }
    return 0;
}

   PrintLiteQueue这个函数是把消息队列中的数据都打印出来,并根据列表项划分好,这样看起来更加直观。

      PrintBuff是把传入的buff打印出来,可以用来观测从队列中读出的数据和写入的是否是一样的。

       main函数中创建了一个4x10的队列,创建之后打印出队列的数据如下:

  上图表示有四个列表项,每个列表项有10个字节的内存空间保存接收的数据,并全部初始化为0。

   

    接着for循环模拟向队列中写入6组数据,每次写入后都打印下当前队列中的数据,之后从队列读取数据出来保存到readbuff中并把读出的数据打印出来观测。

/* Simulate data, change the writebuff data and write it to the queue */
for(int n = 0; n < msgQueue->item_size; n++)
    writebuff[n] = (i * msgQueue->item_size + n) % 256;

  上面这几行代码就是模拟数据,让每次写入队列的数据都是不同的,且是连续的,这样比较好观测,接下来就来看下打印的结果吧。


  文章至此结束啦如果觉得本篇文章多少有点帮助的话,求赞、关注、评论、转发,创作不易!你们的支持是小编创作最大动力。

相关文章
|
4月前
|
消息中间件 物联网 Linux
Linux怎么安装czmq(物联网消息通讯轻量级消息队列)
Linux怎么安装czmq(物联网消息通讯轻量级消息队列)
46 8
|
4月前
|
消息中间件 自然语言处理 负载均衡
RabbitMQ揭秘:轻量级消息队列的优缺点全解析
**RabbitMQ简介** RabbitMQ是源自电信行业的消息中间件,支持AMQP协议,提供轻量、快速且易于部署的解决方案。它拥有灵活的路由配置,广泛的语言支持,适用于异步处理、负载均衡、日志收集和微服务通信等场景。然而,当面临大量消息堆积或高吞吐量需求时,性能可能会下降,并且扩展和开发成本相对较高。
251 0
|
5月前
|
消息中间件 API
接上文,轻量级消息队列的实现
接上文,轻量级消息队列的实现
|
消息中间件 NoSQL Java
SpringBoot项目:RedisTemplate实现轻量级消息队列
背景 公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发
|
消息中间件 存储 监控
【实测】django的超轻量级消息队列:django-task-mq 使用教程
【实测】django的超轻量级消息队列:django-task-mq 使用教程
|
消息中间件 Windows
在Windows系统上实现轻量级的线程间及进程间消息队列
Windows没有message queue累世的IPC内核对象,使得在在处理IPC时少了一种传递消息的手段。利用Windows的Naming Object可以实现一套简单的Inter-Thread消息队列。
1247 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
32 0
手撸MQ消息队列——循环数组
|
2月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
122 1