前言
在只有一路编码进行rtsp播放和保存时。之前的思路是没有任何问题的,设计比较简单,但是如果我们的运算量上来了,也不仅仅一个通道编码了,编码速率,保存速率,rtsp发送包的速率的差异会越来越大,而每一包的大小又不可能相同,这个时候就需要引入一个缓冲池来平衡输入输出的速率不一致(简单这么理解一下),且解决包大小不同的问题
理论部分节选自知乎专栏https://zhuanlan.zhihu.com/p/533321012
代码部分给出了缓冲池功能源码以及调用部分代码
缓冲管理
最开始的缓冲引入是为了缓和 CPU 与 I/O 设备速度不匹配的矛盾,提高 CPU 和 I/O 设备的并行性,在现代操作系统中,几乎所有的 I/O 设备在与处理机交换数据时都用了缓冲区。缓冲管理的主要职责是组织好这些缓冲区,并提供获得和释放缓冲区的手段。
事实上,凡在数据到达速率与其离去速率不同的地方,都可设置缓冲区,以缓和它们之间速率不匹配的矛盾。众所周知,CPU的运算速率远远高于 I/O 设备的速率,如果没有缓冲区,则在输出数据时,必然会由于打印机的速度跟不上而使 CPU 停下来等待;然而在计算阶段,打印机又空闲无事。显然,如果在打印机或控制器中设置一缓冲区,用于快速暂存程序的输出数据,以后由打印机“慢慢地”从中取出数据打印,这样,就可提高 CPU 的工作效率。类似地,在输入设备与 CPU 之间也设置缓冲区,也可使 CPU 的工作效率得以提高。
减少对 CPU 的中断频率,放宽对 CPU 中断响应时间的限制。在远程通信系统中,如果从远地终端发来的数据仅用一位缓冲来接收,如下图(a)所示,则必须在每收到一位数据时便中断一次 CPU,这样,对于速率为 9.6 Kb/s 的数据通信来说,就意味着其中断 CPU的频率也为 9.6 Kb/s,即每 100 μs 就要中断 CPU 一次,而且 CPU 必须在 100 μs 内予以响应,否则缓冲区内的数据将被冲掉。倘若设置一个具有 8 位的缓冲(移位)寄存器,如下图(b)所示,则可使 CPU 被中断的频率降低为原来的 1/8;若再设置一个 8 位寄存器,如下图©所示,则又可把 CPU 对中断的响应时间放宽到 800 μs。
提高 CPU 和 I/O 设备之间的并行性。缓冲的引入可显著地提高 CPU 和 I/O 设备间的并行操作程度,提高系统的吞吐量和设备的利用率。例如,在 CPU 和打印机之间设置了缓冲区后,便可使 CPU 与打印机并行工作。
单缓冲(Single Buffer)
在单缓冲情况下,每当用户进程发出一 I/O 请求时,操作系统便在主存中为之分配一缓冲区,如下图所示。在块设备输入时,假定从磁盘把一块数据输入到缓冲区的时间为 T,操作系统将该缓冲区中的数据传送到用户区的时间为 M,而 CPU 对这一块数据处理(计算)的时间为 C。由于 T 和 C 是可以并行的(如下图),当 T>C 时,系统对每一块数据的处理时间为 M+T,反之则为 M+C,故可把系统对每一块数据的处理时间表示为Max(C,T)+M。
在字符设备输入时,缓冲区用于暂存用户输入的一行数据,在输入期间,用户进程被挂起以等待数据输入完毕;在输出时,用户进程将一行数据输入到缓冲区后,继续进行处理。当用户进程已有第二行数据输出时,如果第一行数据尚未被提取完毕,则此时用户进程应阻塞。
双缓冲(Double Buffer)
为了加快输入和输出速度,提高设备利用率,人们又引入了双缓冲区机制,也称为缓冲对换(Buffer Swapping)。在设备输入时,先将数据送入第一缓冲区,装满后便转向第二缓冲区。此时操作系统可以从第一缓冲区中移出数据,并送入用户进程(如下图)。接着由 CPU 对数据进行计算。在双缓冲时,系统处理一块数据的时间可以粗略地认为是Max(C,T)。如果 C<T,可使块设备连续输入;如果 C>T,则可使 CPU 不必等待设备输入。对于字符设备,若采用行输入方式,则采用双缓冲通常能消除用户的等待时间,即用户在输入完第一行之后,在 CPU 执行第一行中的命令时,用户可继续向第二缓冲区输入下一行数据。
如果我们在实现两台机器之间的通信时,仅为它们配置了单缓冲,如下图(a)所示,那么,它们之间在任一时刻都只能实现单方向的数据传输。例如,只允许把数据从 A 机传送到 B 机,或者从 B 机传送到 A 机,而绝不允许双方同时向对方发送数据。为了实现双向数据传输,必须在两台机器中都设置两个缓冲区,一个用作发送缓冲区,另一个用作接收缓冲区,如下图(b)所示。
循环缓冲
当输入与输出或生产者与消费者的速度基本相匹配时,采用双缓冲能获得较好的效果,可使生产者和消费者基本上能并行操作。但若两者的速度相差甚远,双缓冲的效果则不够理想,不过可以随着缓冲区数量的增加,使情况有所改善。因此,又引入了多缓冲机制。可将多个缓冲组织成循环缓冲形式。对于用作输入的循环缓冲,通常是提供给输入进程或计算进程使用,输入进程不断向空缓冲区输入数据,而计算进程则从中提取数据进行计算。
循环缓冲的组成
多个缓冲区。在循环缓冲中包括多个缓冲区,其每个缓冲区的大小相同。作为输入的多缓冲区可分为三种类型:用于装输入数据的空缓冲区 R、已装满数据的缓冲区 G 以及计算进程正在使用的现行工作缓冲区 C,如下图所示。
多个指针。作为输入的缓冲区可设置三个指针:用于指示计算进程下一个可用缓冲区 G 的指针 Nextg、指示输入进程下次可用的空缓冲区 R 的指针 Nexti,以及用于指示计算进程正在使用的缓冲区 C 的指针 Current。
循环缓冲区的使用
计算进程和输入进程可利用下述两个过程来使用循环缓冲区。
Getbuf 过程。当计算进程要使用缓冲区中的数据时,可调用 Getbuf 过程。该过程将由指针 Nextg 所指示的缓冲区提供给进程使用,相应地,须把它改为现行工作缓冲区,并令 Current 指针指向该缓冲区的第一个单元,同时将 Nextg 移向下一个 G 缓冲区。类似地,每当输入进程要使用空缓冲区来装入数据时,也调用 Getbuf 过程,由该过程将指针 Nexti所指示的缓冲区提供给输入进程使用,同时将 Nexti 指针移向下一个 R 缓冲区。
Releasebuf 过程。当计算进程把 C 缓冲区中的数据提取完毕时,便调用 Releasebuf过程,将缓冲区 C 释放。此时,把该缓冲区由当前(现行)工作缓冲区 C 改为空缓冲区 R。类似地,当输入进程把缓冲区装满时,也应调用 Releasebuf 过程,将该缓冲区释放,并改为 G缓冲区。
进程同步
使用输入循环缓冲,可使输入进程和计算进程并行执行。相应地,指针 Nexti 和指针Nextg 将不断地沿着顺时针方向移动,这样就可能出现下述两种情况:
Nexti 指针追赶上 Nextg 指针。这意味着输入进程输入数据的速度大于计算进程处理数据的速度,已把全部可用的空缓冲区装满,再无缓冲区可用。此时,输入进程应阻塞,直到计算进程把某个缓冲区中的数据全部提取完,使之成为空缓冲区 R,并调用 Releasebuf过程将它释放时,才将输入进程唤醒。这种情况被称为系统受计算限制。
Nextg 指针追赶上 Nexti 指针。这意味着输入数据的速度低于计算进程处理数据的速度,使全部装有输入数据的缓冲区都被抽空,再无装有数据的缓冲区供计算进程提取数据。这时,计算进程只能阻塞,直至输入进程又装满某个缓冲区,并调用 Releasebuf 过程将它释放时,才去唤醒计算进程。这种情况被称为系统受 I/O 限制。
缓冲池
上述的缓冲区仅适用于某特定的 I/O 进程和计算进程,因而它们属于专用缓冲。当系统较大时,将会有许多这样的循环缓冲,这不仅要消耗大量的内存空间,而且其利用率不高。为了提高缓冲区的利用率,目前广泛流行公用缓冲池(Buffer Pool),在池中设置了多个可供若干个进程共享的缓冲区。
缓冲池的组成
对于既可用于输入又可用于输出的公用缓冲池,其中至少应含有以下三种类型的缓冲区:
① 空(闲)缓冲区;
② 装满输入数据的缓冲区;
③ 装满输出数据的缓冲区。
为了管理上的方便,可将相同类型的缓冲区链成一个队列,于是可形成以下三个队列:
空缓冲队列 emq。这是由空缓冲区所链成的队列。其队首指针 F(emq)和队尾指针L(emq)分别指向该队列的首缓冲区和尾缓冲区。
输入队列 inq。这是由装满输入数据的缓冲区所链成的队列。其队首指针 F(inq)和队尾指针 L(inq)分别指向该队列的首缓冲区和尾缓冲区。
输出队列 outq。这是由装满输出数据的缓冲区所链成的队列。其队首指针 F(outq)和队尾指针 L(outq)分别指向该队列的首缓冲区和尾缓冲区。
除了上述三个队列外,还应具有四种工作缓冲区:① 用于收容输入数据的工作缓冲区;② 用于提取输入数据的工作缓冲区;③ 用于收容输出数据的工作缓冲区;④ 用于提取输出数据的工作缓冲区。
Getbuf 过程和 Putbuf 过程
在“数据结构”课程中,曾介绍过队列和对队列进行操作的两个过程,它们是:
Addbuf(type,number)过程。该过程用于将由参数number 所指示的缓冲区B挂在type队列上。
Takebuf(type)过程。该过程用于从 type 所指示的队列的队首摘下一个缓冲区。
这两个过程能否用于对缓冲池中的队列进行操作呢?答案是否定的。因为缓冲池中的队列本身是临界资源,多个进程在访问一个队列时,既应互斥,又须同步。为此,需要对这两个过程加以改造,以形成可用于对缓冲池中的队列进行操作的 Getbuf 和 Putbuf 过程。
为使诸进程能互斥地访问缓冲池队列,可为每一队列设置一个互斥信号量 MS(type)。此外,为了保证诸进程同步地使用缓冲区,又为每个缓冲队列设置了一个资源信号量RS(type)。既可实现互斥又可保证同步的 Getbuf 过程和 Putbuf 过程描述如下:
缓冲区的工作方式
缓冲区可以工作在收容输入、提取输入、收容输出和提取输出四种工作方式下,如下图所示。
收容输入。在输入进程需要输入数据时,便调用 Getbuf(emq)过程,从空缓冲队列emq 的队首摘下一空缓冲区,把它作为收容输入工作缓冲区 hin。然后,把数据输入其中,装满后再调用 Putbuf(inq,hin)过程,将该缓冲区挂在输入队列 inq 上。
提取输入。当计算进程需要输入数据时,调用 Getbuf(inq)过程,从输入队列 inq 的队首取得一个缓冲区,作为提取输入工作缓冲区(sin),计算进程从中提取数据。计算进程用完该数据后,再调用 Putbuf(emq,sin)过程,将该缓冲区挂到空缓冲队列 emq 上。
收容输出。当计算进程需要输出时,调用 Getbuf(emq)过程从空缓冲队列 emq 的队首取得一个空缓冲区,作为收容输出工作缓冲区 hout。当其中装满输出数据后,又调用Putbuf(outq,hout)过程,将该缓冲区挂在 outq 末尾。
提取输出。由输出进程调用 Getbuf(outq)过程,从输出队列的队首取得一装满输出数据的缓冲区,作为提取输出工作缓冲区 sout。在数据提取完后,再调用 Putbuf(emq,sout)过程,将该缓冲区挂在空缓冲队列末尾。
代码
bufferpool.cpp
// BufferPool.cpp : 闁跨喐鏋婚幏鐑芥晸閺傘倖瀚� DLL 鎼存棃鏁撻惌顐ゎ劜閹风兘鏁撻弬銈嗗閻楋繝鏁撻弬銈嗗闁跨喐鏋婚幏鐑芥晸閺傘倖瀚归柨鐔告灮閹风兘鏁撻敓锟� // //#include "stdafx.h" #include "BufferPool.h" #include "ThreadUtils.h" BuffPool::BuffPool(const char *id) : EmptyBuffs(0), FullBuffs(0) { last_empty = 0; last_full = 0; next_full = 0; buffsz = 0; numbuf = 0; pooladdr = 0; pname = id; } BuffPool::~BuffPool() { try { Buffer *currp; EmptyPool.Lock(); while((currp = last_empty)) {last_empty = last_empty->next; delete currp;} EmptyPool.UnLock(); FullPool.Lock(); while((currp = next_full)) {next_full = next_full->next; delete currp;} FullPool.UnLock(); free(pooladdr); pooladdr = NULL; } catch ( ... ) { } } int BuffPool::Allocate(int buffnum, int bsize) { char *buffaddr = NULL; int bnum = buffnum; //闁跨喐鏋婚幏宄扳偓锟� numempty=buffnum; numfull=0; Buffer *new_empty; buffaddr = (char*)malloc(bsize*buffnum); if(!buffaddr) { fprintf(stderr,"malloc buffer pool failed\n"); return -1; } pooladdr = buffaddr; EmptyPool.Lock();//闁跨喐鏋婚幏鐑芥晸閺傘倖瀚� buffsz = bsize;//闁跨喐鏋婚幏鐑芥晸閺傘倖瀚归柨鐔告灮閹风兘鏁撻弬銈嗗闁跨噦鎷� while(bnum--) { if (!(new_empty = new Buffer(this))) { fprintf(stderr,"new buffer failed\n"); break; } new_empty->data = (char *)buffaddr; new_empty->next = last_empty; last_empty = new_empty; buffaddr += buffsz; EmptyBuffs.Post();//闁跨喓绮ㄦ禍銈勭闁跨喐鏋婚幏鐑芥晸閺傘倖瀚瑰┃锟� } numbuf += buffnum-(bnum+1); EmptyPool.UnLock(); return -(bnum+1); } Buffer *BuffPool::getEmptyBuff() { Buffer *buffp = 0; while(!buffp) { EmptyBuffs.Wait();//闁跨喖銈烘潏鐐闁跨喐鏋婚幏閿嬬爱 EmptyPool.Lock(); buffp = last_empty; last_empty = buffp->next; EmptyPool.UnLock(); } buffp->dlen = 0; return buffp; } void BuffPool::putEmptyBuff(Buffer *buffp) { EmptyPool.Lock(); buffp->next = last_empty; last_empty = buffp; numempty+=1; numfull-=1; //printf("numempty is %d numfull is %d\n",numempty,numfull); EmptyPool.UnLock(); EmptyBuffs.Post();//闁跨喖鍙洪崙銈嗗闁跨喐鏋婚幏閿嬬爱 } Buffer *BuffPool::getFullBuff() { Buffer *buffp = 0; while(!buffp) { FullBuffs.Wait(); FullPool.Lock(); buffp = next_full; if (!(next_full = buffp->next)) last_full = 0; FullPool.UnLock(); } return buffp; } Buffer *BuffPool::getFullBuff(HANDLE_FUNCTION handler) { Buffer *buffp = this->getFullBuff(); if (buffp == NULL) return NULL; int r = (*handler)(buffp); if (r < 0) fprintf(stderr,"Handler occure error,Please check it\n"); return buffp; } void BuffPool::putFullBuff(Buffer *buffp) { FullPool.Lock(); if (last_full) last_full->next = buffp; else next_full = buffp; last_full = buffp; buffp->next = 0; //闁跨喐鏋婚幏鐑芥晸閺傘倖瀚� numempty-=1; numfull+=1; //printf("numempty is %d numfull is %d\n",numempty,numfull); FullPool.UnLock(); FullBuffs.Post(); }
buffpool.h
#ifndef __BUFFPOOL_H__ #define __BUFFPOOL_H__ #include <stdlib.h> #include <string.h> #include "ThreadUtils.h" #ifdef BUFFERPOOL_EXPORTS #define BUFFERPOOL_API __declspec(dllexport) #else #define BUFFERPOOL_API __declspec(dllimport) #endif class Buffer; typedef int (*HANDLE_FUNCTION)(Buffer *); class /*BUFFERPOOL_API*/ BuffPool { //自己添加代码 //int public: int Allocate(int buffnum, int bsize); int BuffCount() {return numbuf;} int BufferSize() {return buffsz;} Buffer *getEmptyBuff();//返回空指针的地址 xin.han void putEmptyBuff(Buffer *buff); Buffer *getFullBuff();//返回最后一个有效数据的地址 xin.han Buffer *getFullBuff(HANDLE_FUNCTION handler); void putFullBuff(Buffer *buff); BuffPool(const char *id); ~BuffPool(); int numempty; int numfull; private: BufferPoolMutex EmptyPool; //互斥锁 BufferPoolMutex FullPool; BufferPoolSemaphore EmptyBuffs;//资源锁 BufferPoolSemaphore FullBuffs; int numbuf; // int numempty; // int numfull; int buffsz;//缓冲块的容量 const char *pname; char *pooladdr; //内存池首地址 Buffer *next_full; Buffer *last_full; Buffer *last_empty; }; class /*BUFFERPOOL_API*/ Buffer { public: Buffer *next; int dlen;//缓冲块中有效数据量 char *data; inline void Recycle() {Owner->putEmptyBuff(this);} Buffer(BuffPool *oP, char *bp=0, int data_len=0) : next(0), dlen(data_len),data(bp), Owner(oP){} ~Buffer() {if (data) data=NULL;} private: BuffPool *Owner; }; #endif
threadutils.h
#ifndef __THREADUTILS__ #define __THREADUTILS__ #include <stdio.h> #include <pthread.h> #include <semaphore.h> #define MAX_SEM_COUNT 200 class /*BUFFERPOOL_API*/ BufferPoolMutex { public: inline void Lock() {pthread_mutex_lock(&ghMutex);} inline void UnLock() {pthread_mutex_unlock(&ghMutex);} BufferPoolMutex() { pthread_mutex_init(&ghMutex,NULL); } ~BufferPoolMutex() {pthread_mutex_destroy(&ghMutex);} private: pthread_mutex_t ghMutex; }; class /*BUFFERPOOL_API*/ BufferPoolSemaphore { public: inline void Post() {sem_post(&ghSemaphore);} inline void Wait() {sem_wait(&ghSemaphore);} BufferPoolSemaphore(int semval=0) { sem_init(&ghSemaphore,0,0); } ~BufferPoolSemaphore() {sem_destroy(&ghSemaphore);} private: sem_t ghSemaphore; }; #endif
调用部分
PLATFORM_BuffPoll_Init初始化buffpool,开辟内存,析构时释放
通过信号量来通知,有了新的数据
线程1里源源不断编码,然后将码流先SAMPLE_PUSH_HDENCFRAME_ToBuffer保存至buffpool,比如在线播放功能,那么在线播放的信号量+1,SAMPLE_PUSH_HDENCFRAME_ToBuffer的具体操作为新建个临时buff,判断大小有没有超出初始化时设置的,接着Buffer* tmpbuff=bufpool->getEmptyBuff();,若获取到空则释放bufpool->putEmptyBuff(tmpbuff);获取成功了则把要保存的内容 memcpy(tmpbuff->data,frmaddr,frmlen);到缓冲池临时buff里,然后把临时buff丢进缓冲池bufpool->putFullBuff(tmpbuff);
线程2里while1执行在线播放的信号量sem_wait(&sem_rtspflg);自减。新建个临时buff指针指向getFullBuff();,若获取到为空则释放h264BuffPool_rtsp->putEmptyBuff(buff);,接着发送临时buff的内容,发送完毕后释放h264BuffPool_rtsp->putEmptyBuff(buff);
抽象出来简单来说就是个大内存,化为两部分,一个方向上empty里面拿一块放在了full部分,另一个方向就从full部分copy走在放回empty部分,使用比较简单
/****************************************************************************** Copyright (C), 2022, Sunwise Space. Co., Ltd. ****************************************************************************** File Name : rtsp.c Version : Author : xin.han Created : 2022.06.16 Description : ******************************************************************************/ extern "C" { #include "rtsp.h" } #include "BufferPool.h" #include "utils_buf.h" extern sem_t sem_saveflg; extern sem_t sem_rtspflg; // static int flag_run = 1; BuffPool *h264BuffPool_rtsp=NULL; BuffPool *h264BuffPool_save=NULL; rtsp_demo_handle demo; rtsp_session_handle session[MAX_SESSION_NUM] = {NULL}; // static uint64_t ts = 0; extern HI_S64 rtsp_ts; extern HI_BOOL saveEnable; // static void sig_proc(int signo) // { // flag_run = 0; // } /* *描述 :buffpool的初始化 *参数 :NULL *返回值:无 *注意 :只开辟了空间,析构的时候自动释放 */ HI_S32 PLATFORM_BuffPoll_Init() { sem_init(&sem_saveflg,0,0); sem_init(&sem_rtspflg,0,0); h264BuffPool_rtsp=new BuffPool("h264buffer"); h264BuffPool_rtsp->Allocate(10,512*1024); if(h264BuffPool_rtsp!=NULL) { //判断创建缓冲区是否成功 printf("allocate h264BuffPool_rtsp success!\n"); } else { printf("allocate h264BuffPool_rtsp failed!\n"); } h264BuffPool_save=new BuffPool("h264buffer"); h264BuffPool_save->Allocate(10,512*1024); if(h264BuffPool_save!=NULL) { //判断创建缓冲区是否成功 printf("allocate h264BuffPool_save success!\n"); } else { printf("allocate h264BuffPool_save failed!\n"); } } // static uint64_t rtsp_get_abstime (void) // { // struct timeval tv; // gettimeofday(&tv, NULL); // return (tv.tv_sec * 1000000ULL + tv.tv_usec); // } /* *描述 :用于rtsp实时播放的线程 *参数 :NULL *返回值:无 *注意 :加载文件platform.ini rtsp://192.168.119.164:8554/mnt/sample/venc/rtsp.264 2022-10-21 新增bufferpool和信号量 */ void *video_play_rtsp_task(void* arg) { cpu_set_t mask;//cpu核的集合 cpu_set_t get;//获取在集合中的cpu int num = sysconf(_SC_NPROCESSORS_CONF); printf("frame_check_task:system has %d processor(s)\n", num); CPU_ZERO(&mask);//置空 CPU_SET(0, &mask);//设置亲和力值 if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) < 0)//设置线程CPU亲和力 { fprintf(stderr, "set thread affinity failed\n"); } if (pthread_getaffinity_np(pthread_self(), sizeof(get), &get) < 0)//获取线程CPU亲和力 { fprintf(stderr, "get thread affinity failed\n"); } printf("rtsp ts is %lld\n", rtsp_ts); demo = create_rtsp_demo(8554);//rtsp sever socket if (NULL == demo) { SAMPLE_PRT("rtsp new demo failed!\n"); // return 0; } // session[0] = rtsp_new_session(demo, "/mnt/sample/venc/rtsp.264");//对应rtsp session session[0] = create_rtsp_session(demo, "/mnt/sample/venc/RTSP/RTSP_chn0.h264");//对应rtsp session if (NULL == session[0]) { printf("rtsp_new_session failed\n"); // continue; } printf("==========> rtsp://192.168.0.164:8554/mnt/sample/venc/RTSP/RTSP_chn0.h264 <===========\n" ); // ts = rtsp_get_reltime(); // rtsp_do_event(demo); // signal(SIGINT, sig_proc); // sndh264flag==HI_TRUE // while(sndh264flag==HI_TRUE) uint64_t ts = 0; Buffer* buff=NULL; while(1) { sem_wait(&sem_rtspflg); if(h264BuffPool_rtsp==NULL) { continue; } buff=h264BuffPool_rtsp->getFullBuff(); if(buff==NULL) { h264BuffPool_rtsp->putEmptyBuff(buff); continue; } /* [DEBUG rtsp_demo.c:1145:rtsp_recv_msg] peer closed [INFO rtsp_demo.c:428:rtsp_del_client_connection] delete client 29 from 192.168.0.73 */ if((buff->data[4]==0x06) ) { h264BuffPool_rtsp->putEmptyBuff(buff); continue; } ts = rtsp_get_reltime();//clock_gettime() // ts = rtsp_get_abstime();//gettimeofday 可能大量丢画面 rtsp_tx_video(session[0],(uint8_t*)buff->data,buff->dlen, ts); rtsp_do_event(demo); h264BuffPool_rtsp->putEmptyBuff(buff); } return 0; } /****************************************************************************** *描述 :保存码流到bufferpool ******************************************************************************/ HI_VOID SAMPLE_PUSH_HDENCFRAME_ToBuffer(BuffPool *bufpool,HI_U8* frmaddr ,HI_U32 frmlen) { //printf("hd recv frm len is %d\n",frmlen); if(bufpool==NULL) { printf("bufpool BuffPool is invalid && return !\n"); return; } if(frmlen>=512*1024) { printf("frmlen %d is greater than bufferpool size\n",frmlen); } Buffer* tmpbuff=bufpool->getEmptyBuff(); if(tmpbuff==NULL) { tmpbuff->dlen=0; bufpool->putEmptyBuff(tmpbuff); } else { memcpy(tmpbuff->data,frmaddr,frmlen); tmpbuff->dlen=frmlen; bufpool->putFullBuff(tmpbuff); } // if(rtspEnable) // { // sem_post(&sem_rtspflg); // } // if(saveEnable) // { // sem_post(&sem_saveflg); // } } /****************************************************************************** *描述 :处理文件 *参数 :pFd 文件描述符 pstStream 帧码流类型结构体。 *返回值:成功返回0 *注意 :无 ******************************************************************************/ HI_S32 PLATFORM_VENC_HandleStream(FILE* pFd, VENC_STREAM_S* pstStream) { HI_U32 i; if(rtspEnable) { for (i = 0; i < pstStream->u32PackCount; i++) { SAMPLE_PUSH_HDENCFRAME_ToBuffer(h264BuffPool_rtsp, (uint8_t*)(pstStream->pstPack[i].pu8Addr + pstStream->pstPack[i].u32Offset), pstStream->pstPack[i].u32Len - pstStream->pstPack[i].u32Offset); sem_post(&sem_rtspflg); } } if(saveEnable) { for (i = 0; i < pstStream->u32PackCount; i++) { SAMPLE_PUSH_HDENCFRAME_ToBuffer(h264BuffPool_save, (uint8_t*)(pstStream->pstPack[i].pu8Addr + pstStream->pstPack[i].u32Offset), pstStream->pstPack[i].u32Len - pstStream->pstPack[i].u32Offset); sem_post(&sem_saveflg); } } return HI_SUCCESS; }