[ 序 言 ]
消息队列大家一定都不陌生,在RTOS、Linux等操作系统上都是必不可少的,作用也很简单,可以有效的防止丢包的发生,比如说,串口短时间内快速的接收一帧帧的消息进来到任务中去解析处理,当解析处理的时候再来一帧消息的时候,这时CPU正在解析上一帧数据,当前的这帧数据可能就会丢失,而消息队列就能很好的避免这个问题,收到消息时先把接收到的消息放到队列中,在任务中从队列获取数据,如果解析过程中再来一帧数据,这帧数据会先存放在消息队列中,当队列中的上一帧数据解析完成后,任务会从队列中的下一条数据开始解析处理,以此循环,直到队列中的消息解析处理完毕。
最近项目上使用的MCU片上资源比较有限,且无法跑RTOS,而通讯部分的消息,模组会有时连发几帧数据,且间隔最小2ms,不用队列就很难保证不丢包,因此准备自己实现一个轻量级的消息队列,在裸机或者片上资源有限的情况下也能完美运行的队列,于是就诞生了LiteQueue
[ 初 识 队 列 ]
从上面的工作原理上来看就已经能基本分析出队列的模样了,数据是一条条存进去的,从队列尾写入,队列头读出,那肯定是有头尾指针分别指向写入的位置和读出的位置,为了灵活的配置队列的大小,还需要配置队列中最多能存储几帧数据即几个列表项,每一项有多大的空间去保存我们接收到的数据,所以我们的队列结构体长这个样子:
/* *tail: Move this pointer when writing data ↓ item_num_1 item_num_2 item_num_x --------------- ----------- ----------- ----------- | LiteQueue | | item_size | | item_size | ...... | item_size | --------------- ----------- ----------- ----------- ↑ *head: Move the pointer when reading data LiteQueue : Structure describing queue parameters. item_num_x: Number of items. item_size : The size of each list item set when creating the queue, unit: bytes, used to store data received in the queue. */ typedef struct { uint8_t *head; uint8_t *tail; size_t item_num; size_t item_size; }LiteQueue, *pLiteQueue;
还需要考虑一种情况,就是多线程访问时不能同时读或写,要互斥访问。因此还需要加一个读写锁,优化之后的队列就是这样的:
/* *tail: Move this pointer when writing data ↓ item_num_1 item_num_2 item_num_x --------------- ----------- ----------- ----------- | LiteQueue | | item_size | | item_size | ...... | item_size | --------------- ----------- ----------- ----------- ↑ *head: Move the pointer when reading data LiteQueue : Structure describing queue parameters. item_num_x: Number of items. item_size : The size of each list item set when creating the queue, unit: bytes, used to store data received in the queue. */ typedef struct { volatile uint8_t queue_write_lock; volatile uint8_t queue_read_lock; uint8_t *head; uint8_t *tail; size_t item_num; size_t item_size; }LiteQueue, *pLiteQueue;
[ 接 口 ]
确定了队列结构体的模样,就可以根据需求确认下队列的接口定义,这也比较简单,使用队列前必定先要创建队列,并确定创建队列的大小,其次是读写队列的接口,以及判断队列是否为空、清空队列、删除队列,这些基本就足够使用了,因此围绕这些接口来实现下API的定义吧!
extern LiteQueue *LiteQueue_Create(size_t item_num, size_t item_size); extern LiteQueue_Status Write_To_LiteQueue(LiteQueue *queue, uint8_t *buff); extern LiteQueue_Status Read_From_LiteQueue(LiteQueue *queue, uint8_t *buff); extern LiteQueue_Status isLiteQueue_Empty(LiteQueue *queue); extern LiteQueue_Status LiteQueue_Clear(LiteQueue *queue); extern LiteQueue_Status LiteQueue_Delete(LiteQueue *queue);
队列的状态就用一个枚举类型表示如下:
typedef enum { LITE_QUEUE_IDLE = 0, LITE_QUEUE_BUSY, LITE_QUEUE_ERR, LITE_QUEUE_OK, LITE_QUEUE_EMPTY, LITE_QUEUE_NONEMPTY, }LiteQueue_Status;
还有一个队列锁不要忘记了,锁比较简单,就使用宏定义的方式实现如下:
typedef enum { LITE_QUEUE_UNLOCK = 0, LITE_QUEUE_LOCK, }LiteQueueLock; #define LITE_QUEUE_WRITE_LOCK(__QUEUE__) do{ \ if((__QUEUE__)->queue_write_lock == LITE_QUEUE_LOCK) \ { \ return LITE_QUEUE_BUSY; \ } \ else \ { \ (__QUEUE__)->queue_write_lock = LITE_QUEUE_LOCK; \ } \ }while(0) #define LITE_QUEUE_WRITE_UNLOCK(__QUEUE__) do{ \ (__QUEUE__)->queue_write_lock = LITE_QUEUE_UNLOCK; \ }while(0) #define LITE_QUEUE_READ_LOCK(__QUEUE__) do{ \ if((__QUEUE__)->queue_read_lock == LITE_QUEUE_LOCK) \ { \ return LITE_QUEUE_BUSY; \ } \ else \ { \ (__QUEUE__)->queue_read_lock = LITE_QUEUE_LOCK; \ } \ }while(0) #define LITE_QUEUE_READ_UNLOCK(__QUEUE__) do{ \ (__QUEUE__)->queue_read_lock = LITE_QUEUE_UNLOCK; \ }while(0)
[ 创 建 队 列 ]
/* * @ brief : Create message queue. * @ param : {size_t } item_num : The number of list items in the queue. {size_t } item_size: The size of each list item, unit: bytes. * @ return: {LiteQueue *} queue : Message queue handle pointer. * @ author: bagy. * @ note : Create a queue and initialize the queue items to 0, with the head and tail pointers pointing to the starting position of the list items. */ LiteQueue *LiteQueue_Create(size_t item_num, size_t item_size) { if((item_num < 1) || (item_size < 1)) return NULL; LiteQueue *queue = (LiteQueue *)malloc(sizeof(LiteQueue) + item_num * item_size); if(queue == NULL) { log_i("LiteQueue malloc failed.\r\n"); return NULL; } memset((uint8_t *)queue, 0, sizeof(LiteQueue) + item_num * item_size); queue->head = (uint8_t *)((uint8_t *)queue + sizeof(LiteQueue)); queue->tail = queue->head; queue->item_num = item_num; queue->item_size = item_size; queue->queue_read_lock = LITE_QUEUE_UNLOCK; queue->queue_write_lock = LITE_QUEUE_UNLOCK; return queue; }
注释写的已经很清楚啦,主要是创建一个队列,使用形参传入列表项的个数以及每个列表项的大小,单位是字节。后面写入的时候把接收到的每一帧数据都依次放入列表项中。
[ 写 入 数 据 到 队 列 中 ]
/* * @ brief : Write data to the queue. * @ param : {LiteQueue *} queue: Message queue handle pointer. {uint8_t *} buff : Data to be written to the queue. * @ return: {LiteQueue_Status} Returns the status of the queue. * @ author: bagy. * @ note : Writing data when the queue is full will automatically overwrite the first frame of data. */ LiteQueue_Status Write_To_LiteQueue(LiteQueue *queue, uint8_t *buff) { if((queue == NULL) || (buff == NULL)) return LITE_QUEUE_ERR; LITE_QUEUE_WRITE_LOCK(queue); memcpy(queue->tail, buff, queue->item_size); if(queue->tail == (uint8_t *)queue + sizeof(LiteQueue) + (queue->item_num - 1) * queue->item_size) queue->tail = (uint8_t *)queue + sizeof(LiteQueue); else queue->tail += queue->item_size; LITE_QUEUE_WRITE_UNLOCK(queue); return LITE_QUEUE_OK; }
这里有一个要注意的是,我这里设计是当队列满了之后,如果队列里面的数据都还没被读出,即没有空闲的位置可以放新的数据,就会覆盖第一个入队的数据写入。这种是比较极端的情况,一般只要队列的大小设置的合理,解析程序没有死循环或者异常,队列基本是不会满的。
[ 从 队 列 中 读 出 数 据 ]
/* * @ brief : Read data from queue. * @ param : {LiteQueue *} queue: Message queue handle pointer. {uint8_t *} buff : Data to be read from the queue. * @ return: {LiteQueue_Status} Returns the status of the queue. * @ author: bagy. * @ note : Read data starting from the position of the head pointer and save it to the buff. */ LiteQueue_Status Read_From_LiteQueue(LiteQueue *queue, uint8_t *buff) { if((queue == NULL) || (buff == NULL) || (isLiteQueue_Empty(queue) == LITE_QUEUE_EMPTY)) return LITE_QUEUE_ERR; LITE_QUEUE_READ_LOCK(queue); memcpy(buff, queue->head, queue->item_size); if(queue->head == (uint8_t *)queue + sizeof(LiteQueue) + (queue->item_num - 1) * queue->item_size) queue->head = (uint8_t *)queue + sizeof(LiteQueue); else queue->head += queue->item_size; LITE_QUEUE_READ_UNLOCK(queue); return LITE_QUEUE_OK; }
[ 判 断 队 列 是 否 为 空 ]
/* * @ brief : Determine whether the queue is empty. * @ param : {LiteQueue *} queue: Message queue handle pointer. * @ return: {LiteQueue_Status} Returns the status of the queue. * @ author: bagy. * @ note : Determine whether the head and tail pointers are the same. If they are the same, it means there is no data in the queue, otherwise it means there is still data that has not been read out. */ inline LiteQueue_Status isLiteQueue_Empty(LiteQueue *queue) { if(queue == NULL) return LITE_QUEUE_ERR; if(queue->head == queue->tail) return LITE_QUEUE_EMPTY; else return LITE_QUEUE_NONEMPTY; }
这个接口是最简单的,只要判断头尾指针是否相同即可,相同则表示所有的数据都被读出了即队列为空。
[ 清 空 队 列 ]
/* * @ brief : Clear the message queue. * @ param : {LiteQueue *} queue: Message queue handle pointer. * @ return: {LiteQueue_Status} Returns the status of the queue. * @ author: bagy. * @ note : Determine whether the head and tail pointers are the same. If they are the same, it means there is no data in the queue, otherwise it means there is still data that has not been read out. */ LiteQueue_Status LiteQueue_Clear(LiteQueue *queue) { if(queue == NULL) return LITE_QUEUE_ERR; queue->head = (uint8_t *)((uint8_t *)queue + sizeof(LiteQueue)); queue->tail = queue->head; memset(queue->head, 0, queue->item_num * queue->item_size); return LITE_QUEUE_OK; }
[ 删 除 队 列 ]
/* * @ brief : Clear the message queue. * @ param : {LiteQueue *} queue: Message queue handle pointer. * @ return: {LiteQueue_Status} Returns the status of the queue. * @ author: bagy. * @ note : Determine whether the head and tail pointers are the same. If they are the same, it means there is no data in the queue, otherwise it means there is still data that has not been read out. */ LiteQueue_Status LiteQueue_Delete(LiteQueue *queue) { if(queue == NULL) return LITE_QUEUE_ERR; memset((uint8_t *)queue, 0, sizeof(LiteQueue) + queue->item_num * queue->item_size); free(queue); queue = NULL; return LITE_QUEUE_OK; }
[ 测 试 用 例 ]
所有的接口都完成啦,就剩下写测试用例,测试用例我也贴心的提供了,所以你还不点赞、评论加转发?
/* * @ brief : Print the contents of each list item in the queue. * @ param : {LiteQueue *} queue: Message queue handle pointer. * @ return: None. * @ author: bagy. * @ note : None. */ static void PrintLiteQueue(LiteQueue *queue) { if(queue == NULL) return; for(int i = 0; i < queue->item_num; i++) { log_i("[item_num:%d] ", i); for(int n = 0; n < queue->item_size; n++) { log_i("%d ", *((uint8_t *)queue + sizeof(LiteQueue) + i * queue->item_size + n)); } log_i("\n"); } } /* * @ brief : Print the data in buff. * @ param : {LiteQueue *} queue: Message queue handle pointer. * @ return: None. * @ author: bagy. * @ note : Used to observe buff data changes and test to verify the correctness of written or read data. */ static void PrintBuff(uint8_t *buff, size_t len) { if((buff == NULL) || (len < 1)) return; log_i("Read buff<<<:"); for(size_t i = 0; i < len; i++) { log_i("%d ", buff[i]); } log_i("\n\n"); } int main(void) { uint8_t writebuff[10] = {0}; uint8_t readbuff[10] = {0}; /* Create message queue, 4 list items, each list item has 10 bytes of memory space */ pLiteQueue msgQueue = LiteQueue_Create(4, 10); PrintLiteQueue(msgQueue); printf("\n"); /* Simulate writing and reading to the queue 6 times, and observe the data in the queue by printing */ for(int i = 0; i < 6; i++) { /* Simulate data, change the writebuff data and write it to the queue */ for(int n = 0; n < msgQueue->item_size; n++) writebuff[n] = (i * msgQueue->item_size + n) % 256; /* Data is written to the queue */ Write_To_LiteQueue(msgQueue, writebuff); PrintLiteQueue(msgQueue); /* Read data from queue */ Read_From_LiteQueue(msgQueue, readbuff); PrintBuff(readbuff, sizeof(readbuff)); } return 0; }
PrintLiteQueue这个函数是把消息队列中的数据都打印出来,并根据列表项划分好,这样看起来更加直观。
PrintBuff是把传入的buff打印出来,可以用来观测从队列中读出的数据和写入的是否是一样的。
main函数中创建了一个4x10的队列,创建之后打印出队列的数据如下:
上图表示有四个列表项,每个列表项有10个字节的内存空间保存接收的数据,并全部初始化为0。
接着for循环模拟向队列中写入6组数据,每次写入后都打印下当前队列中的数据,之后从队列读取数据出来保存到readbuff中并把读出的数据打印出来观测。
/* Simulate data, change the writebuff data and write it to the queue */ for(int n = 0; n < msgQueue->item_size; n++) writebuff[n] = (i * msgQueue->item_size + n) % 256;
上面这几行代码就是模拟数据,让每次写入队列的数据都是不同的,且是连续的,这样比较好观测,接下来就来看下打印的结果吧。
文章至此结束啦如果觉得本篇文章多少有点帮助的话,求赞、求关注、求评论、求转发,创作不易!你们的支持是小编创作最大的动力。