实现一个通用的生产者消费者队列(c语言版本)

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/voidreturn/article/details/78151898 背景:笔者之前一直从事嵌入式音视频相关的开发工作,对于音视频的数据的处理,生产者消费者队列必不可少,而如何实现一个高效稳定的生产者消费者队列则十分重要,不过按照笔者从业的经验,所看到的现象,不容乐观,很多知名大厂在这种基础组件的开发能力上十分堪忧。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/voidreturn/article/details/78151898

背景:笔者之前一直从事嵌入式音视频相关的开发工作,对于音视频的数据的处理,生产者消费者队列必不可少,而如何实现一个高效稳定的生产者消费者队列则十分重要,不过按照笔者从业的经验,所看到的现象,不容乐观,很多知名大厂在这种基础组件的开发能力上十分堪忧。

音视频数据处理的特点:

  • 音视频数据量大:音视频数据特别是视频数据,占据了计算机数据的很大一块,不信就看看每个人的硬盘里,去除电影,照片,mp3是不是很空荡荡的。
  • 实时性要求高:音视频的延时如果大于200ms,使用体验会十分糟糕。
  • 处理流程复杂:一帧数据从sensor捕获到最终网传输出或者lcd显示,都需要经过一系列的模块进行处理。特别是网络传输,一般要经过原始数据捕获,视频数据格式转换,数据编码压缩,数据封装打包,网络传输。

生产者消费者队列在视频数据处理的必要性:

视频数据的处理为什么需要生产者消费者队列?其实上面提到的音视频数据处理的特点就是答案。

  • 一般对视频数据处理的模块都是运行在多线程中,每一个处理模块运行在一个线程中(也是软件工程分模块的思想),相互之间通过生产者消费者队列进行数据的交互,之所以用线程模型,而没有用我一直推崇的进程模型是因为线程间的共享内存比较方便,而进程则要相对复杂的多。
  • 利用多线程/多进程的并行处理能力 ,如果采用单线程单进程的单线处理模式,一帧数据从采集到输出线性经过几个模块,时效性无法保证。
  • 缓存数据,保持平滑,通过队列缓存视频数据,可以有效的去除一些数据抖动,帮助音视频数据的平滑播放,同时这个数据的缓存又不易过多,否则加大了延时,损伤实时性,所以队列大小的设置是一个平衡的艺术。

常见的生产者消费者队列实现存在的问题:

不注重效率性能:

  • 对于buffer状态的检测采用loop轮询方式。
    loop轮询是任何有追求的程序员都要避免的处理方式,而笔者以自己经历经常看到以下类似的代码:
pthread_mutex_lock();
state = check_some_state();
if (state == xxx) {
    do_some_process();
} else {
    usleep(x);
}
pthread_mutex_unlock();

以上代码loop一个状态,如果状态成立做有效的处理,不成立则睡眠一定时间,之后再次调用该段代码进行下一次的状态检测,而这个睡眠时间是一个随机经验值,很有可能下次仍然是无效的检测,接着睡眠再loop,多余的loop是一种资源的浪费。

数据的传递采用copy方式:

数据的传递,采用copy的方式,一帧数据在一个完整的处理流程中经过n次copy(笔者见过一个系统中一帧数据copy了8次之多)

生产者消费者队列和业务代码混杂在一起,没有分离:

对于开发者,都希望用最简单的接口完成某个功能,而不关心内部实现,在这里就是,只需要生产者生产数据,消费者消费数据,而内部的处理(同步,数据的处理等)完全不关心,这样开发者就不需要去弄很多锁,降低了开发难度,也可以使代码组件化,模块化。

有经验的开发者应该感觉以上都是基础点,不会有人犯这样的错误,不过笔者以自己的经历肯定的说,以上两种问题在某世界级大厂的视频设备上随处可见。

让我悲哀的是,当我指出这些问题时,某些开发者完全无动于衷。而我更无力的是,现在的cpu,memory性能实在是高,在某些不太高端的嵌入式芯片上,优化过的数据并没有十分明显,在一个实际项目上,经过优化后cpu大概降低1%(原总系统cpu占用7%),有经验的开发者又会说原7%的cpu占用率说明这个芯片做这个系统浪费了,不过也没办法其实已经用了比较低端的芯片了。。。

以上的吐槽主要是想说明:由于cpu,memory性能的提升,让很多开发者感觉软件的优化意义不大了,而我是一个理想主义者,对于某些设计ugly的代码真的是零容忍啊。

如何优化:

以上说了这么多,那如何操作呢?

  • 提高效率,去除多余的loop轮询:
    采用线程的同步机制,当状态条件符合要求时,通过通知机制触发后续处理,这样去除了无效的loop轮询检测,linux系统下可以采用条件变量,信号量来实现,我更倾向于使用条件变量,因为它是linux系统原生支持的接口。
    提到条件变量,不得不提一个概念:同步互斥,这么一个基本的操作系统概念,我最喜欢作为面试第一题,不过能用一句话切中要害的说出之间区别和各自特点的人不是很多,答不出这题的,基本上就pass了。
  • 减少多余copy:
    采用预分配的方式,将buffer分为free和active两大类,每一类buffer又切成几个小buffer,然后通过指针将两类buffer下的小buffer链接成两个链表,使用者获取buffer通过free链表获取buffer,再将buffer put到active链表上,以上都是指针的操作,没有数据的copy,极大的减少了copy操作。(再次强调指针是个好东西)
  • 模块化组件化:
    将生产者消费者队列的处理部分完全剥离成一个独立的模块组件,对外只提供几个基本的接口,内部完成同步通知的处理。

一个简单的实现:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>

#include "sfifo.h"

//#define CONFIG_COND_FREE 1
#define CONFIG_COND_ACTIVE 1

#define MAX_SFIFO_NUM   32

struct sfifo_des_s sfifo_des[MAX_SFIFO_NUM];
struct sfifo_des_s *my_sfifo_des;

struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p)
{
    static long empty_count = 0;
    struct sfifo_s *sfifo = NULL;

    pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREE
    while (sfifo_des_p->free_list.head == NULL) {
        pthread_cond_wait(&(sfifo_des_p->free_list.cond), &(sfifo_des_p->free_list.lock_mutex));
    }
#else
    if (sfifo_des_p->free_list.head == NULL) {
        if (empty_count++ % 120 == 0) {
            printf("free list empty\n");
        }
        goto EXIT;
    }
#endif
    sfifo = sfifo_des_p->free_list.head;
    sfifo_des_p->free_list.head = sfifo->next;

EXIT:
    pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));

    return sfifo;
}

int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p)
{
    int send_cond = 0;

    pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
    if (sfifo_des_p->free_list.head == NULL) {
        sfifo_des_p->free_list.head = sfifo;
        sfifo_des_p->free_list.tail = sfifo;
        sfifo_des_p->free_list.tail->next = NULL;
        send_cond = 1;
    } else {
        sfifo_des_p->free_list.tail->next = sfifo;
        sfifo_des_p->free_list.tail = sfifo;
        sfifo_des_p->free_list.tail->next = NULL;
    }
    pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREE
    if (send_cond) {
        pthread_cond_signal(&(sfifo_des_p->free_list.cond));
    }
#endif
    return 0;
}

struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p)
{
    struct sfifo_s *sfifo = NULL;

    pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVE
    while (sfifo_des_p->active_list.head == NULL) {
        //pthread_cond_timedwait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex), &outtime);
        pthread_cond_wait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex));
    }
#else
    if (sfifo_des_p->active_list.head == NULL) {
        printf("active list empty\n");
        goto EXIT;
    }
#endif

    sfifo = sfifo_des_p->active_list.head;
    sfifo_des_p->active_list.head = sfifo->next;

EXIT:
    pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));

    return sfifo;
}

int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p)
{
    int send_cond = 0;

    pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
    if (sfifo_des_p->active_list.head == NULL) {
        sfifo_des_p->active_list.head = sfifo;
        sfifo_des_p->active_list.tail = sfifo;
        sfifo_des_p->active_list.tail->next = NULL;
        send_cond = 1;
    } else {
        sfifo_des_p->active_list.tail->next = sfifo;
        sfifo_des_p->active_list.tail = sfifo;
        sfifo_des_p->active_list.tail->next = NULL;
    }
    pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVE
    if (send_cond) {
        pthread_cond_signal(&(sfifo_des_p->active_list.cond));
    }
#endif
    return 0;
}

int dump_sfifo_list(struct sfifo_list_des_s *list)
{
    struct sfifo_s *sfifo = NULL;
    sfifo = list->head;
    do {
        printf("dump : %x\n", sfifo->buffer[0]);
        usleep(500 * 1000);
    } while (sfifo->next != NULL && (sfifo = sfifo->next));

    return 0;
}

struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num)
{
    int i = 0;
    struct sfifo_s *sfifo;

    struct sfifo_des_s *sfifo_des_p;
    sfifo_des_p = (struct sfifo_des_s *)malloc(sizeof(struct sfifo_des_s));

    sfifo_des_p->sfifos_num = sfifo_num;
    sfifo_des_p->sfifos_active_max_num = sfifo_active_max_num;

    sfifo_des_p->free_list.sfifo_num = 0;
    sfifo_des_p->free_list.head = NULL;
    sfifo_des_p->free_list.tail = NULL;
    pthread_mutex_init(&sfifo_des_p->free_list.lock_mutex, NULL);
    pthread_cond_init(&sfifo_des_p->free_list.cond, NULL);

    sfifo_des_p->active_list.sfifo_num = 0;
    sfifo_des_p->active_list.head = NULL;
    sfifo_des_p->active_list.tail = NULL;
    pthread_mutex_init(&sfifo_des_p->active_list.lock_mutex, NULL);
    pthread_cond_init(&sfifo_des_p->active_list.cond, NULL);

    for (i = 0; i < sfifo_num; i++) {
        sfifo = (struct sfifo_s *)malloc(sizeof(struct sfifo_s));
        sfifo->buffer = (unsigned char *)malloc(sfifo_buffer_size);
        printf("sfifo_init: %x\n", sfifo->buffer);
        memset(sfifo->buffer, i, sfifo_buffer_size);
        sfifo->size = sfifo_buffer_size;
        sfifo->next = NULL;
        sfifo_put_free_buf(sfifo, sfifo_des_p);
    }

    return sfifo_des_p;
}

void *productor_thread_func(void *arg)
{
    struct sfifo_s *sfifo;

    while (1) {
        sfifo = sfifo_get_free_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);
            sfifo_put_active_buf(sfifo, my_sfifo_des);
        }
        //usleep(20*1000);
    }
}

void *comsumer_thread_func(void *arg)
{
    struct sfifo_s *sfifo;
    int count = 0;

    while (1) {
        sfifo = sfifo_get_active_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("---------------- get %x\n", sfifo->buffer[0]);
            sfifo_put_free_buf(sfifo, my_sfifo_des);
        }
        //usleep(10 * 1000);
        // if (count++ > 10000) {
        //  exit(-1);
        // }
    }
}

int main()
{
    int ret;
    static pthread_t productor_thread;
    static pthread_t consumer_thread;
    struct sfifo_s *r_sfifo;

    my_sfifo_des = sfifo_init(10, 4096, 5);

    ret = pthread_create(&productor_thread, NULL, productor_thread_func, NULL);
    ret = pthread_create(&consumer_thread, NULL, comsumer_thread_func, NULL);

    while (1) {
        sleep(1);
    }

    return 0;
}

以上是一个简单的生产者消费者队列的c语言的实现,对应的头文件在本文底部(贴代码太长看起来很崩溃)。

仔细的同学可能会发现,以上代码sfifo_get_free_buf()中默认是loop轮询检测free buffer链表的,你前面不是说了一大堆不能loop吗?怎么还用loop呢?
这里其实有两个原因:

  • 生产者的loop是可以接受的,当发生多余loop,无法命中时,说明生产者太快,消费者太慢,而其实对于一个生产者消费者模型出现以上问题时,说明整个业务流程要重新考虑,因为正常的情况是消费者总是要快于生产者,这个业务模型才能正常的运行下去。
  • 对于有些业务模型,生产者业务模块部分是不能阻塞的,也就是说,如果free list没有数据,我们采用pthread_cond_wait()阻塞后,会导致生产者出现问题,这样最好的处理方式就是生产者模块接口返回出错,生产者业务方丢弃数据(此时就是丢帧了,这种情况如果频繁发生是不能接受了,不过也说明了消费者要有足够的能力处理生产者生产出的数据,否则整个业务都是有问题)

这个实现有那些优势:

走读和运行以上代码的同学应该可以发现这里做了一个简单可运行的demo模拟了生产者和消费者双方:

void *productor_thread_func(void *arg)
{
    struct sfifo_s *sfifo;

    while (1) {
        sfifo = sfifo_get_free_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);
            sfifo_put_active_buf(sfifo, my_sfifo_des);
        }
        //usleep(20*1000);
    }
}

void *comsumer_thread_func(void *arg)
{
    struct sfifo_s *sfifo;
    int count = 0;

    while (1) {
        sfifo = sfifo_get_active_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("---------------- get %x\n", sfifo->buffer[0]);
            sfifo_put_free_buf(sfifo, my_sfifo_des);
        }
        //usleep(10 * 1000);
        // if (count++ > 10000) {
        //  exit(-1);
        // }
    }
}

这里面对于使用者的优点有:

  1. 接口简单:只需要get free,put active;get active,put free。
  2. 没有了数据copy,只需要操作链表上的buffer就可以了,而这些buffer的参数控制通过init接口设置。
  3. 不用再控制sleep的时间值:前面提到,在loop模型下,如果状态不成立需要sleep一段时间,再次检查,这样来控制同步状态,而这个时间值很难确定,如果时间值过长,则会导致状态检测不及时,延误数据处理,如果时间值太短,则会增加状态检测miss cache的次数,耗费更多cpu资源。而采用本模块的实现则完全不需要考虑这些问题,只需要衔接业务处理,sleep,同步,yield cpu的操作都由这个模块实现吧,完全不需要关心。
  4. 模块化,完全和业务处理无关,可以毫无压力的运用在不同的业务处理逻辑中,没有剥离代码的工作。

以上描述了一个生产者消费者队列c语言的实现,为什么是c语言版本的?因为其他高级语言,有很多成熟的库提供了该功能,完全不用自己写,而c就没这么完善了,不过这也说明了c的简单灵活。但悲哀的是很多人因此进行了很ugly的实现。
多吐槽几句,嵌入式行业由于各种技术原因,导致开发语言还是采用c,这样对开发人员有了不小的要求,而如何才能写一些优雅的代码,对人的素质有了要求,但现状是优秀的开发者都被互联网行业抢走了,导致嵌入式行业开发人员的水平参差不齐,本来应该是一个对编码能力要求很高的行业被一些水平低下的开发者占据。so,我离开了这个行业了。。。

附:模块头文件,类linux用户可通过gcc xxx.c命令build该demo,然后运行测试。

#ifndef SFIFO_H_
#define SFIFO_H_

struct sfifo_list_des_s {
    int sfifo_num;

    struct sfifo_s *head;
    struct sfifo_s *tail;

    pthread_mutex_t lock_mutex;
    pthread_cond_t cond;
};

struct sfifo_des_s {
    int sfifo_init;

    unsigned int sfifos_num;
    unsigned int sfifos_active_max_num;

    struct sfifo_list_des_s free_list;
    struct sfifo_list_des_s active_list;
};

struct sfifo_s {
    unsigned char *buffer;
    unsigned int size;
    struct sfifo_s *next;
};

extern struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num);

/* productor */
extern struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);

/* consumer */
extern struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);

#endif
目录
相关文章
|
19天前
|
C语言
顺序队列的初始化、进队和出队(C语言)
顺序队列的初始化、进队和出队(C语言)
15 1
|
2月前
|
Linux C语言
Linux系统下C语言的队列操作
Linux系统下C语言的队列操作
32 0
|
8天前
|
C语言
【C语言/数据结构】排序(快速排序及多种优化|递归及非递归版本)
【C语言/数据结构】排序(快速排序及多种优化|递归及非递归版本)
10 0
|
1月前
|
搜索推荐 C语言 C++
【排序算法】C语言实现归并排序,包括递归和迭代两个版本
【排序算法】C语言实现归并排序,包括递归和迭代两个版本
|
1月前
|
存储 缓存 C语言
初阶数据结构之---栈和队列(C语言)
初阶数据结构之---栈和队列(C语言)
|
1月前
费马螺线在现实生活中的应用
费马螺线在现实生活中的应用
12 1
|
1月前
|
算法 C语言
【算法与数据结构】 C语言实现单链表队列详解2
【算法与数据结构】 C语言实现单链表队列详解
|
1月前
|
存储 算法 C语言
【算法与数据结构】 C语言实现单链表队列详解1
【算法与数据结构】 C语言实现单链表队列详解
|
2月前
|
存储 机器学习/深度学习 C语言
C语言队列讲解
C语言队列讲解
12 0
|
19天前
|
C语言
C语言:内存函数(memcpy memmove memset memcmp使用)
C语言:内存函数(memcpy memmove memset memcmp使用)