接上文,轻量级消息队列的实现

简介: 接上文,轻量级消息队列的实现

[ 序 言 ]
   
上篇文章《轻量级消息队列的实现》介绍了轻量的消息队列的实现,并提供API接口的实现代码,今天来完善一下LiteQueue的功能。新增功能如下:
      1、增加队列满时再入队的策略,用户可选
      2、增加队列异常的回调函数,通知上层应用
      3、增加判断队列是否为满的接口
      4、优化判空逻辑,解决了写入次数和队列项相同时再读出异常的bug
      5、增加LiteQueue的打印开关

文件目录如下:

  为了以后管理方便,把代码托管在Gitee上且开源,欢迎大家测试提出bug,可以私信我解bug。最新的代码也会同步到master分支上。
项目地址:https://gitee.com/one-hundred-and-twenty-three11/lite-queue/tree/dev/


[
Lite_Queue_Config.h 代 码 实 现 ]

#ifndef  __LITE_QUEUE_CONFIG_H__
#define  __LITE_QUEUE_CONFIG_H__
#define  QUEUE_ENABLE                       1
#define  QUEUE_DISABLE                      !QUEUE_ENABLE
#define  QUEUE_DEBUG_ENABLE                 QUEUE_ENABLE
#if(QUEUE_DEBUG_ENABLE)
#define  log_i(x...)                        printf(x)
#else
#define  log_i(x...)
#endif
#define  QUEUE_ERROR_CALLBACK               QUEUE_DISABLE
#endif   /* __LITE_QUEUE_CONFIG_H__ */

[ Lite_Queue.h 代 码 实 现 ]

#ifndef  __LITE_QUEUE_H__
#define  __LITE_QUEUE_H__
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "Lite_Queue_Config.h"
typedef enum
{
    QUEUE_SEND_TO_BACK = 0,   /* After the queue is full, if more data is added to the queue, it will return to the initial position and start joining the queue. */
    QUEUE_DISCARD,            /* After the queue is full, if more data is added to the queue, the current data will be discarded. */
}QueueWriteMode;
typedef enum
{
    LITE_QUEUE_LOCK = 0,
    LITE_QUEUE_UNLOCK,
    LITE_QUEUE_IDLE,
    LITE_QUEUE_WRITE_BUSY,
    LITE_QUEUE_READ_BUSY,
    LITE_QUEUE_ERR,
    LITE_QUEUE_OK,
    LITE_QUEUE_EMPTY,
    LITE_QUEUE_NONEMPTY,
    LITE_QUEUE_FULL,
    LITE_QUEUE_NONEFULL,
}LiteQueue_Status;
typedef void (*LiteQueue_Callback_t)(LiteQueue_Status status);
/*
                 *tail: Move this pointer when writing data
                     item_num_1     item_num_2                 item_num_x
 ---------------    -----------    -----------                -----------
|   LiteQueue   |  | item_size |  | item_size |    ......    | item_size |
 ---------------    -----------    -----------                -----------  
                 *head: Move the pointer when reading data
LiteQueue : Structure describing queue parameters.
item_num_x: Number of items.
item_size : The size of each list item set when creating the queue, unit: bytes, used to store data received in the queue.
*/
typedef struct 
{
    volatile uint8_t  queue_write_lock;         /* write queue lock */
    volatile uint8_t  queue_read_lock;          /* read queue lock */   
    uint8_t   *head;                            /* Point to the head of the queue, du from the head of the queue */
    uint8_t   *tail;                            /* Point to the end of the queue and write data from the end of the queue */
    size_t    item_num;                         /* The number of list items in the queue */
    size_t    item_size;                        /* How much address space each list item occupies, unit: bytes */
    size_t    unread_item;                      /* The number of unread list items remaining in the queue */
    LiteQueue_Callback_t LiteQueue_Callback;    /* Callback function when queue exception occurs */
}LiteQueue, *pLiteQueue;
#define  LITE_QUEUE_WRITE_LOCK(__QUEUE__)      do{                                                              \
                                                    if((__QUEUE__)->queue_write_lock == LITE_QUEUE_LOCK)        \
                                                    {                                                           \
                                                        (__QUEUE__)->LiteQueue_Callback(LITE_QUEUE_WRITE_BUSY); \
                                                        return LITE_QUEUE_WRITE_BUSY;                           \
                                                    }                                                           \
                                                    else                                                        \
                                                    {                                                           \
                                                        (__QUEUE__)->queue_write_lock = LITE_QUEUE_LOCK;        \
                                                    }                                                           \
                                                 }while(0)
#define  LITE_QUEUE_WRITE_UNLOCK(__QUEUE__)    do{                                                              \
                                                    (__QUEUE__)->queue_write_lock = LITE_QUEUE_UNLOCK;          \
                                                 }while(0)
#define  LITE_QUEUE_READ_LOCK(__QUEUE__)       do{                                                              \
                                                    if((__QUEUE__)->queue_read_lock == LITE_QUEUE_LOCK)         \
                                                    {                                                           \
                                                        (__QUEUE__)->LiteQueue_Callback(LITE_QUEUE_READ_BUSY);  \
                                                        return LITE_QUEUE_READ_BUSY;                            \
                                                    }                                                           \
                                                    else                                                        \
                                                    {                                                           \
                                                        (__QUEUE__)->queue_read_lock = LITE_QUEUE_LOCK;         \
                                                    }                                                           \
                                                 }while(0)
#define  LITE_QUEUE_READ_UNLOCK(__QUEUE__)     do{                                                              \
                                                    (__QUEUE__)->queue_read_lock = LITE_QUEUE_UNLOCK;           \
                                                 }while(0)
extern LiteQueue *xLiteQueue_Create(size_t item_num, size_t item_size, LiteQueue_Callback_t callback);
extern LiteQueue_Status xWrite_To_LiteQueue(LiteQueue *queue, uint8_t *buff, QueueWriteMode WriteMode);  
extern LiteQueue_Status Read_From_LiteQueue(LiteQueue *queue, uint8_t *buff);                                            
extern LiteQueue_Status isLiteQueue_Empty(LiteQueue *queue);
extern LiteQueue_Status isLiteQueue_Full(LiteQueue *queue);
extern LiteQueue_Status LiteQueue_Clear(LiteQueue *queue);
extern LiteQueue_Status LiteQueue_Delete(LiteQueue *queue);
#if(QUEUE_ERROR_CALLBACK == QUEUE_ENABLE)
#define  LiteQueue_Create(item_num, item_size, callback)    xLiteQueue_Create(item_num, item_size, callback)
#else
#define  LiteQueue_Create(item_num, item_size)              xLiteQueue_Create((item_num), (item_size), NULL)
#endif 
#define  Write_Back_To_LiteQueue(queue, buff)               xWrite_To_LiteQueue((queue), (buff), (QUEUE_SEND_TO_BACK))
#define  Write_Discard_LiteQueue(queue, buff)               xWrite_To_LiteQueue((queue), (buff), (QUEUE_DISCARD))
#endif   /* __LITE_QUEUE_H__ */

[ Lite_Queue.c 代 码 实 现 ]

#include "Lite_Queue.h"
/*
* @ brief : Create message queue.
* @ param : {size_t     } item_num : The number of list items in the queue.
            {size_t     } item_size: The size of each list item, unit: bytes.
* @ return: {LiteQueue *} queue    : Message queue handle pointer.
* @ author: bagy.
* @ note  : Create a queue and initialize the queue items to 0, with the head and tail pointers pointing to the starting position of the list items.
*/
LiteQueue *xLiteQueue_Create(size_t item_num, size_t item_size, LiteQueue_Callback_t callback)
{
    if((item_num < 1) || (item_size < 1))
        return NULL;
    LiteQueue *queue = (LiteQueue *)malloc(sizeof(LiteQueue) + item_num * item_size);
    if(queue == NULL)
    {
        log_i("LiteQueue malloc failed.\r\n");
        return NULL;
    }
    memset((uint8_t *)queue, 0, sizeof(LiteQueue) + item_num * item_size);
    queue->head = (uint8_t *)((uint8_t *)queue + sizeof(LiteQueue));
    queue->tail = queue->head;
    queue->item_num = item_num;
    queue->item_size = item_size;
    queue->queue_read_lock = LITE_QUEUE_UNLOCK;
    queue->queue_write_lock = LITE_QUEUE_UNLOCK;
    queue->unread_item = 0;
    queue->LiteQueue_Callback = callback;
    return queue;
}
/*
* @ brief : Write data to the queue.
* @ param : {LiteQueue      *} queue    : Message queue handle pointer.
            {uint8_t        *} buff     : Data to be written to the queue.
            {QueueWriteMode  } WriteMode: Strategies for writing to the queue.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Writing data when the queue is full will automatically overwrite the first frame of data.
*/
LiteQueue_Status xWrite_To_LiteQueue(LiteQueue *queue, uint8_t *buff, QueueWriteMode WriteMode)
{
    if((queue == NULL) || (buff == NULL))
        return LITE_QUEUE_ERR;
    LITE_QUEUE_WRITE_LOCK(queue);
    if(isLiteQueue_Full(queue) == LITE_QUEUE_FULL)
    {
        queue->LiteQueue_Callback(LITE_QUEUE_FULL);
        if(WriteMode == QUEUE_SEND_TO_BACK)
            log_i("The queue is full, overwriting the data that first entered the queue.\n");
        else
        {
            LITE_QUEUE_WRITE_UNLOCK(queue);
            log_i("Queue is full, discard current frame.\n");
            return LITE_QUEUE_FULL;  
        }  
    }
    memcpy(queue->tail, buff, queue->item_size);
    if(queue->tail == (uint8_t *)queue + sizeof(LiteQueue) + (queue->item_num - 1) * queue->item_size)
        queue->tail = (uint8_t *)queue + sizeof(LiteQueue);
    else 
        queue->tail += queue->item_size;
    queue->unread_item++;
    if(queue->unread_item >= queue->item_num)
        queue->unread_item = queue->item_num;
    LITE_QUEUE_WRITE_UNLOCK(queue);
    return LITE_QUEUE_OK;
}
/*
* @ brief : Read data from queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
            {uint8_t        *} buff : Data to be read from the queue.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Read data starting from the position of the head pointer and save it to the buff.
*/
LiteQueue_Status Read_From_LiteQueue(LiteQueue *queue, uint8_t *buff)
{
    if((queue == NULL) || (buff == NULL) || (isLiteQueue_Empty(queue) == LITE_QUEUE_EMPTY))
        return LITE_QUEUE_ERR;
    LITE_QUEUE_READ_LOCK(queue);
    memcpy(buff, queue->head, queue->item_size);  
    if(queue->head == (uint8_t *)queue + sizeof(LiteQueue) + (queue->item_num - 1) * queue->item_size)
        queue->head = (uint8_t *)queue + sizeof(LiteQueue);
    else 
        queue->head += queue->item_size;
    queue->unread_item--;
    LITE_QUEUE_READ_UNLOCK(queue);
    return LITE_QUEUE_OK;  
}
/*
* @ brief : Determine whether the queue is empty.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the queue is empty based on the comparison between the number of remaining unread list items and the total list 
            items of the queue. If the number of remaining unread list items is 0, it means that the queue is empty and the queue is not empty.
*/
inline LiteQueue_Status isLiteQueue_Empty(LiteQueue *queue)
{
    if(queue == NULL)
        return LITE_QUEUE_ERR;
    if(!queue->unread_item)
        return LITE_QUEUE_EMPTY;
    else
        return LITE_QUEUE_NONEMPTY;
}
/*
* @ brief : Determine if the queue is full.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the queue is full by comparing the number of remaining unread list items with the total list items 
            of the queue. If they are the same, it means the queue is full, otherwise there is still space left in the queue.
*/
inline LiteQueue_Status isLiteQueue_Full(LiteQueue *queue)
{
    if(queue == NULL)
        return LITE_QUEUE_ERR;
    if(queue->unread_item == queue->item_num)
        return LITE_QUEUE_FULL;
    else
        return LITE_QUEUE_NONEFULL;
}
/*
* @ brief : Clear the message queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the head and tail pointers are the same. If they are the same, 
            it means there is no data in the queue, otherwise it means there is still data that has not been read out.
*/
LiteQueue_Status LiteQueue_Clear(LiteQueue *queue)
{   
    if(queue == NULL)
        return LITE_QUEUE_ERR;
    LITE_QUEUE_WRITE_LOCK(queue);
    LITE_QUEUE_READ_LOCK(queue);
    queue->head = (uint8_t *)((uint8_t *)queue + sizeof(LiteQueue));
    queue->tail = queue->head;
    queue->unread_item = 0;
    memset(queue->head, 0, queue->item_num * queue->item_size);
    LITE_QUEUE_WRITE_UNLOCK(queue);
    LITE_QUEUE_READ_UNLOCK(queue);
    return LITE_QUEUE_OK;
}
/*
* @ brief : Clear the message queue.
* @ param : {LiteQueue      *} queue: Message queue handle pointer.
* @ return: {LiteQueue_Status} Returns the status of the queue.
* @ author: bagy.
* @ note  : Determine whether the head and tail pointers are the same. If they are the same, 
            it means there is no data in the queue, otherwise it means there is still data that has not been read out.
*/
LiteQueue_Status LiteQueue_Delete(LiteQueue *queue)
{
    if(queue == NULL)
        return LITE_QUEUE_ERR;    
    LITE_QUEUE_WRITE_LOCK(queue);
    LITE_QUEUE_READ_LOCK(queue);
    memset((uint8_t *)queue, 0, sizeof(LiteQueue) + queue->item_num * queue->item_size);
    queue->head = NULL;
    queue->tail = NULL;
    queue->LiteQueue_Callback = NULL;
    free(queue);
    queue = NULL;
    return LITE_QUEUE_OK;
}

[ main.c 测 试 代 码 实 现 ]

#include "Lite_Queue.h"
#define  QUEUE_ITEM_NUM      3
#define  QUEUE_ITEM_SIZE     10
/*
* @ brief : Print the contents of each list item in the queue.
* @ param : {LiteQueue *} queue: Message queue handle pointer.
* @ return: None.
* @ author: bagy.
* @ note  : None.
*/
static void PrintLiteQueue(LiteQueue *queue)
{
    if(queue == NULL)
        return;
    for(int i = 0; i < queue->item_num; i++)
    {
        log_i("[item_num:%d] ", i);
        for(int n = 0; n < queue->item_size; n++)
        {
            log_i("%d ", *((uint8_t *)queue + sizeof(LiteQueue) + i * queue->item_size + n));
        }
        log_i("\n");
    }
}
/*
* @ brief : Print the data in buff.
* @ param : {const char *} mode : Print mode.
            {LiteQueue  *} queue: Message queue handle pointer.
* @ return: None.
* @ author: bagy.
* @ note  : Used to observe buff data changes and test to verify the correctness of written or read data.
*/
static void PrintBuff(const char *mode, uint8_t *buff, size_t len)
{  
    if((buff == NULL) || (len < 1))
        return;
    if(strstr(mode, "write"))
        log_i("Write buff>>>:");
    else
        log_i("Read buff <<<:");
    for(size_t i = 0; i < len; i++)
    {
        log_i("%d ", buff[i]);
    }
    log_i("\n\n");
}
/*
* @ brief : Queue callback function.
* @ param : {LiteQueue_Status} sta: Queue status.
* @ return: None.
* @ author: bagy.
* @ note  : When the queue writes or reads abnormally, the upper application will be notified through the callback function.
*/
#if(QUEUE_ERROR_CALLBACK == QUEUE_ENABLE)
void LiteQueue_Callback(LiteQueue_Status sta)
{
    switch(sta)
    {
        case LITE_QUEUE_FULL      : /*...*/ break;
        case LITE_QUEUE_WRITE_BUSY: /*...*/ break;
        case LITE_QUEUE_READ_BUSY : /*...*/ break;
        default:break;
    }
}
#endif
int main(void)
{
    uint8_t writebuff[QUEUE_ITEM_SIZE] = {0};
    uint8_t readbuff[QUEUE_ITEM_SIZE]  = {0};
    /* Create message queue, 4 list items, each list item has 10 bytes of memory space */
    #if(QUEUE_ERROR_CALLBACK == QUEUE_ENABLE)
    pLiteQueue msgQueue = LiteQueue_Create(QUEUE_ITEM_NUM, QUEUE_ITEM_SIZE, LiteQueue_Callback);
    #else
    pLiteQueue msgQueue = LiteQueue_Create(QUEUE_ITEM_NUM, QUEUE_ITEM_SIZE);
    #endif
    PrintLiteQueue(msgQueue);
    log_i("\n");
    /* Simulate writing and reading to the queue 6 times, and observe the data in the queue by printing */
    for(int i = 0; i < 4; i++)
    {
        /* Simulate data, change the writebuff data and write it to the queue */
        for(int n = 0; n < msgQueue->item_size; n++)
            writebuff[n] = (i * msgQueue->item_size + n) % 256;
        /* Data is written to the queue */
        Write_Back_To_LiteQueue(msgQueue, writebuff);
        PrintLiteQueue(msgQueue);
        log_i("\n");
    }
    /* Read data from queue */
    Read_From_LiteQueue(msgQueue, readbuff);
    PrintBuff("read", readbuff, sizeof(readbuff));
    /* Read data from queue */
    Read_From_LiteQueue(msgQueue, readbuff);
    PrintBuff("read", readbuff, sizeof(readbuff));
    PrintBuff("write", writebuff, sizeof(writebuff));
    /* Data is written to the queue */
    Write_Back_To_LiteQueue(msgQueue, writebuff);
    PrintLiteQueue(msgQueue);
    return 0;
}

[ Makefile ]

Lite_Queue: main.o Lite_Queue.o
  gcc -o Lite_Queue main.o Lite_Queue.o
main.o: main.c Lite_Queue.h
  gcc -c main.c
Lite_Queue.o: Lite_Queue.c Lite_Queue.h Lite_Queue_Config.h
  gcc -c Lite_Queue.c
clean:
  del -f *.o *.gch

[ 编 译 相 关 ]

编译:make
清除:make clean
运行:.\Lite_Queue.exe

如果觉得本篇文章多少有点帮助的话,求赞、关注、评论、转发,创作不易!你们的支持是小编创作最大动力。

相关文章
|
4月前
|
消息中间件 物联网 Linux
Linux怎么安装czmq(物联网消息通讯轻量级消息队列)
Linux怎么安装czmq(物联网消息通讯轻量级消息队列)
46 8
|
4月前
|
消息中间件 自然语言处理 负载均衡
RabbitMQ揭秘:轻量级消息队列的优缺点全解析
**RabbitMQ简介** RabbitMQ是源自电信行业的消息中间件,支持AMQP协议,提供轻量、快速且易于部署的解决方案。它拥有灵活的路由配置,广泛的语言支持,适用于异步处理、负载均衡、日志收集和微服务通信等场景。然而,当面临大量消息堆积或高吞吐量需求时,性能可能会下降,并且扩展和开发成本相对较高。
251 0
|
5月前
|
消息中间件 存储 测试技术
轻量级消息队列的实现
轻量级消息队列的实现
|
消息中间件 NoSQL Java
SpringBoot项目:RedisTemplate实现轻量级消息队列
背景 公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发
|
消息中间件 存储 监控
【实测】django的超轻量级消息队列:django-task-mq 使用教程
【实测】django的超轻量级消息队列:django-task-mq 使用教程
|
消息中间件 Windows
在Windows系统上实现轻量级的线程间及进程间消息队列
Windows没有message queue累世的IPC内核对象,使得在在处理IPC时少了一种传递消息的手段。利用Windows的Naming Object可以实现一套简单的Inter-Thread消息队列。
1247 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
32 0
手撸MQ消息队列——循环数组
|
2月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
122 1