海思3559万能平台搭建:RTSP优化buffpool的引入

简介: 海思3559万能平台搭建:RTSP优化buffpool的引入

前言

 在只有一路编码进行rtsp播放和保存时。之前的思路是没有任何问题的,设计比较简单,但是如果我们的运算量上来了,也不仅仅一个通道编码了,编码速率,保存速率,rtsp发送包的速率的差异会越来越大,而每一包的大小又不可能相同,这个时候就需要引入一个缓冲池来平衡输入输出的速率不一致(简单这么理解一下),且解决包大小不同的问题

 理论部分节选自知乎专栏https://zhuanlan.zhihu.com/p/533321012

 代码部分给出了缓冲池功能源码以及调用部分代码

缓冲管理

 最开始的缓冲引入是为了缓和 CPU 与 I/O 设备速度不匹配的矛盾,提高 CPU 和 I/O 设备的并行性,在现代操作系统中,几乎所有的 I/O 设备在与处理机交换数据时都用了缓冲区。缓冲管理的主要职责是组织好这些缓冲区,并提供获得和释放缓冲区的手段。

 事实上,凡在数据到达速率与其离去速率不同的地方,都可设置缓冲区,以缓和它们之间速率不匹配的矛盾。众所周知,CPU的运算速率远远高于 I/O 设备的速率,如果没有缓冲区,则在输出数据时,必然会由于打印机的速度跟不上而使 CPU 停下来等待;然而在计算阶段,打印机又空闲无事。显然,如果在打印机或控制器中设置一缓冲区,用于快速暂存程序的输出数据,以后由打印机“慢慢地”从中取出数据打印,这样,就可提高 CPU 的工作效率。类似地,在输入设备与 CPU 之间也设置缓冲区,也可使 CPU 的工作效率得以提高。

 减少对 CPU 的中断频率,放宽对 CPU 中断响应时间的限制。在远程通信系统中,如果从远地终端发来的数据仅用一位缓冲来接收,如下图(a)所示,则必须在每收到一位数据时便中断一次 CPU,这样,对于速率为 9.6 Kb/s 的数据通信来说,就意味着其中断 CPU的频率也为 9.6 Kb/s,即每 100 μs 就要中断 CPU 一次,而且 CPU 必须在 100 μs 内予以响应,否则缓冲区内的数据将被冲掉。倘若设置一个具有 8 位的缓冲(移位)寄存器,如下图(b)所示,则可使 CPU 被中断的频率降低为原来的 1/8;若再设置一个 8 位寄存器,如下图©所示,则又可把 CPU 对中断的响应时间放宽到 800 μs。

62a22835d4844b7d9aa8aea8b1ca39ff.png

 提高 CPU 和 I/O 设备之间的并行性。缓冲的引入可显著地提高 CPU 和 I/O 设备间的并行操作程度,提高系统的吞吐量和设备的利用率。例如,在 CPU 和打印机之间设置了缓冲区后,便可使 CPU 与打印机并行工作。

单缓冲(Single Buffer)

 在单缓冲情况下,每当用户进程发出一 I/O 请求时,操作系统便在主存中为之分配一缓冲区,如下图所示。在块设备输入时,假定从磁盘把一块数据输入到缓冲区的时间为 T,操作系统将该缓冲区中的数据传送到用户区的时间为 M,而 CPU 对这一块数据处理(计算)的时间为 C。由于 T 和 C 是可以并行的(如下图),当 T>C 时,系统对每一块数据的处理时间为 M+T,反之则为 M+C,故可把系统对每一块数据的处理时间表示为Max(C,T)+M。

d5a270314f89469a8628f5f5be635b52.png

 在字符设备输入时,缓冲区用于暂存用户输入的一行数据,在输入期间,用户进程被挂起以等待数据输入完毕;在输出时,用户进程将一行数据输入到缓冲区后,继续进行处理。当用户进程已有第二行数据输出时,如果第一行数据尚未被提取完毕,则此时用户进程应阻塞。

双缓冲(Double Buffer)

 为了加快输入和输出速度,提高设备利用率,人们又引入了双缓冲区机制,也称为缓冲对换(Buffer Swapping)。在设备输入时,先将数据送入第一缓冲区,装满后便转向第二缓冲区。此时操作系统可以从第一缓冲区中移出数据,并送入用户进程(如下图)。接着由 CPU 对数据进行计算。在双缓冲时,系统处理一块数据的时间可以粗略地认为是Max(C,T)。如果 C<T,可使块设备连续输入;如果 C>T,则可使 CPU 不必等待设备输入。对于字符设备,若采用行输入方式,则采用双缓冲通常能消除用户的等待时间,即用户在输入完第一行之后,在 CPU 执行第一行中的命令时,用户可继续向第二缓冲区输入下一行数据。

8ead67b4abc746bdaac577ae10feb392.png

 如果我们在实现两台机器之间的通信时,仅为它们配置了单缓冲,如下图(a)所示,那么,它们之间在任一时刻都只能实现单方向的数据传输。例如,只允许把数据从 A 机传送到 B 机,或者从 B 机传送到 A 机,而绝不允许双方同时向对方发送数据。为了实现双向数据传输,必须在两台机器中都设置两个缓冲区,一个用作发送缓冲区,另一个用作接收缓冲区,如下图(b)所示。

558ef86d4dfd48109ff4e4bba845492e.png

循环缓冲

 当输入与输出或生产者与消费者的速度基本相匹配时,采用双缓冲能获得较好的效果,可使生产者和消费者基本上能并行操作。但若两者的速度相差甚远,双缓冲的效果则不够理想,不过可以随着缓冲区数量的增加,使情况有所改善。因此,又引入了多缓冲机制。可将多个缓冲组织成循环缓冲形式。对于用作输入的循环缓冲,通常是提供给输入进程或计算进程使用,输入进程不断向空缓冲区输入数据,而计算进程则从中提取数据进行计算。

循环缓冲的组成

 多个缓冲区。在循环缓冲中包括多个缓冲区,其每个缓冲区的大小相同。作为输入的多缓冲区可分为三种类型:用于装输入数据的空缓冲区 R、已装满数据的缓冲区 G 以及计算进程正在使用的现行工作缓冲区 C,如下图所示。

45215f2cddbf4bc4ba24677540a28157.png

 多个指针。作为输入的缓冲区可设置三个指针:用于指示计算进程下一个可用缓冲区 G 的指针 Nextg、指示输入进程下次可用的空缓冲区 R 的指针 Nexti,以及用于指示计算进程正在使用的缓冲区 C 的指针 Current。

循环缓冲区的使用

 计算进程和输入进程可利用下述两个过程来使用循环缓冲区。

 Getbuf 过程。当计算进程要使用缓冲区中的数据时,可调用 Getbuf 过程。该过程将由指针 Nextg 所指示的缓冲区提供给进程使用,相应地,须把它改为现行工作缓冲区,并令 Current 指针指向该缓冲区的第一个单元,同时将 Nextg 移向下一个 G 缓冲区。类似地,每当输入进程要使用空缓冲区来装入数据时,也调用 Getbuf 过程,由该过程将指针 Nexti所指示的缓冲区提供给输入进程使用,同时将 Nexti 指针移向下一个 R 缓冲区。

 Releasebuf 过程。当计算进程把 C 缓冲区中的数据提取完毕时,便调用 Releasebuf过程,将缓冲区 C 释放。此时,把该缓冲区由当前(现行)工作缓冲区 C 改为空缓冲区 R。类似地,当输入进程把缓冲区装满时,也应调用 Releasebuf 过程,将该缓冲区释放,并改为 G缓冲区。

进程同步

 使用输入循环缓冲,可使输入进程和计算进程并行执行。相应地,指针 Nexti 和指针Nextg 将不断地沿着顺时针方向移动,这样就可能出现下述两种情况:

 Nexti 指针追赶上 Nextg 指针。这意味着输入进程输入数据的速度大于计算进程处理数据的速度,已把全部可用的空缓冲区装满,再无缓冲区可用。此时,输入进程应阻塞,直到计算进程把某个缓冲区中的数据全部提取完,使之成为空缓冲区 R,并调用 Releasebuf过程将它释放时,才将输入进程唤醒。这种情况被称为系统受计算限制。

 Nextg 指针追赶上 Nexti 指针。这意味着输入数据的速度低于计算进程处理数据的速度,使全部装有输入数据的缓冲区都被抽空,再无装有数据的缓冲区供计算进程提取数据。这时,计算进程只能阻塞,直至输入进程又装满某个缓冲区,并调用 Releasebuf 过程将它释放时,才去唤醒计算进程。这种情况被称为系统受 I/O 限制。

缓冲池

 上述的缓冲区仅适用于某特定的 I/O 进程和计算进程,因而它们属于专用缓冲。当系统较大时,将会有许多这样的循环缓冲,这不仅要消耗大量的内存空间,而且其利用率不高。为了提高缓冲区的利用率,目前广泛流行公用缓冲池(Buffer Pool),在池中设置了多个可供若干个进程共享的缓冲区。

缓冲池的组成

 对于既可用于输入又可用于输出的公用缓冲池,其中至少应含有以下三种类型的缓冲区:

 ① 空(闲)缓冲区;

 ② 装满输入数据的缓冲区;

 ③ 装满输出数据的缓冲区。

 为了管理上的方便,可将相同类型的缓冲区链成一个队列,于是可形成以下三个队列:

 空缓冲队列 emq。这是由空缓冲区所链成的队列。其队首指针 F(emq)和队尾指针L(emq)分别指向该队列的首缓冲区和尾缓冲区。

 输入队列 inq。这是由装满输入数据的缓冲区所链成的队列。其队首指针 F(inq)和队尾指针 L(inq)分别指向该队列的首缓冲区和尾缓冲区。

 输出队列 outq。这是由装满输出数据的缓冲区所链成的队列。其队首指针 F(outq)和队尾指针 L(outq)分别指向该队列的首缓冲区和尾缓冲区。

 除了上述三个队列外,还应具有四种工作缓冲区:① 用于收容输入数据的工作缓冲区;② 用于提取输入数据的工作缓冲区;③ 用于收容输出数据的工作缓冲区;④ 用于提取输出数据的工作缓冲区。

Getbuf 过程和 Putbuf 过程

 在“数据结构”课程中,曾介绍过队列和对队列进行操作的两个过程,它们是:

 Addbuf(type,number)过程。该过程用于将由参数number 所指示的缓冲区B挂在type队列上。

 Takebuf(type)过程。该过程用于从 type 所指示的队列的队首摘下一个缓冲区。

 这两个过程能否用于对缓冲池中的队列进行操作呢?答案是否定的。因为缓冲池中的队列本身是临界资源,多个进程在访问一个队列时,既应互斥,又须同步。为此,需要对这两个过程加以改造,以形成可用于对缓冲池中的队列进行操作的 Getbuf 和 Putbuf 过程。

 为使诸进程能互斥地访问缓冲池队列,可为每一队列设置一个互斥信号量 MS(type)。此外,为了保证诸进程同步地使用缓冲区,又为每个缓冲队列设置了一个资源信号量RS(type)。既可实现互斥又可保证同步的 Getbuf 过程和 Putbuf 过程描述如下:

cd1d7205f4024ec58e3592078e17ba12.png

缓冲区的工作方式

 缓冲区可以工作在收容输入、提取输入、收容输出和提取输出四种工作方式下,如下图所示。

c4f430e19419424996566afaa5cac397.png

 收容输入。在输入进程需要输入数据时,便调用 Getbuf(emq)过程,从空缓冲队列emq 的队首摘下一空缓冲区,把它作为收容输入工作缓冲区 hin。然后,把数据输入其中,装满后再调用 Putbuf(inq,hin)过程,将该缓冲区挂在输入队列 inq 上。

 提取输入。当计算进程需要输入数据时,调用 Getbuf(inq)过程,从输入队列 inq 的队首取得一个缓冲区,作为提取输入工作缓冲区(sin),计算进程从中提取数据。计算进程用完该数据后,再调用 Putbuf(emq,sin)过程,将该缓冲区挂到空缓冲队列 emq 上。

 收容输出。当计算进程需要输出时,调用 Getbuf(emq)过程从空缓冲队列 emq 的队首取得一个空缓冲区,作为收容输出工作缓冲区 hout。当其中装满输出数据后,又调用Putbuf(outq,hout)过程,将该缓冲区挂在 outq 末尾。

 提取输出。由输出进程调用 Getbuf(outq)过程,从输出队列的队首取得一装满输出数据的缓冲区,作为提取输出工作缓冲区 sout。在数据提取完后,再调用 Putbuf(emq,sout)过程,将该缓冲区挂在空缓冲队列末尾。

代码

bufferpool.cpp

// BufferPool.cpp : 闁跨喐鏋婚幏鐑芥晸閺傘倖瀚� DLL 鎼存棃鏁撻惌顐ゎ劜閹风兘鏁撻弬銈嗗閻楋繝鏁撻弬銈嗗闁跨喐鏋婚幏鐑芥晸閺傘倖瀚归柨鐔告灮閹风兘鏁撻敓锟�
//
//#include "stdafx.h"
#include "BufferPool.h"
#include "ThreadUtils.h"
BuffPool::BuffPool(const char *id) : EmptyBuffs(0), FullBuffs(0)
{
  last_empty = 0;
  last_full  = 0;
  next_full  = 0;
  buffsz     = 0;
  numbuf     = 0;
  pooladdr   = 0;
  pname      = id;
}
BuffPool::~BuffPool()
{
  try
  {
  Buffer *currp;
  EmptyPool.Lock();
    while((currp = last_empty))
    {last_empty = last_empty->next; delete currp;}
    EmptyPool.UnLock();
    FullPool.Lock();
    while((currp = next_full))
    {next_full = next_full->next; delete currp;}
    FullPool.UnLock();
    free(pooladdr);
    pooladdr = NULL;
  }
  catch ( ... )
  {
  }
}
int BuffPool::Allocate(int buffnum, int  bsize)
{
  char *buffaddr = NULL;
  int bnum = buffnum;
    //闁跨喐鏋婚幏宄扳偓锟�
    numempty=buffnum;
    numfull=0;
  Buffer *new_empty;
  buffaddr = (char*)malloc(bsize*buffnum);
  if(!buffaddr)
  {
  fprintf(stderr,"malloc buffer pool failed\n");
  return -1;
  }
  pooladdr = buffaddr;
  EmptyPool.Lock();//闁跨喐鏋婚幏鐑芥晸閺傘倖瀚�
  buffsz = bsize;//闁跨喐鏋婚幏鐑芥晸閺傘倖瀚归柨鐔告灮閹风兘鏁撻弬銈嗗闁跨噦鎷�
  while(bnum--)
  {
  if (!(new_empty = new Buffer(this)))
  {
    fprintf(stderr,"new buffer failed\n");
    break;
  }
  new_empty->data = (char *)buffaddr;
  new_empty->next = last_empty;
  last_empty = new_empty;
  buffaddr += buffsz;
  EmptyBuffs.Post();//闁跨喓绮ㄦ禍銈勭闁跨喐鏋婚幏鐑芥晸閺傘倖瀚瑰┃锟�
  }
  numbuf += buffnum-(bnum+1);
  EmptyPool.UnLock();
  return -(bnum+1);
}
Buffer *BuffPool::getEmptyBuff()
{
  Buffer *buffp = 0;
  while(!buffp)
  {
  EmptyBuffs.Wait();//闁跨喖銈烘潏鐐闁跨喐鏋婚幏閿嬬爱
  EmptyPool.Lock();
  buffp = last_empty;
  last_empty = buffp->next;
  EmptyPool.UnLock();
  }
  buffp->dlen = 0;
  return buffp;
}
void BuffPool::putEmptyBuff(Buffer *buffp)
{
  EmptyPool.Lock();
  buffp->next = last_empty;
  last_empty  = buffp;
    numempty+=1;
    numfull-=1;
    //printf("numempty is %d numfull is %d\n",numempty,numfull);
  EmptyPool.UnLock();
  EmptyBuffs.Post();//闁跨喖鍙洪崙銈嗗闁跨喐鏋婚幏閿嬬爱
}
Buffer *BuffPool::getFullBuff()
{
  Buffer *buffp = 0;
  while(!buffp)
  {
  FullBuffs.Wait();
  FullPool.Lock();
  buffp = next_full;
  if (!(next_full = buffp->next)) last_full = 0;
  FullPool.UnLock();
  }
  return buffp;
}
Buffer *BuffPool::getFullBuff(HANDLE_FUNCTION handler)
{
  Buffer *buffp = this->getFullBuff();
  if (buffp == NULL) 
  return NULL;
  int r = (*handler)(buffp);
  if (r < 0)
  fprintf(stderr,"Handler occure error,Please check it\n");
  return buffp;
}
void BuffPool::putFullBuff(Buffer *buffp)
{
  FullPool.Lock();
  if (last_full) 
  last_full->next = buffp;
  else 
  next_full = buffp;
  last_full = buffp;
  buffp->next = 0;
    //闁跨喐鏋婚幏鐑芥晸閺傘倖瀚�
    numempty-=1;
    numfull+=1;
    //printf("numempty is %d numfull is %d\n",numempty,numfull);
  FullPool.UnLock();
  FullBuffs.Post();
}

buffpool.h

#ifndef __BUFFPOOL_H__
#define __BUFFPOOL_H__
#include <stdlib.h>
#include <string.h>
#include "ThreadUtils.h"
#ifdef BUFFERPOOL_EXPORTS
#define BUFFERPOOL_API __declspec(dllexport)
#else
#define BUFFERPOOL_API __declspec(dllimport)
#endif
class Buffer;
typedef int (*HANDLE_FUNCTION)(Buffer *);
class /*BUFFERPOOL_API*/ BuffPool
{
    //自己添加代码
    //int
public:
  int          Allocate(int buffnum, int bsize);
  int          BuffCount() {return numbuf;}
  int          BufferSize() {return buffsz;}
  Buffer  *getEmptyBuff();//返回空指针的地址  xin.han
  void  putEmptyBuff(Buffer *buff);
  Buffer  *getFullBuff();//返回最后一个有效数据的地址 xin.han
  Buffer  *getFullBuff(HANDLE_FUNCTION handler);
  void         putFullBuff(Buffer *buff);
  BuffPool(const char *id);
  ~BuffPool();
    int         numempty;
    int         numfull;
private:
  BufferPoolMutex EmptyPool; //互斥锁
  BufferPoolMutex FullPool;
  BufferPoolSemaphore EmptyBuffs;//资源锁
  BufferPoolSemaphore FullBuffs;
  int         numbuf;
//    int         numempty;
//    int         numfull;
  int         buffsz;//缓冲块的容量
  const char *pname;
  char *pooladdr; //内存池首地址
  Buffer *next_full;
  Buffer *last_full;
  Buffer *last_empty;
};
class /*BUFFERPOOL_API*/ Buffer
{
public:
  Buffer  *next;
  int           dlen;//缓冲块中有效数据量
  char         *data;
  inline void   Recycle() {Owner->putEmptyBuff(this);}
  Buffer(BuffPool *oP, char *bp=0, int data_len=0)
  : next(0), dlen(data_len),data(bp), Owner(oP){}
  ~Buffer() {if (data) data=NULL;}
private:
  BuffPool *Owner;
};
#endif

threadutils.h

#ifndef __THREADUTILS__
#define __THREADUTILS__
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define MAX_SEM_COUNT 200
class /*BUFFERPOOL_API*/ BufferPoolMutex
{
    public:
        inline void   Lock() {pthread_mutex_lock(&ghMutex);}
        inline void UnLock() {pthread_mutex_unlock(&ghMutex);}
        BufferPoolMutex() 
        {
            pthread_mutex_init(&ghMutex,NULL);
        }
        ~BufferPoolMutex() {pthread_mutex_destroy(&ghMutex);}
    private:
        pthread_mutex_t ghMutex;
};
class /*BUFFERPOOL_API*/ BufferPoolSemaphore
{
    public:
        inline void Post() {sem_post(&ghSemaphore);}
        inline void Wait() {sem_wait(&ghSemaphore);}
        BufferPoolSemaphore(int semval=0) 
        {
            sem_init(&ghSemaphore,0,0);
        }
        ~BufferPoolSemaphore() {sem_destroy(&ghSemaphore);}
    private:
        sem_t ghSemaphore;
};
#endif

调用部分

 PLATFORM_BuffPoll_Init初始化buffpool,开辟内存,析构时释放

 通过信号量来通知,有了新的数据

 线程1里源源不断编码,然后将码流先SAMPLE_PUSH_HDENCFRAME_ToBuffer保存至buffpool,比如在线播放功能,那么在线播放的信号量+1,SAMPLE_PUSH_HDENCFRAME_ToBuffer的具体操作为新建个临时buff,判断大小有没有超出初始化时设置的,接着Buffer* tmpbuff=bufpool->getEmptyBuff();,若获取到空则释放bufpool->putEmptyBuff(tmpbuff);获取成功了则把要保存的内容 memcpy(tmpbuff->data,frmaddr,frmlen);到缓冲池临时buff里,然后把临时buff丢进缓冲池bufpool->putFullBuff(tmpbuff);

 线程2里while1执行在线播放的信号量sem_wait(&sem_rtspflg);自减。新建个临时buff指针指向getFullBuff();,若获取到为空则释放h264BuffPool_rtsp->putEmptyBuff(buff);,接着发送临时buff的内容,发送完毕后释放h264BuffPool_rtsp->putEmptyBuff(buff);

 抽象出来简单来说就是个大内存,化为两部分,一个方向上empty里面拿一块放在了full部分,另一个方向就从full部分copy走在放回empty部分,使用比较简单


/******************************************************************************
  Copyright (C), 2022, Sunwise Space. Co., Ltd.
 ******************************************************************************
  File Name     : rtsp.c
  Version       : 
  Author        : xin.han
  Created       : 2022.06.16
  Description   :
******************************************************************************/
extern "C" {
#include "rtsp.h"
}
#include "BufferPool.h"
#include "utils_buf.h"
extern sem_t sem_saveflg;
extern sem_t sem_rtspflg;
// static int flag_run = 1;
BuffPool *h264BuffPool_rtsp=NULL;
BuffPool *h264BuffPool_save=NULL;
rtsp_demo_handle demo;
rtsp_session_handle session[MAX_SESSION_NUM] = {NULL};
// static uint64_t ts = 0;
extern HI_S64 rtsp_ts;
extern HI_BOOL saveEnable;
// static void sig_proc(int signo)
// {
//     flag_run = 0;
// }
/* 
 *描述  :buffpool的初始化
 *参数  :NULL
 *返回值:无
 *注意  :只开辟了空间,析构的时候自动释放
 */
HI_S32 PLATFORM_BuffPoll_Init()
{
  sem_init(&sem_saveflg,0,0);
  sem_init(&sem_rtspflg,0,0);
  h264BuffPool_rtsp=new BuffPool("h264buffer");
  h264BuffPool_rtsp->Allocate(10,512*1024);
    if(h264BuffPool_rtsp!=NULL)
    {
        //判断创建缓冲区是否成功
        printf("allocate h264BuffPool_rtsp success!\n");
    }
    else
    {
        printf("allocate h264BuffPool_rtsp failed!\n");
    }
  h264BuffPool_save=new BuffPool("h264buffer");
  h264BuffPool_save->Allocate(10,512*1024);
    if(h264BuffPool_save!=NULL)
    {
        //判断创建缓冲区是否成功
        printf("allocate h264BuffPool_save success!\n");
    }
    else
    {
        printf("allocate h264BuffPool_save failed!\n");
    }
}
// static uint64_t rtsp_get_abstime (void)
// {
//  struct timeval tv;
//  gettimeofday(&tv, NULL);
//  return (tv.tv_sec * 1000000ULL + tv.tv_usec);
// }
/* 
 *描述  :用于rtsp实时播放的线程
 *参数  :NULL
 *返回值:无
 *注意  :加载文件platform.ini  rtsp://192.168.119.164:8554/mnt/sample/venc/rtsp.264
    2022-10-21 新增bufferpool和信号量
 */
void *video_play_rtsp_task(void* arg)
{
  cpu_set_t mask;//cpu核的集合
    cpu_set_t get;//获取在集合中的cpu
    int num = sysconf(_SC_NPROCESSORS_CONF);
    printf("frame_check_task:system has %d processor(s)\n", num);
    CPU_ZERO(&mask);//置空
    CPU_SET(0, &mask);//设置亲和力值
    if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) < 0)//设置线程CPU亲和力
    {
        fprintf(stderr, "set thread affinity failed\n");
    }
    if (pthread_getaffinity_np(pthread_self(), sizeof(get), &get) < 0)//获取线程CPU亲和力
    {
        fprintf(stderr, "get thread affinity failed\n");
    }
    printf("rtsp ts is %lld\n", rtsp_ts);
  demo = create_rtsp_demo(8554);//rtsp sever socket
  if (NULL == demo) {
  SAMPLE_PRT("rtsp new demo failed!\n");
  // return 0;
  }
  // session[0] = rtsp_new_session(demo, "/mnt/sample/venc/rtsp.264");//对应rtsp session 
  session[0] = create_rtsp_session(demo, "/mnt/sample/venc/RTSP/RTSP_chn0.h264");//对应rtsp session 
  if (NULL == session[0]) {
    printf("rtsp_new_session failed\n");  
    // continue;
  }
  printf("==========> rtsp://192.168.0.164:8554/mnt/sample/venc/RTSP/RTSP_chn0.h264 <===========\n" );
  // ts = rtsp_get_reltime();
  // rtsp_do_event(demo);
  // signal(SIGINT, sig_proc);
  // sndh264flag==HI_TRUE
  // while(sndh264flag==HI_TRUE)
  uint64_t ts = 0;
  Buffer* buff=NULL;
  while(1)
  {
  sem_wait(&sem_rtspflg);
  if(h264BuffPool_rtsp==NULL)
  {
    continue;
  }
  buff=h264BuffPool_rtsp->getFullBuff();
  if(buff==NULL)
  {
    h264BuffPool_rtsp->putEmptyBuff(buff);
    continue;
  }
  /* [DEBUG rtsp_demo.c:1145:rtsp_recv_msg] peer closed [INFO  rtsp_demo.c:428:rtsp_del_client_connection] delete client 29 from 192.168.0.73 */
  if((buff->data[4]==0x06) )
  {
    h264BuffPool_rtsp->putEmptyBuff(buff);
    continue;
  }
  ts = rtsp_get_reltime();//clock_gettime()
  // ts = rtsp_get_abstime();//gettimeofday  可能大量丢画面
  rtsp_tx_video(session[0],(uint8_t*)buff->data,buff->dlen, ts);
  rtsp_do_event(demo);
  h264BuffPool_rtsp->putEmptyBuff(buff);
  }
    return 0;
}
/****************************************************************************** 
 *描述  :保存码流到bufferpool
 ******************************************************************************/
HI_VOID SAMPLE_PUSH_HDENCFRAME_ToBuffer(BuffPool  *bufpool,HI_U8* frmaddr ,HI_U32 frmlen)
{
    //printf("hd recv frm len is %d\n",frmlen);
    if(bufpool==NULL)
    {
        printf("bufpool BuffPool is invalid && return !\n");
        return;
    }
    if(frmlen>=512*1024)
    {
        printf("frmlen %d is greater than bufferpool size\n",frmlen);
    }
    Buffer* tmpbuff=bufpool->getEmptyBuff();
    if(tmpbuff==NULL)
    {
        tmpbuff->dlen=0;
        bufpool->putEmptyBuff(tmpbuff);
    }
    else
    {
        memcpy(tmpbuff->data,frmaddr,frmlen);
        tmpbuff->dlen=frmlen;
        bufpool->putFullBuff(tmpbuff);
    }
    // if(rtspEnable)
    // {
    //     sem_post(&sem_rtspflg);
    // }
    // if(saveEnable)
    // {
    //     sem_post(&sem_saveflg);
    // }
}
/******************************************************************************
 *描述  :处理文件
 *参数  :pFd 文件描述符
          pstStream 帧码流类型结构体。
 *返回值:成功返回0
 *注意  :无
******************************************************************************/
HI_S32 PLATFORM_VENC_HandleStream(FILE* pFd, VENC_STREAM_S* pstStream)
{
    HI_U32 i;
    if(rtspEnable)
    {
        for (i = 0; i < pstStream->u32PackCount; i++)
        {
            SAMPLE_PUSH_HDENCFRAME_ToBuffer(h264BuffPool_rtsp,
                                            (uint8_t*)(pstStream->pstPack[i].pu8Addr + pstStream->pstPack[i].u32Offset),
                                            pstStream->pstPack[i].u32Len - pstStream->pstPack[i].u32Offset);
            sem_post(&sem_rtspflg);
        }
    }
    if(saveEnable)
    {
        for (i = 0; i < pstStream->u32PackCount; i++)
        {
            SAMPLE_PUSH_HDENCFRAME_ToBuffer(h264BuffPool_save,
                                            (uint8_t*)(pstStream->pstPack[i].pu8Addr + pstStream->pstPack[i].u32Offset),
                                            pstStream->pstPack[i].u32Len - pstStream->pstPack[i].u32Offset);
            sem_post(&sem_saveflg);
        }
    }
    return HI_SUCCESS;
}


相关文章
|
6月前
|
数据采集 监控 安全
精简高效与安全兼备:ARM32与MCU32平台上的信息协议设计新思路
精简高效与安全兼备:ARM32与MCU32平台上的信息协议设计新思路
267 1
|
3月前
|
UED 开发工具 iOS开发
Uno Platform大揭秘:如何在你的跨平台应用中,巧妙融入第三方库与服务,一键解锁无限可能,让应用功能飙升,用户体验爆棚!
【8月更文挑战第31天】Uno Platform 让开发者能用同一代码库打造 Windows、iOS、Android、macOS 甚至 Web 的多彩应用。本文介绍如何在 Uno Platform 中集成第三方库和服务,如 Mapbox 或 Google Maps 的 .NET SDK,以增强应用功能并提升用户体验。通过 NuGet 安装所需库,并在 XAML 页面中添加相应控件,即可实现地图等功能。尽管 Uno 平台减少了平台差异,但仍需关注版本兼容性和性能问题,确保应用在多平台上表现一致。掌握正确方法,让跨平台应用更出色。
49 0
|
4月前
|
监控 BI 数据处理
LabVIEW与欧陆温控表通讯的实现与应用:厂商软件与自主开发的优缺点
LabVIEW与欧陆温控表通讯的实现与应用:厂商软件与自主开发的优缺点
39 0
|
负载均衡
LOOK!直播APP源码平台的稳定控制方法
我就把简单两步控制直播APP源码平台的稳定的方法分享给大家了,开发直播APP源码平台优质知识分享,大家有什么不懂的或是想要开发直播APP源码平台可以问我
LOOK!直播APP源码平台的稳定控制方法
|
缓存 数据处理
海思3559万能平台搭建:协议的采集和解析
海思3559万能平台搭建:协议的采集和解析
171 0
海思3559万能平台搭建:协议的采集和解析
|
存储 网络协议 流计算
|
Linux
海思3559万能平台搭建:串口编程
海思3559万能平台搭建:串口编程
300 0
海思3559万能平台搭建:串口编程
|
存储 缓存 网络协议