live555 RTSP服务器与客户端通信源码分析

简介: live555已经发展了很多年,不过最新的live555版本,笔者没有编译通过,最终选择了2019.8.28的live555代码,如果有需要的同学,可以自行去Index of /pub/contrib/live555/ (videolan.org)去下载,不过需要自己去编译,我的编译环境是windows版本,网上有很多关于如何将其编译为VS版本的live555的,如果有需要的同学,可以在博客下留言,我会给你发一个(自己对一些代码进行了注释,不过都是自己的理解,不一定正确)。对于代码的分析:RTSP服务器使用的testOnDemandRTSPServer.cpp,RTSP客户端使用的testRT

本文说明

live555已经发展了很多年,不过最新的live555版本,笔者没有编译通过,最终选择了2019.8.28的live555代码,如果有需要的同学,可以自行去Index of /pub/contrib/live555/ (videolan.org)去下载,不过需要自己去编译,我的编译环境是windows版本,网上有很多关于如何将其编译为VS版本的live555的,如果有需要的同学,可以在博客下留言,我会给你发一个(自己对一些代码进行了注释,不过都是自己的理解,不一定正确)。对于代码的分析:RTSP服务器使用的testOnDemandRTSPServer.cpp,RTSP客户端使用的testRTSPClient.cpp。


预备知识

本文分析了live555 RTSP客户端与RTSP服务器是如何建立通信的流程。也就是如何服务器与客户端是如何收发数据的,在学习live555之前,一般希望读者了解RTSP,网络编程有一定的了解。同时,这里不会具体说明各个模块及其类的作用。关于这部分的内容,可以参考live555 - 标签 - 乌合之众 - 博客园 (cnblogs.com)这里详细说明了UseageEnvirment,BasicUsageEnvironment2个模块的内容,另外对groupsock里面的部分类也进行了说明。另外,https://blog.csdn.net/niu_gao/category_9260885.html也对live555进行了很多分析。各位可以自行取舍。

Live555的RTSP服务器端和RTSP客户端使用的是TCP通信,如果你对C++的TCP流程不是很熟悉或者已经忘记了,可以参考https://blog.csdn.net/tennysonsky/article/details/45621341,这篇文章说的比较好。具有一定的参考价值。一般服务器端创建好socket之后,会监听客户端的请求,如果按照常规TCP的思路,如果没有客户端的连接(监听的队列没有完成连接),那么服务器就会一直阻塞在accept()函数处,直到客户端连接上,然后服务器就可以与客户端进行数据的发送了。这样在我们的平时的练习中不会有什么问题,但是在实际的项目中,服务器通常有很多工作要完成,不可能就死死的等待客户端的连接,因此就必须有办法让服务器在没有客户端的连接下,也能完成其余的工作(即是说不会一直阻塞在accept接口处),其实也很简单,将socket设置为非阻塞即可,但是这样就不知道客户端什么时候已经连接了,那么有没有方法知道客户端何时连接到了客户端呢?答案是肯定的,live555使用的是Select,其实还有其他的办法,比如poll,epoll,iocp。关于这几种方案的解决办法和各自使用的环境(操作系统),请自行百度。关于Select的讲解:可以参考:https://blog.csdn.net/zhougb3/article/details/79792089


代码分析

相信通过以上的储备知识,或者你已经对live555有部分接触后,就可以开始我们的源码分析了,为了节约文章篇幅,我可能不会将所有的源码打印出来,另外个人改变了一些代码格式,会省略掉部分代码,首先我们从服务器端的main函数开始


服务端主进程

// Begin by setting up our usage environment:
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  env = BasicUsageEnvironment::createNew(*scheduler);
  UserAuthenticationDatabase* authDB = NULL;//以下为权限控制的代码,设置后没有权限的客户端无法进行连接 
#ifdef ACCESS_CONTROL
  // To implement client access control to the RTSP server, do the following:
  authDB = new UserAuthenticationDatabase;
  authDB->addUserRecord("username1", "password1"); // replace these with real strings
  // Repeat the above with each <username>, <password> that you wish to allow
  // access to the server.
#endif
  // Create the RTSP server:此时就一直处于监听客户端的连接
  //(每一个客户端连接上来,就会调用accept去接收客户端的连接,然后不停的处理与客户端收发工作)  
  RTSPServer* rtspServer = RTSPServer::createNew(*env, 8554, authDB);
  if (rtspServer == NULL) 
  {
    *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
    exit(1);
  }
  char const* descriptionString
    = "Session streamed by \"testOnDemandRTSPServer\"";
  / Set up each of the possible streams that can be served by the
  // RTSP server.  Each such stream is implemented using a
  // "ServerMediaSession" object, plus one or more
  // "ServerMediaSubsession" objects for each audio/video substream.
  // A MPEG-4 video elementary stream:
  {
    char const* streamName = "mpeg4ESVideoTest";
    char const* inputFileName = "test.m4e";
    ServerMediaSession* sms
      = ServerMediaSession::createNew(*env, streamName, streamName,
          descriptionString);
    sms->addSubsession(MPEG4VideoFileServerMediaSubsession
         ::createNew(*env, inputFileName, reuseFirstSource));
    rtspServer->addServerMediaSession(sms);
    announceStream(rtspServer, sms, streamName, inputFileName);
  }
  // A H.264 video elementary stream:
  {
    char const* streamName = "h264ESVideoTest";//流名字,媒体名
    char const* inputFileName = "test.264";//文件名,当客户端输入的流名字为h264ESVideoTest时,实际上打开的是test.264文件
    //创建媒体会话  
    //当客户点播时,要输入流名字streamName,告诉RTSP服务器点播的是哪个流。  
    //流名字和文件名的对应关系是通过增加子会话建立起来的(流名字streamName不是文件名inputFileName)。
    //媒体会话对会话描述、会话持续时间、流名字等与会话有关的信息进行管理  
    //第二个参数:媒体名、三:媒体信息、四:媒体描述
    ServerMediaSession* sms
      = ServerMediaSession::createNew(*env, streamName, streamName,
          descriptionString);
    //添加264子会话 这里的文件名才是真正打开文件的名字  
   //reuseFirstSource:  
   //这里的HH264VideoFileServerMediaSubsession类派生自FileServerMediaSubsession派生自OnDemandServerMediaSubsession  
   //而OnDemandServerMediaSubsession和PassiveMediaSubsession共同派生自ServerMediaSubsession  
   //关于读取文件之类都在这个类中实现的,如果要将点播改为直播就是要新建类继承此类然后添加新的方法
    sms->addSubsession(H264VideoFileServerMediaSubsession
         ::createNew(*env, inputFileName, reuseFirstSource));
    rtspServer->addServerMediaSession(sms); //6.为rtspserver添加session
    announceStream(rtspServer, sms, streamName, inputFileName); //打印信息到标准输出
  }
  ......
   // Also, attempt to create a HTTP server for RTSP-over-HTTP tunneling.
  // Try first with the default HTTP port (80), and then with the alternative HTTP
  // port numbers (8000 and 8080).
  if (rtspServer->setUpTunnelingOverHTTP(80) || 
  rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080)) 
  {
    *env << "\n(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling.)\n";
  } else {
    *env << "\n(RTSP-over-HTTP tunneling is not available.)\n";
  }
  //执行循环方法,来执行循环方法,对套接字的读取事件和对媒体文件的延时发送操作都在这个循环中完成。 
  env->taskScheduler().doEventLoop(); // does not return
  return 0; // only to prevent compiler warning
}

主函数的主要功能是创建基本环境对象,事件调度对象,创建 ServerMediaSession和ServerMediaSubsession对象。关于ServerMediaSession和ServerMediaSubsession请自行百度,简单的说,ServerMediaSession和一个文件一一对应,记录了一个文件的一些信息,比如文件对应的流名字等。ServerMediaSubsession是ServerMediaSession的子会话,代表一路音频或者视频流,比如一个视频文件中可能同时包括音频和视频信息。那么ServerMediaSession对象就应该有2个ServerMediaSubsession。live555 实现了隧道的支持(rtspServer->setUpTunnelingOverHTTP这里是与隧道相关的代码,不过一般在日常没有使用,所以关于HTTP-over-TCP的代码都不做分析),最后env->taskScheduler().doEventLoop()进入消息循环。


消息循环

消息循环非常重要,是整个live555事件的发动机,没有它,客户端与服务器之间的各种通信都无法完成,因此要想继续后面的分析,因此你需要知道消息循环做了什么工作。网上也有很多文章进行了分析,我们看看消息循环里做了什么事情。

/*
     *    设置select的超时时间为maxDelayTime(<=0 或大于一百万秒 时1百万秒)
     *    调用int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);
     *    如果select出错返回,打印出错信息,并调用 internalError函数
     *    从处理程序描述链表中查找fLastHandledSocketNum代表的 处理程序描述对象指针,
     * 如果没找到,就在后面的while的时候从链表的头开始,否则从找到的位置开始
     *    从链表中取出处理程序描述节点对象,并调用其内部保存的处理程序
     *    设置fTriggersAwaitingHandling
     *    调用fDelayQueue.handleAlarm();
    */
void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) 
{
  fd_set readSet = fReadSet; // make a copy for this select() call
  fd_set writeSet = fWriteSet; // ditto
  fd_set exceptionSet = fExceptionSet; // ditto
  DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm();//得到下一个要处理的延时结点的时间
  struct timeval tv_timeToDelay;
  tv_timeToDelay.tv_sec = timeToDelay.seconds();
  tv_timeToDelay.tv_usec = timeToDelay.useconds();
  // Very large "tv_sec" values cause select() to fail.
  // Don't make it any larger than 1 million seconds (11.5 days)
  const long MAX_TV_SEC = MILLION;//延时时间不能超过为100w秒
  if (tv_timeToDelay.tv_sec > MAX_TV_SEC) {
    tv_timeToDelay.tv_sec = MAX_TV_SEC;
  }
  // Also check our "maxDelayTime" parameter (if it's > 0): 注意maxDelayTime的单位是微妙
  if (maxDelayTime > 0 &&
      (tv_timeToDelay.tv_sec > (long)maxDelayTime/MILLION ||
       (tv_timeToDelay.tv_sec == (long)maxDelayTime/MILLION &&
  tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) { 
  //延时时间大于maxDelayTime的时间,那么处理时间就用传递的参数时间
    tv_timeToDelay.tv_sec = maxDelayTime/MILLION;
    tv_timeToDelay.tv_usec = maxDelayTime%MILLION;
  }
  /* 进入select阻塞等待 */
  int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);
  if (selectResult < 0) {//-1 出错
#if defined(__WIN32__) || defined(_WIN32)
    int err = WSAGetLastError();
    // For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if
    // it was called with no entries set in "readSet".  If this happens, ignore it:
    if (err == WSAEINVAL && readSet.fd_count == 0) { //参数设置错误,不用管,设置错误码为系统调用中断
      err = EINTR;
      // To stop this from happening again, create a dummy socket:
      if (fDummySocketNum >= 0) closeSocket(fDummySocketNum);
      fDummySocketNum = socket(AF_INET, SOCK_DGRAM, 0);
      FD_SET((unsigned)fDummySocketNum, &fReadSet);
    }
    if (err != EINTR) { //如果不是参数设置错误,就输出错误码
#else
    if (errno != EINTR && errno != EAGAIN) {
#endif
  // Unexpected error - treat this as fatal:
#if !defined(_WIN32_WCE)
  perror("BasicTaskScheduler::SingleStep(): select() fails");
  // Because this failure is often "Bad file descriptor" 
  //- which is caused by an invalid socket number (i.e., a socket number
  // that had already been closed) being used in "select()"
  // - we print out the sockets that were being used in "select()",
  // to assist in debugging:
  fprintf(stderr, "socket numbers used in the select() call:");
  for (int i = 0; i < 10000; ++i) {
   if (FD_ISSET(i, &fReadSet) || FD_ISSET(i, &fWriteSet) || FD_ISSET(i, &fExceptionSet)) {
     fprintf(stderr, " %d(", i);
     if (FD_ISSET(i, &fReadSet)) fprintf(stderr, "r");
     if (FD_ISSET(i, &fWriteSet)) fprintf(stderr, "w");
     if (FD_ISSET(i, &fExceptionSet)) fprintf(stderr, "e");
     fprintf(stderr, ")");
   }
  }
  fprintf(stderr, "\n");
#endif
  internalError();
      }
  }
  // Call the handler function for one readable socket:
  HandlerIterator iter(*fHandlers); 
  HandlerDescriptor* handler;
  // To ensure forward progress through the handlers, begin past the last
  // socket number that we handled:
  if (fLastHandledSocketNum >= 0) {
    while ((handler = iter.next()) != NULL) {
      if (handler->socketNum == fLastHandledSocketNum) break;
    }
    if (handler == NULL) {
      fLastHandledSocketNum = -1;
      iter.reset(); // start from the beginning instead
    }
  }
  while ((handler = iter.next()) != NULL) {
    int sock = handler->socketNum; // alias 别名
    int resultConditionSet = 0;
    //如果对应的事件已经发生
    if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/*sanity check*/) 
    resultConditionSet |= SOCKET_READABLE;
    if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/*sanity check*/) 
    resultConditionSet |= SOCKET_WRITABLE;
    if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/*sanity check*/) 
    resultConditionSet |= SOCKET_EXCEPTION;
    if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) 
    {//如果有处理的事件和对应的条件发生了
      fLastHandledSocketNum = sock;
          // Note: we set "fLastHandledSocketNum" before calling the handler,
          // in case the handler calls "doEventLoop()" reentrantly.
      (*handler->handlerProc)(handler->clientData, resultConditionSet);//处理设置的回调函数
      break;
    }
  }
  if (handler == NULL && fLastHandledSocketNum >= 0) {
    // We didn't call a handler, but we didn't get to check all of them,
    // so try again from the beginning:
    iter.reset();
    while ((handler = iter.next()) != NULL) {
      int sock = handler->socketNum; // alias
      int resultConditionSet = 0;
      if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/*sanity check*/) 
      resultConditionSet |= SOCKET_READABLE;
      if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/*sanity check*/) 
      resultConditionSet |= SOCKET_WRITABLE;
      if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/*sanity check*/) 
      resultConditionSet |= SOCKET_EXCEPTION;
      if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) 
      {
  fLastHandledSocketNum = sock;
     // Note: we set "fLastHandledSocketNum" before calling the handler,
            // in case the handler calls "doEventLoop()" reentrantly.
  (*handler->handlerProc)(handler->clientData, resultConditionSet);
  break;
      }
    }
    if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler
  }
  // Also handle any newly-triggered event (Note that we do this *after* calling a socket handler,
  // in case the triggered event handler modifies The set of readable sockets.)
  if (fTriggersAwaitingHandling != 0) {
    if (fTriggersAwaitingHandling == fLastUsedTriggerMask) {
      // Common-case optimization for a single event trigger: 优化 如果为最后一个插入的事件
      fTriggersAwaitingHandling &=~ fLastUsedTriggerMask;
      if (fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL) {
  (*fTriggeredEventHandlers[fLastUsedTriggerNum])(fTriggeredEventClientDatas[fLastUsedTriggerNum]);
      }
    } else {
      // Look for an event trigger that needs handling (making sure
      // that we make forward progress through all possible triggers):
      unsigned i = fLastUsedTriggerNum;
      EventTriggerId mask = fLastUsedTriggerMask;
      do {
  i = (i+1)%MAX_NUM_EVENT_TRIGGERS;
  mask >>= 1;
  if (mask == 0) mask = 0x80000000;
  if ((fTriggersAwaitingHandling&mask) != 0) {
   fTriggersAwaitingHandling &=~ mask;
   if (fTriggeredEventHandlers[i] != NULL) {
     (*fTriggeredEventHandlers[i])(fTriggeredEventClientDatas[i]);
   }
   fLastUsedTriggerMask = mask;
   fLastUsedTriggerNum = i;
   break;
  }
      } while (i != fLastUsedTriggerNum);
    }
  }
  // Also handle any delayed event that may have come due.
  fDelayQueue.handleAlarm(); //处理定时器到时的任务
}


消息循环这段代码对我来说还是比较复杂的,主要处理了3类事件(Select,定时器,自定义触发事件),要想看懂上面一段代码,需要对其中的涉及到一些类进行分析,如果这段代码看着有点复杂,那么你需要看懂我前面给的链接,另外还可以参考下面这篇文档https://blog.csdn.net/dongkun152/article/details/106569945


服务器端

对消息循环有一个了解后,我们回到服务器的主函数里,看创建服务器的代码里做了什么

RTSPServer* rtspServer = RTSPServer::createNew(*env, 8554, authDB);

进入到createNew函数里,首先是创建一个TCP的Socket,这个socket用于监听客户端的连接(注意,这个socket只是用于监听客户端的连接,而不是与客户端进行通信的socket,如果这里不懂,请参考https://www.cnblogs.com/liangjf/p/9900928.html

关于创建socket的代码如下,就不说明了,代码里都注释了

/// <summary>
///   1.创建tcp socket,绑定socket到指定端口,socket设置为非阻塞 保持alive
///   2.监听客户端的连接
///   3.设置缓冲区的大小
///   4.如果没有设置端口,则获取socket绑定的端口
/// </summary>
/// <param name="env"></param>
/// <param name="ourPort">传递的端口,如果传递的端口为0,
///可以使用该参数获取socket绑定的端口(随机选择的一个端口)</param>
/// <returns>成功,返回socket,否则返回-1</returns>
int GenericMediaServer::setUpOurSocket(UsageEnvironment& env, Port& ourPort) 
{
  int ourSocket = -1;
  do {
    // The following statement is enabled by default.
    // Don't disable it (by defining ALLOW_SERVER_PORT_REUSE) unless you know what you're doing.
#if !defined(ALLOW_SERVER_PORT_REUSE) && !defined(ALLOW_RTSP_SERVER_PORT_REUSE)
    // ALLOW_RTSP_SERVER_PORT_REUSE is for backwards-compatibility #####
    NoReuse dummy(env); // Don't use this socket if there's already a local server using it
#endif
//创建tcp socket,绑定socket到指定端口,socket设置为非阻塞 保持alive
    ourSocket = setupStreamSocket(env, ourPort, True, True);
    if (ourSocket < 0) break;
    // Make sure we have a big send buffer:设置发送缓冲区大小为50K
    if (!increaseSendBufferTo(env, ourSocket, 50*1024)) break;
    // Allow multiple simultaneous connections: 监听-允许多个同时的连接
    if (listen(ourSocket, LISTEN_BACKLOG_SIZE) < 0) 
    {
      env.setResultErrMsg("listen() failed: ");
      break;
    }
    if (ourPort.num() == 0)
    {//如果没有设置端口号,根据socket获取端口号
      // bind() will have chosen a port for us; return it also:
      if (!getSourcePort(env, ourSocket, ourPort)) break;
    }
    return ourSocket;
  } while (0);
  //如果创建成功,但是监听或者其他设置错误,关闭socket 
  if (ourSocket != -1) ::closeSocket(ourSocket);
  return -1;
}

创建好socket后,将socket传入到RTSPServer的构造函数,然后生成RTSPServer对象。在构造函数里设置了与客户端连接使用socket,服务器使用的端口,服务器多久没通信的断开时间等,我们看看RTSPServer的父类GenericMediaServer的构造函数做了什么

GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocket, Port ourPort,
       unsigned reclamationSeconds)
  : Medium(env),//设置环境变量和设置媒体的名字,并将媒体加入到相应的hashtable中
    fServerSocket(ourSocket), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS)),
    fPreviousClientSessionId(0)
{
 // so that clients on the same host that are killed don't also kill us
  ignoreSigPipeOnSocket(fServerSocket); 
  // Arrange to handle connections from others:
  //加入到事件循环中,这样只要由客户端连接,就去处理客户端的连接,
  //只是我们要知道第3个参数就是回调函数的参数
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, incomingConnectionHandler, this);
}

除了初始化与服务器相关的一些参数,比如socket,端口,ServerMediaSession哈希表(fServerMediaSessions),ClientConnection哈希表(fClientConnections),客户端Session哈希表(fClientSessions)外,上面代码最后一句还将客户端连接事件加入到了消息循环中,这样,只要客户端有连接就会去执行incomingConnectionHandler.主要你掌握了Select和消息循环机制,相信这里的代码就很好理解。

另外说明一下,incomingConnectionHandler是一个回调函数,live555由于经过了很长的时间,代码规范都是C99,很多语法都不是最新的C++11,比如,C++11写回调函数可以使用lamda,可以很方便,但是live555的回调机制都是如这个incommingConnectionHandler的写法。大家注意就可以了

假如服务器已经处于运行过程,当有一个客户端连接到服务器,这时候就会去执行incomingConnectionHandler函数了。我们看看里面做了什么。可能没看懂消息循环的有点疑惑,去注册Select事件明明只传递了this(自身),为啥incomingConnectionHandler(void* instance, int mask)却有2个参数,第一个参数就是

env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, incomingConnectionHandler, this);

作为this传递进去的参数,那么第二个参数是代表什么呢,我们从消息循环代码中找到以下代码就知道了,原来mask就是resultConditionSet参数。

......

while ((handler = iter.next()) != NULL) 
  {
    int sock = handler->socketNum; // alias 别名
    int resultConditionSet = 0;
    //如果对应的事件已经发生
    if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/*sanity check*/) 
    resultConditionSet |= SOCKET_READABLE;
    if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/*sanity check*/) 
    resultConditionSet |= SOCKET_WRITABLE;
    if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/*sanity check*/) 
    resultConditionSet |= SOCKET_EXCEPTION;
    if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) 
    {
    //如果有处理的事件和对应的条件发生了
      fLastHandledSocketNum = sock;
          // Note: we set "fLastHandledSocketNum" before calling the handler,
          // in case the handler calls "doEventLoop()" reentrantly.
      (*handler->handlerProc)(handler->clientData, resultConditionSet);//处理设置的回调函数
      break;
    }
  }
......
/// <summary>
///  客户端连接的回调
/// 代码的参数
/// </summary>
/// <param name="instance">传递的服务器自身参数</param>
/// <param name="mask">标志位</param>
void GenericMediaServer::incomingConnectionHandler(void* instance, int/*mask*/)
{
  GenericMediaServer* server = (GenericMediaServer*)instance;
  server->incomingConnectionHandler();
}
void GenericMediaServer::incomingConnectionHandler() 
{
  incomingConnectionHandlerOnSocket(fServerSocket);
}
/// <summary>
///  处理客户端连接(这是客户端连接后真正需要完成的内容)这个函数会在事件循环中调用多次,
///只要有客户端连接,就会调用一次
///  完成的任务主要包括以下几个事件
///  1.accept
///  2.设置与客户端进行通信的socket为非阻塞模式
///  3.增加发送缓冲
///  4. 创建一个与客户端的连接(在其构造函数里就设置了客户端请求的回调)
/// </summary>
/// <param name="serverSocket">服务器端创建的连接</param>
void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) 
{
  struct sockaddr_in clientAddr;//连接的客户端信息
  SOCKLEN_T clientAddrLen = sizeof clientAddr;
  //接收客户端的连接
  int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  if (clientSocket < 0) 
  {
    int err = envir().getErrno();
    if (err != EWOULDBLOCK) 
    { //如果是这个错误,不用处理,非阻塞的时候可能会出现这个错误
      envir().setResultErrMsg("accept() failed: ");
    }
    return;
  }
  // so that clients on the same host that are killed don't also kill us
  ignoreSigPipeOnSocket(clientSocket); 
  (https://blog.csdn.net/Emilio563/article/details/50637592 说明
  makeSocketNonBlocking(clientSocket);//设置为非阻塞模式
  increaseSendBufferTo(envir(), clientSocket, 50*1024);//增加发送缓冲为50K
#ifdef DEBUG
  envir() << "accept()ed connection from " << AddressString(clientAddr).val() << "\n";
#endif
  // Create a new object for handling this connection: 创建一个新的连接,用于收发数据
  (void)createNewClientConnection(clientSocket, clientAddr);
}

只要对以上代码熟悉后,在看看createNewClientConnection里面所做的工作就很清楚了,无非就是创建一个与客户端的连接,然后将其加入到fClientConnections的哈希表中(代表所有的客户端连接)。最后就创建一个读socket事件,当客户端有请求时,就能读取客户端的请求,然后对请求进行处理。创建连接的代码如下:

GenericMediaServer::ClientConnection
::ClientConnection(GenericMediaServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr) 
{
  // Add ourself to our 'client connections' table: 将我们自身的连接加入到服务器的连接 table 中
  this 指代 ClientConnection对象 ,this是一个地址,可以强制转为字符串,作为hash表的key
  fOurServer.fClientConnections->Add((char const*)this, this);
  // Arrange to handle incoming requests:安排处理到来的请求数据的的回调
  //(一般客户端不会无缘无故创建连接,客户端是发送请求后创建的,
  //根据客户端的openconection()函数是在第一个请求之后创建的可以知道),
  //客户端和服务器建立连接后,就需要创建读socket,从客户端读取数据
  //当请求发送时,就会触发incomingRequestHandler回调
  resetRequestBuffer();//重置请求的buffer
  envir().taskScheduler().setBackgroundHandling(fOurSocket, 
  SOCKET_READABLE|SOCKET_EXCEPTION, incomingRequestHandler, this);
}

客户端发送请求后,服务器会执行incomingRequestHandler回调,看看代码做了什么

void GenericMediaServer::ClientConnection::incomingRequestHandler(void* instance, int /*mask*/) 
{
  ClientConnection* connection = (ClientConnection*)instance;
  connection->incomingRequestHandler();
}
/// <summary>
/// 处理客户端请求回调
/// </summary>
void GenericMediaServer::ClientConnection::incomingRequestHandler()
{
  struct sockaddr_in dummy; // 'from' address, meaningless in this case
  //读取数据到请求缓冲中
  int bytesRead = readSocket(envir(), fOurSocket, 
  &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy);
  handleRequestBytes(bytesRead);//处理请求
}

handleRequestBytes函数里对请求的字节数进行判断,如果接收数据大于0,那么接下来判断请求的消息是否时一个完整的请求,如果是,则对请求进行RTSP解析(parseRTSPRequestString),如果解析成功,则根据请求命令进行分别的处理,如果请求失败,则再次判断是否是HTTP请求,如果是,则对其进行处理。最后判断请求字节是否处理完毕,如果没有(可能是流水化请求),那么循环继续到最开始的地方重复以上的动作。最后如果在处理的过程中出错,应该关闭socket或者关闭该连接。代码如下:

oid RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead)
{
  int numBytesRemaining = 0;//剩余的字节空间
  ++fRecursionCount;
  do
  {
  RTSPServer::RTSPClientSession* clientSession = NULL;
  //newBytesRead < 0代表recvfrom接收到的字节为0(也算错)或者读取返回一个小于0的错误码,
  //newBytesRead = 0 代表本次没有读取到数据,但是不是错误。
  if (newBytesRead < 0 || (unsigned)newBytesRead >= fRequestBufferBytesLeft)
  {
    // Either the client socket has died, or the request was too big for us.
    // (unsigned)newBytesRead >= fRequestBufferBytesLeft
    //代表剩余的缓冲字节已经不足以容纳请求的字节数,也直接关闭请求
    // Terminate this connection: 终止连接
#ifdef DEBUG
    fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() read %d new bytes (of %d); 
    terminating connection!\n", this, newBytesRead, fRequestBufferBytesLeft);
#endif
    fIsActive = False;//发生错误,直接设置活跃的标志为False,代表不能在接收请求
    break;
  }
  //请求消息是否结束的标志
  Boolean endOfMsg = False;
  //请求消息开始的地方
  unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];
#ifdef DEBUG
  ptr[newBytesRead] = '\0';
  fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() %s %d new bytes:%s\n",
    this, numBytesRemaining > 0 ? "processing" : "read", newBytesRead, ptr);
#endif
  if (fClientOutputSocket != fClientInputSocket && numBytesRemaining == 0)
  {
    // We're doing RTSP-over-HTTP tunneling, and input commands
    // are assumed to have been Base64-encoded.
    // We therefore Base64-decode as much of this new data as we can
    // (i.e., up to a multiple of 4 bytes).
    //RTSP-Over-HTTP的关键(同时也是全部内容)在于:让RTSP报文通过HTTP端口(即80端口)通信。
    //我们知道RTSP的标准端口是554,但是由于各种不同的防火墙等安全策略配置的原因,
    //客户端在访问554端口时可能存在限制,从而无法正常传输RTSP报文。
    // 但是HTTP端口(80端口)是普遍开放的,于是就有了让RTSP报文通过80端口透传的想法,即RTSP-Over-HTTP。
    // But first, we remove any whitespace that may be in the input data:
    unsigned toIndex = 0;
    for (int fromIndex = 0; fromIndex < newBytesRead; ++fromIndex)
    {
    char c = ptr[fromIndex];
    if (!(c == ' ' || c == '\t' || c == '\r' || c == '\n'))
    { // not 'whitespace': space,tab,CR,NL
      ptr[toIndex++] = c;
    }
    }
    newBytesRead = toIndex;
    unsigned numBytesToDecode = fBase64RemainderCount + newBytesRead;
    unsigned newBase64RemainderCount = numBytesToDecode % 4;
    numBytesToDecode -= newBase64RemainderCount;
    if (numBytesToDecode > 0)
    {
    ptr[newBytesRead] = '\0';
    unsigned decodedSize;
    unsigned char* decodedBytes = 
    base64Decode((char const*)(ptr - fBase64RemainderCount), numBytesToDecode, decodedSize);
#ifdef DEBUG
    fprintf(stderr, "Base64-decoded %d input bytes into %d new bytes:", 
    numBytesToDecode, decodedSize);
    for (unsigned k = 0; k < decodedSize; ++k) fprintf(stderr, "%c", decodedBytes[k]);
    fprintf(stderr, "\n");
#endif
    // Copy the new decoded bytes in place of the old ones
    // (we can do this because there are fewer decoded bytes than original):
    unsigned char* to = ptr - fBase64RemainderCount;
    for (unsigned i = 0; i < decodedSize; ++i) *to++ = decodedBytes[i];
    // Then copy any remaining (undecoded) bytes to the end:
    for (unsigned j = 0; j < newBase64RemainderCount; ++j) *to++ = 
    (ptr - fBase64RemainderCount + numBytesToDecode)[j];
    newBytesRead = decodedSize - fBase64RemainderCount + newBase64RemainderCount;
    // adjust to allow for the size of the new decoded data (+ remainder)
    delete[] decodedBytes;
    }
    fBase64RemainderCount = newBase64RemainderCount;
  }
        //最开始设置在请求的前3个位置,+2还在请求的前一个位置
  unsigned char* tmpPtr = fLastCRLF + 2;
  if (fBase64RemainderCount == 0)//没有处理的base64数据,HTTP-OVER-RTSP需要这个数据
  { // no more Base-64 bytes remain to be read/decoded
    // Look for the end of the message: <CR><LF><CR><LF>  
    //找到消息的<CR><LF><CR><LF>处,(头部分的地方)
    if (tmpPtr < fRequestBuffer) tmpPtr = fRequestBuffer;
    while (tmpPtr < &ptr[newBytesRead - 1])//一直循环遍历到请求消息结尾的地方
    {
    if (*tmpPtr == '\r' && *(tmpPtr + 1) == '\n')
    {
        //检索到\r\n\r\n就代表结束(只检索头部分,不检索内容部分)
      if (tmpPtr - fLastCRLF == 2) 
      { // This is it:
      endOfMsg = True;
      break;
      }
      fLastCRLF = tmpPtr;
    }
    ++tmpPtr;
    }
  }
  //通过上面的代码,fLastCRLF指向<CR><LF><CR><LF>的第一个<CR>位置
  fRequestBufferBytesLeft -= newBytesRead;//剩余可用空间减少
  fRequestBytesAlreadySeen += newBytesRead;//增加已用空间
  //不是一次完整的请求,可能这次read的时候还没读完,需要和下次读取的数据一起和并
  if (!endOfMsg) break; // subsequent reads will be needed to complete the request 
  // Parse the request string into command name and 'CSeq', then handle the command:
  //解析'CSeq'和请求命令的请求,然后处理命令。
  fRequestBuffer[fRequestBytesAlreadySeen] = '\0';
  char cmdName[RTSP_PARAM_STRING_MAX];//
  char urlPreSuffix[RTSP_PARAM_STRING_MAX];//
  char urlSuffix[RTSP_PARAM_STRING_MAX];//url除了ip和端口部分;比如h264ESVideoTest
  char cseq[RTSP_PARAM_STRING_MAX];//序列号
  char sessionIdStr[RTSP_PARAM_STRING_MAX];
  unsigned contentLength = 0;
  Boolean playAfterSetup = False;
  //(临时的,为了解析fLastCRLF[0]='\r' fLastCRLF[1]='\n'),
  //fLastCRLF + 2 - fRequestBuffer的长度比总长度小2,没有计算空行的\r\n
  fLastCRLF[2] = '\0'; // temporarily, for parsing fLastCRLF[0]='\r' fLastCRLF[1]='\n'
  Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, 
  fLastCRLF + 2 - fRequestBuffer,
    cmdName, sizeof cmdName,
    urlPreSuffix, sizeof urlPreSuffix,
    urlSuffix, sizeof urlSuffix,
    cseq, sizeof cseq,
    sessionIdStr, sizeof sessionIdStr,
    contentLength);
  fLastCRLF[2] = '\r'; // restore its value 
        // Check first for a bogus "Content-Length" value that would cause a pointer wraparound:
  //tmpPtr+2刚好在头与内容的空行的开头处+字节的长度值,
  //如果长度值是伪造的,那么指针会回绕,那么值是不正确的。
  if (tmpPtr + 2 + contentLength < tmpPtr + 2)//感觉这里的逻辑不太好懂
  {
#ifdef DEBUG
    fprintf(stderr, "parseRTSPRequestString() returned a bogus \"Content-Length:\" value: 0x%x (%d)\n", contentLength, (int)contentLength);
#endif
    contentLength = 0;
    parseSucceeded = False;
  }
  if (parseSucceeded) //解析成功
  {
#ifdef DEBUG
    fprintf(stderr, "parseRTSPRequestString() succeeded, returning cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\", CSeq \"%s\", Content-Length %u, with %d bytes following the message.\n", cmdName, urlPreSuffix, urlSuffix, cseq, contentLength, ptr + newBytesRead - (tmpPtr + 2));
#endif
    // If there was a "Content-Length:" header, 
    //then make sure we've received all of the data that it specified:
    // we still need more data; subsequent reads will give it to us  
    //如果内容(content)没有获取完成,那么这次也不处理,等待下次一起处理
    if (ptr + newBytesRead < tmpPtr + 2 + contentLength) break; 
    // If the request included a "Session:" id, and it refers to a client session that's
    // current ongoing, then use this command to indicate 'liveness' on that client session:
    Boolean const requestIncludedSessionId = sessionIdStr[0] != '\0';
    if (requestIncludedSessionId)//请求中包含session id
    {
    clientSession = (RTSPServer::RTSPClientSession*)(fOurRTSPServer.lookupClientSession(sessionIdStr));
    //每次发送消息后,就保证该session处于活动发送方,重置定时,保证该客户端的session是活动的
    if (clientSession != NULL) clientSession->noteLiveness();
    }
    // We now have a complete RTSP request.
    // Handle the specified command (beginning with commands that are session-independent):
    //获取到完整的请求,开始处理请求
    fCurrentCSeq = cseq;//当前序列号
    //option请求有2种目的:session存活(在前几行代码已经做了),第二个就是普通的OPTIONS请求
    if (strcmp(cmdName, "OPTIONS") == 0) 
    {
    // If the "OPTIONS" command included a "Session:" id for a session that doesn't exist,
    // then treat this as an error:
    if (requestIncludedSessionId && clientSession == NULL)// "OPTIONS" 请求中包含一个"Session" Id,但是实际上session不存在,这说明时一个为了保持会话活动性而发送的请求
    {
      handleCmd_sessionNotFound();
    }
    else //普通的OPTIONS请求
    {
      // Normal case:
      handleCmd_OPTIONS();
    }
    }
    //难道GET_PARAMETER和SET_PARAMETER的请求都是* ?
    else if (urlPreSuffix[0] == '\0' && urlSuffix[0] == '*' && urlSuffix[1] == '\0')
    {
    // The special "*" URL means: an operation on the entire server.  
    //This works only for GET_PARAMETER and SET_PARAMETER:
    if (strcmp(cmdName, "GET_PARAMETER") == 0)//实际上,实现没有做任何功能
    {
      handleCmd_GET_PARAMETER((char const*)fRequestBuffer);
    }
    else if (strcmp(cmdName, "SET_PARAMETER") == 0)//实际上,实现没有做任何功能
    {
      handleCmd_SET_PARAMETER((char const*)fRequestBuffer);
    }
    else
    {
      handleCmd_notSupported();
    }
    }
    else if (strcmp(cmdName, "DESCRIBE") == 0)
    {
    handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
    }
    else if (strcmp(cmdName, "SETUP") == 0)
    {
    Boolean areAuthenticated = True;
    if (!requestIncludedSessionId)
    {
      // No session id was present in the request.
      // So create a new "RTSPClientSession" object for this request.
      // But first, make sure that we're authenticated to perform this command:
      char urlTotalSuffix[2 * RTSP_PARAM_STRING_MAX];
      // enough space for urlPreSuffix/urlSuffix'\0'
      urlTotalSuffix[0] = '\0';
      if (urlPreSuffix[0] != '\0')
      {
      strcat(urlTotalSuffix, urlPreSuffix);
      strcat(urlTotalSuffix, "/");
      }
      strcat(urlTotalSuffix, urlSuffix);
      if (authenticationOK("SETUP", urlTotalSuffix, (char const*)fRequestBuffer))
      {
      clientSession =
      (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
      }
      else
      {
      areAuthenticated = False;
      }
    }
    if (clientSession != NULL)
    {
      clientSession->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char
      const*)fRequestBuffer);
      playAfterSetup = clientSession->fStreamAfterSETUP;
    }
    else if (areAuthenticated)
    {
      handleCmd_sessionNotFound();
    }
    }
    else if (strcmp(cmdName, "TEARDOWN") == 0
    || strcmp(cmdName, "PLAY") == 0
    || strcmp(cmdName, "PAUSE") == 0
    || strcmp(cmdName, "GET_PARAMETER") == 0
    || strcmp(cmdName, "SET_PARAMETER") == 0)
    {
    if (clientSession != NULL)//这些方法都需要session
    {
      clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, 
      (char const*)fRequestBuffer);
    }
    else
    {
      handleCmd_sessionNotFound();
    }
    }
    else if (strcmp(cmdName, "REGISTER") == 0 || strcmp(cmdName, "DEREGISTER") == 0)
    {
    // Because - unlike other commands - an implementation of this command needs
    // the entire URL, we re-parse the command to get it:
    char* url = strDupSize((char*)fRequestBuffer);
    if (sscanf((char*)fRequestBuffer, "%*s %s", url) == 1)
    {
      // Check for special command-specific parameters in a "Transport:" header:
      Boolean reuseConnection, deliverViaTCP;
      char* proxyURLSuffix;
      parseTransportHeaderForREGISTER((const char*)fRequestBuffer, 
      reuseConnection, deliverViaTCP, proxyURLSuffix);
      handleCmd_REGISTER(cmdName, url, urlSuffix, (char const*)fRequestBuffer, 
      reuseConnection, deliverViaTCP, proxyURLSuffix);
      delete[] proxyURLSuffix;
    }
    else
    {
      handleCmd_bad();
    }
    delete[] url;
    }
    else
    {
    // The command is one that we don't handle: 不支持的方法
    handleCmd_notSupported();
    }
  }
  else //解析RTSP命令失败,判断是否是HTTP命令
  {
#ifdef DEBUG
    fprintf(stderr, "parseRTSPRequestString() failed; checking now for HTTP commands (for RTSP-over-HTTP tunneling)...\n");
#endif
    // The request was not (valid) RTSP, but check for a special case: 
    //HTTP commands (for setting up RTSP-over-HTTP tunneling):
    char sessionCookie[RTSP_PARAM_STRING_MAX];
    char acceptStr[RTSP_PARAM_STRING_MAX];
    *fLastCRLF = '\0'; // temporarily, for parsing
    parseSucceeded = parseHTTPRequestString(cmdName, sizeof cmdName,
    urlSuffix, sizeof urlPreSuffix,
    sessionCookie, sizeof sessionCookie,
    acceptStr, sizeof acceptStr);
    *fLastCRLF = '\r';
    if (parseSucceeded) 
    {
#ifdef DEBUG
    fprintf(stderr, "parseHTTPRequestString() succeeded, returning cmdName \"%s\", urlSuffix \"%s\", sessionCookie \"%s\", acceptStr \"%s\"\n", cmdName, urlSuffix, sessionCookie, acceptStr);
#endif
    // Check that the HTTP command is valid for RTSP-over-HTTP tunneling: There must be a 'session cookie'.
    Boolean isValidHTTPCmd = True;//是否是HTTP命令
    if (strcmp(cmdName, "OPTIONS") == 0) {
      handleHTTPCmd_OPTIONS();
    }
    else if (sessionCookie[0] == '\0') {
      // There was no "x-sessioncookie:" header.  If there was an "Accept: application/x-rtsp-tunnelled" header,
      // then this is a bad tunneling request.  Otherwise, assume that it's an attempt to access the stream via HTTP.
      if (strcmp(acceptStr, "application/x-rtsp-tunnelled") == 0) {
      isValidHTTPCmd = False;
      }
      else {
      handleHTTPCmd_StreamingGET(urlSuffix, (char const*)fRequestBuffer);
      }
    }
    else if (strcmp(cmdName, "GET") == 0) {
      handleHTTPCmd_TunnelingGET(sessionCookie);
    }
    else if (strcmp(cmdName, "POST") == 0) {
      // We might have received additional data following the HTTP "POST" command - i.e., the first Base64-encoded RTSP command.
      // Check for this, and handle it if it exists:
      unsigned char const* extraData = fLastCRLF + 4;
      unsigned extraDataSize = &fRequestBuffer[fRequestBytesAlreadySeen] - extraData;
      if (handleHTTPCmd_TunnelingPOST(sessionCookie, extraData, extraDataSize))
      {
      // We don't respond to the "POST" command, and we go away:
      fIsActive = False;
      break;
      }
    }
    else
    {
      isValidHTTPCmd = False;
    }
    if (!isValidHTTPCmd)
    {
      handleHTTPCmd_notSupported();
    }
    }
    else 
    {
#ifdef DEBUG
    fprintf(stderr, "parseHTTPRequestString() failed!\n");
#endif
    handleCmd_bad();
    }
    }
#ifdef DEBUG
  fprintf(stderr, "sending response: %s", fResponseBuffer);
#endif
         //发送响应给客户端(fResponseBuffer在各个handCmd_里进行设置的)
  send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);
  if (playAfterSetup)
  {
    // The client has asked for streaming to commence now, rather than after a
    // subsequent "PLAY" command.  So, simulate the effect of a "PLAY" command:
    clientSession->handleCmd_withinSession(this, "PLAY", urlPreSuffix, urlSuffix, (char
    const*)fRequestBuffer);
  }
  // Check whether there are extra bytes remaining in the buffer,
  // after the end of the request (a rare case).
  // If so, move them to the front of our buffer, and keep processing it,
  // because it might be a following, pipelined request.
  //检查buffer中是否有额外的字节,在这个请求的末尾(极少情况下)
  //如果是,已到他们到缓冲的最开始地方,然后继续处理它,这种情况可能出现的原因是流水线的需求。
  //头和空行的字节长度+内容字节长度
  unsigned requestSize = (fLastCRLF + 4 - fRequestBuffer) + contentLength;
  numBytesRemaining = fRequestBytesAlreadySeen - requestSize;
  // to prepare for any subsequent request 
  //处理完请求后,需要重置缓冲区,然后继续后续的请求 seen设置为0,left设置为最大值
  resetRequestBuffer(); 
  if (numBytesRemaining > 0)
  {
      //将后续请求移动到buffer最开始的位置
    memmove(fRequestBuffer, &fRequestBuffer[requestSize], numBytesRemaining);
    newBytesRead = numBytesRemaining;//设置读取的字节数为剩余请求的字节数
  }
  } while (numBytesRemaining > 0);
  --fRecursionCount;
  if (!fIsActive) //不能接收请求
  {
  if (fRecursionCount > 0)//如果还有没有处理的字节
    closeSockets();
  else //所有请求都处理完了,可以删除该连接了
    delete this;
  // Note: The "fRecursionCount" test is for a pathological situation 
  //where we reenter the event loop and get called recursively
  // while handling a command (e.g., while handling a "DESCRIBE", to get a SDP description).
  // In such a case we don't want to actually delete 
  //ourself until we leave the outermost call.
  }
}

基本上服务器端代码就是以上这些了,不过还有很多细节,没有说明,比如服务器端是如何处理各种RTSP请求,然后发回给客户端的。这些需要读取对RTSP格式和RTSP规范有具体的认识后才知道代码的实现。


客户端主进程

int main(int argc, char** argv) 
{
  // Begin by setting up our usage environment:
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);
  // We need at least one "rtsp://" URL argument:
  if (argc < 2)
  {
  usage(*env, argv[0]);
  return 1;
  }
  // There are argc-1 URLs: argv[1] through argv[argc-1].  Open and start streaming each one:
  //url和rtspclient是一一对应的(一个客户端处理一个url流)
  for (int i = 1; i <= argc - 1; ++i) 
  {
  openURL(*env, argv[0], argv[i]);
  }
  // All subsequent activity takes place within the event loop:
  env->taskScheduler().doEventLoop(&eventLoopWatchVariable);
  // This function call does not return, unless, at some point in time, "eventLoopWatchVariable" gets set to something non-zero.
  return 0;
  // If you choose to continue the application past this point (i.e., if you comment out the "return 0;" statement above),
  // and if you don't intend to do anything more with the "TaskScheduler" and "UsageEnvironment" objects,
  // then you can also reclaim the (small) memory used by these objects by uncommenting the following code:
  /*
   env->reclaim(); env = NULL;
   delete scheduler; scheduler = NULL;
  */
}

客户端主函数很简单,除了openURL(),其他都在服务器端代码进行了说明,openURL主要是打卡一路流,然后对起进行处理,testOpenClient可以打开多路流,所以主函数使用了for循环来进行处理,看看openURL代码:

/// <summary>
/// 打开URL,URL和客户端是一一对应的,比如我们使用VLC打开一个网络媒体流(rtsp:127.0.0.1/test.264),就会使用客户端对该流进行播放。
/// </summary>
/// <param name="env"></param>
/// <param name="progName"></param>
/// <param name="rtspURL"></param>
void openURL(UsageEnvironment& env, char const* progName, char const* rtspURL) 
{
  // Begin by creating a "RTSPClient" object.  
  //Note that there is a separate "RTSPClient" object for each stream that we wish
  // to receive (even if more than stream uses the same "rtsp://" URL).
  RTSPClient* rtspClient = ourRTSPClient::createNew(env, rtspURL, RTSP_CLIENT_VERBOSITY_LEVEL, progName);
  if (rtspClient == NULL) 
  {
  env << "Failed to create a RTSP client for URL \"" << rtspURL << "\": " << env.getResultMsg() << "\n";
  return;
  }
  //当客户端数量为0时(所有流都完成了工作或者发生了异常,调用shutdownStream一次,该数就减去1),就可以退出客户端了
  ++rtspClientCount;
  // Next, send a RTSP "DESCRIBE" command, to get a SDP description for the stream.
  // Note that this command - like all RTSP commands - is sent asynchronously; we do not block, waiting for a response.
  // Instead, the following function call returns immediately, 
  //and we handle the RTSP response later, from within the event loop:
  rtspClient->sendDescribeCommand(continueAfterDESCRIBE);
}

客户端

openURL里使用createNew创建了RTSPClient.RTSPClient构造函数主要是对客户端需要的参数进行设置,比如访问服务器的url,客户端代理头,响应缓冲区等。代码都很好懂,这里就不说了。

接下来就是发送DESCRIBE请求,这里说说发送请求的代码基本上都差不多,主要分为以下2个步骤:

1.构造各种请求对象(RequestRecord)

2.发送请求( sendRequest)

看看发送DESCRIBE请求的代码:

/// <summary>
/// 发送DESCRIBE请求
/// </summary>
/// <param name="responseHandler">处理响应后的回调处理器</param>
/// <param name="authenticator"></param>
/// <returns></returns>
unsigned RTSPClient::sendDescribeCommand(responseHandler* responseHandler, Authenticator* authenticator)
{
  if (fCurrentAuthenticator < authenticator) //fCurrentAuthenticator 默认是有值的,只不过各个参数都为0
  fCurrentAuthenticator = *authenticator;
  return sendRequest(new RequestRecord(++fCSeq, "DESCRIBE", responseHandler));
}

对responseHandler 参数说明如下:

/// <summary>
  /// 函数指针(类型),当发送请求后,服务器返回响应后,客户端(RTSPClient)就会执行相应的回调函数,
  ///这个函数指针一般是作为请求的参数传递进去(即去执行应用端请求后想做的事情)
  /// 一个函数被调用,作为对RTSP请求的响应,参数如下:
  /// rtspClient:发送原始命令的"RTSPClient" 对象
  /// resultCode: 如果为0,命令成功完成,如果非0,命令没有成功完成,
  ///resultCode代表如下错误:正数代表RTSP错误码(如 404-not found),
  ///负数代表socket/network 错误,0-"resultCode" is the standard "errno" code.
  /// resultString:一个以'\0'结尾的字符串,如果响应返回的话,否则为 NULL.
  /// 特别的是:"DESCRIBE" 命令成功时返回媒体会话的SDP描述信息,
  ///"OPTIONS"命令返回允许的命令。即使 resultCode是非0,这个参数也可以存在,比如错误消息。
  /// 当然,当 resultCode == 0时,这个参数也可以为NULL(即 命令成功了,但是无任何结果头)。
  /// 另外必须注意resultString是动态分配的,不使用时必须被handler(或者caller)被调用者使用delete[]释放
  /// </summary>
  typedef void (responseHandler)(RTSPClient* rtspClient, int resultCode, char* resultString);

比如在openURL()函数最后一行代码:rtspClient->sendDescribeCommand(continueAfterDESCRIBE)continueAfterDESCRIBE就是服务器对DESCIBE请求响应后,用户端需要去完成的功能。我们可以先看看做了什么工作:

// Implementation of the RTSP 'response handlers':
/// <summary>
/// 发送DESCRIBE请求之后,服务器回复响应后,应该处理的回调函数
/// </summary>
/// <param name="rtspClient"></param>
/// <param name="resultCode">0成功,大于0是RTSP错误码,小于0是网络错误码</param>
/// <param name="resultString">是动态分配的字符串,必须使用delete[]进行释放</param>
void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) 
{
  do {
  UsageEnvironment& env = rtspClient->envir(); // alias 
  StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias 
  if (resultCode != 0) //错误的情况
  {
    env << *rtspClient << "Failed to get a SDP description: " << resultString << "\n";
    delete[] resultString;
    break;
  }
  char* const sdpDescription = resultString;//descibe请求响应的字符串是sdp描述
  env << *rtspClient << "Got a SDP description:\n" << sdpDescription << "\n";
  // Create a media session object from this SDP description:
  scs.session = MediaSession::createNew(env, sdpDescription);//根据sdp描述信息创建会话及其子会话
  delete[] sdpDescription; // because we don't need it anymore
  if (scs.session == NULL)
  {
    env << *rtspClient << "Failed to create a MediaSession object from the SDP description: " <<
    env.getResultMsg() << "\n";
    break;
  }
  else if (!scs.session->hasSubsessions()) //没有子会话也算错(也就是没有对应的流,不用获取对应的数据,直接关闭该客户端)
  {
    env << *rtspClient << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
    break;
  }
  // Then, create and set up our data source objects for the session. 
  // We do this by iterating over the session's 'subsessions',
  // calling "MediaSubsession::initiate()", and then sending a RTSP "SETUP" command, on each one.
  // (Each 'subsession' will have its own data source.)
  //然后,为session创建和设置相应的数据源,调用"MediaSubsession::initiate()"遍历会话的 'subsessions',
  //然而为每个'subsessions'发送RTSP'SETUP'命令,
  scs.iter = new MediaSubsessionIterator(*scs.session);//创建子会话迭代器
  setupNextSubsession(rtspClient);
  return;
  } while (0);
  // An unrecoverable error occurred with this stream.
  shutdownStream(rtspClient);
}

continueAfterDESCRIBE 主要完成的工作主要包括:1.根据sdp描述信息创建会话及其子会话 2.为每个子会话发送SETUP请求

由于篇幅原因,我就不在这里继续展开MediaSession::createNew和setupNextSubsession实现的工作了,有兴趣的朋友可以给我留言,继续后面的分析。

我们继续回到RTSPClient::sendDescribeCommand这个函数进行分析。里面就处理了sendRequest了,然后就返回了。我们看看sendRequest:


unsigned RTSPClient::sendRequest(RequestRecord* request)
{
  char* cmd = NULL;
  do {
  Boolean connectionIsPending = False;//该请求是否需要连接的标志
  //不为空,说明先前有请求(这次请求不是第一个请求)为了创建连接,
  // 但是openConnection()却没有连接服务器成功,
  //这时候又发来一个请求,需要将其加入到等待连接的请求队列中。直到连接上了将该队列清空
  //注意fRequestsAwaitingConnection哈希表会在该客户端连接到服务器之后清空,
  // 并依次将fRequestsAwaitingConnection中所有等待连接的请求的进行处理
  if (!fRequestsAwaitingConnection.isEmpty()) 
  {
    // A connection is currently pending (with at least one enqueued request).  Enqueue this request also:
    //有需要进行连接后的请求(至少有一个队列的请求),那么需要把这个请求入队。
    connectionIsPending = True;
  }
  else if (fInputSocketNum < 0)//如果还没有创建连接,就先创建连接
  { // we need to open a connection
    int connectResult = openConnection();
    if (connectResult < 0)
    break; // an error occurred
    服务器没有及时响应该连接(可能服务器比较忙,这时候如果我们在次发送一个请求,
    ///fRequestsAwaitingConnection就不为空,这时候就继续将请求加入到没有连接的socket上)
    else if (connectResult == 0)
    {
    // A connection is pending
    connectionIsPending = True;
    } // else the connection succeeded.  Continue sending the command.
  }
  该请求需要先进行连接,放入到等待连接待处理的请求的队列中,并直接返回(本次不发送请求给服务器)
  if (connectionIsPending) 
  {
    fRequestsAwaitingConnection.enqueue(request);
    return request->cseq();
  }
  // If requested (and we're not already doing it, or have done it), 
  // set up the special protocol for tunneling RTSP-over-HTTP:
  //隧道处理RTSP:fTunnelOverHTTPPortNum非0 ,请求命令不是GET请求并且输入输出socket相同
  if (fTunnelOverHTTPPortNum != 0 && strcmp(request->commandName(), "GET") != 0 
    && fOutputSocketNum == fInputSocketNum)
  {
    if (!setupHTTPTunneling1()) break;
    fRequestsAwaitingHTTPTunneling.enqueue(request);//成功后放到HTTPTunneling对列中
    return request->cseq();//等待处理
  }
  // Construct and send the command:
  // First, construct command-specific headers that we need:
  char* cmdURL = fBaseURL; // by default 请求的url(先设置为一个默认值)
  Boolean cmdURLWasAllocated = False; //cmduRL是否分配的标志
  char const* protocolStr = "RTSP/1.0"; // by default 协议字符串
  char* extraHeaders = (char*)""; // by default 扩展的头
  Boolean extraHeadersWereAllocated = False; //扩张的头是否是动态分配的标志
  char* contentLengthHeader = (char*)""; // by default 内容长度头(请求可能有内容,比如ANNOUNCE请求)
  Boolean contentLengthHeaderWasAllocated = False;
  if (!setRequestFields(request, cmdURL, cmdURLWasAllocated, protocolStr,
    extraHeaders, extraHeadersWereAllocated))
  {
    break;
  }
  char const* contentStr = request->contentStr(); // by default
  if (contentStr == NULL)
    contentStr = "";
  unsigned contentStrLen = strlen(contentStr);
  if (contentStrLen > 0)  //构建content-length头
  {
    char const* contentLengthHeaderFmt =
    "Content-Length: %d\r\n";
    unsigned contentLengthHeaderSize = strlen(contentLengthHeaderFmt)
    + 20 /* max int len */;
    contentLengthHeader = new char[contentLengthHeaderSize];
    sprintf(contentLengthHeader, contentLengthHeaderFmt, contentStrLen);
    contentLengthHeaderWasAllocated = True;
  }
  char* authenticatorStr = createAuthenticatorString(request->commandName(), fBaseURL);
  char const* const cmdFmt =
    "%s %s %s\r\n"
    "CSeq: %d\r\n"
    "%s"//权限
    "%s" //User-Agent
    "%s"//扩展头(不同的请求不同)
    "%s"//内容头 Content-Length
    "\r\n"
    "%s";//内容
  unsigned cmdSize = strlen(cmdFmt)
    + strlen(request->commandName()) + strlen(cmdURL) + strlen(protocolStr)
    + 20 /* max int len for cseq */
    + strlen(authenticatorStr)
    + fUserAgentHeaderStrLen
    + strlen(extraHeaders)
    + strlen(contentLengthHeader)
    + contentStrLen;
  cmd = new char[cmdSize];
  sprintf(cmd, cmdFmt,
    request->commandName(), cmdURL, protocolStr,
    request->cseq(),
    authenticatorStr,
    fUserAgentHeaderStr,
    extraHeaders,
    contentLengthHeader,
    contentStr);
  delete[] authenticatorStr;
  if (cmdURLWasAllocated)
    delete[] cmdURL;
  if (extraHeadersWereAllocated)
    delete[] extraHeaders;
  if (contentLengthHeaderWasAllocated)
    delete[] contentLengthHeader;
  if (fVerbosityLevel >= 1) envir() << "Sending request: " << cmd << "\n";
  if (fTunnelOverHTTPPortNum != 0 && strcmp(request->commandName(), "GET") 
    != 0 && strcmp(request->commandName(), "POST") != 0)
  {
    // When we're tunneling RTSP-over-HTTP, we Base-64-encode the request before we send it.
    // (However, we don't do this for the HTTP "GET" and "POST" commands that we use to set up the tunnel.)
    //使用HTTP隧道技术发送RTSP,对命令进行base64编码(注意,我们不能对http的get和post请求使用)
    char* origCmd = cmd;
    cmd = base64Encode(origCmd, strlen(cmd));
    if (fVerbosityLevel >= 1) envir() << "\tThe request was base-64 encoded to: " << cmd << "\n\n";
    delete[] origCmd;
  }
  if (send(fOutputSocketNum, cmd, strlen(cmd), 0) < 0) //发送命令
  {
    char const* errFmt = "%s send() failed: ";
    unsigned const errLength = strlen(errFmt) + strlen(request->commandName());
    char* err = new char[errLength];
    sprintf(err, errFmt, request->commandName());
    envir().setResultErrMsg(err);
    delete[] err;
    break;
  }
  // The command send succeeded, so enqueue the request record, 
  // so that its response (when it comes) can be handled.
  // However, note that we do not expect a response to a POST 
  //command with RTSP-over-HTTP, so don't enqueue that.
  int cseq = request->cseq();
  // 隧道请求(get)和rtsp请求都放入到等待响应的请求队列中
  if (fTunnelOverHTTPPortNum == 0 || strcmp(request->commandName(), "POST") != 0)
  {
    fRequestsAwaitingResponse.enqueue(request);
  }
  else
  {
    delete request;
  }
  delete[] cmd;
  return cseq;
  } while (0);
  // An error occurred, so call the response handler immediately (indicating the error):
  delete[] cmd;
  handleRequestError(request);//socket 错误
  delete request;
  return 0;
}

sendRequest的逻辑是先判断该客户端是否有等待连接的请求,如果是,直接将这次请求加入到等待连接请求的哈希表中,直到连接到服务器后,这次请求会随先前的请求一起被处理,通常这种情况比较少见,另外如果是客户端第一次发起请求,那么就首先创建和服务器的连接int connectResult = openConnection();然后根据不同的请求构造请求头和内容(setRequestFields函数),然后使用send发送给服务器端,这样服务器就收到客户端的请求了(回到前面,看看哪个函数会处理服务器是如何收到客户端的请求吧?),然后使用回调函数进行处理,处理后将响应发送给客户端,怎么到现在还没有看到客户端是如何接收服务器端发送的响应呢?

带着问题,我们看看openConnection()里做了什么吧?

/// <summary>
///  打开连接
/// </summary>
/// <returns>成功连接,返回1,-1打开连接失败,0-等待服务器连接成功()</returns>
int RTSPClient::openConnection()
{
  do
  {
  // Set up a connection to the server.  Begin by parsing the URL:
  char* username;
  char* password;
  NetAddress destAddress;
  portNumBits urlPortNum;
  char const* urlSuffix;//后缀:stream-name
  if (!parseRTSPURL(envir(), fBaseURL, username, password, destAddress, urlPortNum, &urlSuffix))//解析fBaseURL
    break;//直接退出  
  //判断传输端口,可能使用隧道技术(fTunnelOverHTTPPortNum不为0)
  portNumBits destPortNum = fTunnelOverHTTPPortNum == 0 ? urlPortNum : fTunnelOverHTTPPortNum;
  //如果用户名和密码不为空,设置权限对象
  if (username != NULL || password != NULL) 
  {
    fCurrentAuthenticator.setUsernameAndPassword(username, password);
    delete[] username;
    delete[] password;
  }
  // We don't yet have a TCP socket (or we used to have one, but it got closed).  Set it up now.
  //创建一个tcp socket,客户端端口号设置为0,是绑定的端口号,由系统随机生成一个端口(客户端和服务器端通信,
  //不需要客户端提供一个固定的端口,服务器会根据fInputSocketNum得到客户端绑定该socket绑定的随机端口号)
  fInputSocketNum = setupStreamSocket(envir(), 0);
  if (fInputSocketNum < 0) break;
  ignoreSigPipeOnSocket(fInputSocketNum); // so that servers on the same host that get killed don't also kill us
  if (fOutputSocketNum < 0) fOutputSocketNum = fInputSocketNum;
  envir() << "Created new TCP socket " << fInputSocketNum << " for connection\n";
  // Connect to the remote endpoint: 连接到服务器端
  fServerAddress = *(netAddressBits*)(destAddress.data());
  int connectResult = connectToServer(fInputSocketNum, destPortNum);
  if (connectResult < 0) //连接服务器失败,直接返回-1
    break;
  else if (connectResult > 0)
  {
    // The connection succeeded.  Arrange to handle responses to requests sent on it:
    // 连接成功,在taskScheduler轮训IO,socket读到数据的回调函数为incomingDataHandler
    envir().taskScheduler().setBackgroundHandling(fInputSocketNum, SOCKET_READABLE | SOCKET_EXCEPTION,
    (TaskScheduler::BackgroundHandlerProc*)&incomingDataHandler, this);
  }
  return connectResult;
  } while (0);
  resetTCPSockets();
  return -1;
}

看到这里,你大概知道客户端是如何做到接收服务器的响应了吧,就是:envir().taskScheduler().setBackgroundHandling(fInputSocketNum, SOCKET_READABLE | SOCKET_EXCEPTION,(TaskScheduler::BackgroundHandlerProc*)&incomingDataHandler, this);incomingDataHandler里面和服务器接收数据逻辑基本差不多,只是对接收到的数据的处理不同,客户端处理在handleResponseBytes(bytesRead)中,然后希望你在handleResponseBytes(bytesRead)里面的代码找到在哪里执行了DESCRIBE请求的回调,即continueAfterDESCRIBE函数。限于篇幅原因,我就不分析handleResponseBytes(bytesRead)了。自己去分析吧。


题外话

本来还想用一个简单的流程图来结束前面接收的内容,发现写这篇文档都写了好几个小时,实在是不想在写了,感兴趣的朋友可以自己去画一画吧。另外非常感谢我文章中引用的那些文档,这篇文档不仅仅是说明RTSP服务器和客户端是如何实现通信的,更多的还有连接上的一些关于live555和其他的一些网络知识。


相关文章
|
1月前
|
XML 前端开发 JavaScript
探索 XMLHttpRequest:网页与服务器的异步通信之道(上)
探索 XMLHttpRequest:网页与服务器的异步通信之道(上)
|
2月前
|
消息中间件 运维 网络协议
客户端和服务器之间的通信
客户端和服务器之间的通信
34 0
|
1月前
|
编解码 Linux C语言
探索C++与Live555实现RTSP服务器的艺术(一)
探索C++与Live555实现RTSP服务器的艺术
84 1
|
21天前
|
网络协议 Python
pythonTCP客户端编程连接服务器
【4月更文挑战第6天】本教程介绍了TCP客户端如何连接服务器,包括指定服务器IP和端口、发送连接请求、处理异常、进行数据传输及关闭连接。在Python中,使用`socket`模块创建Socket对象,然后通过`connect()`方法尝试连接服务器 `(server_ip, server_port)`。成功连接后,利用`send()`和`recv()`进行数据交互,记得在通信完成后调用`close()`关闭连接,确保资源释放和程序稳定性。
|
4天前
|
JSON JavaScript 前端开发
服务器通信:使用WebSocket与后端实时交互
【4月更文挑战第24天】WebSocket为解决服务器与客户端实时双向通信问题而生,常用于聊天、游戏和实时通知等场景。本文通过4步教你实现WebSocket通信:1) 客户端使用JavaScript创建WebSocket连接;2) 监听`open`、`message`和`close`事件;3) 使用`send`方法发送数据;4) 使用`close`方法关闭连接。服务器端则需处理连接和数据发送,具体实现依后端技术而定。WebSocket为现代Web应用的实时交互提供了强大支持。
|
1月前
|
编解码 C++ 流计算
探索C++与Live555实现RTSP服务器的艺术(三)
探索C++与Live555实现RTSP服务器的艺术
24 1
|
1月前
|
存储 编解码 算法
探索C++与Live555实现RTSP服务器的艺术(二)
探索C++与Live555实现RTSP服务器的艺术
44 1
|
1月前
|
JSON 安全 网络安全
探索 XMLHttpRequest:网页与服务器的异步通信之道(下)
探索 XMLHttpRequest:网页与服务器的异步通信之道(下)
|
1月前
|
Ubuntu JavaScript 关系型数据库
在阿里云Ubuntu 20.04服务器中搭建一个 Ghost 博客
在阿里云Ubuntu 20.04服务器上部署Ghost博客的步骤包括创建新用户、安装Nginx、MySQL和Node.js 18.x。首先,通过`adduser`命令创建非root用户,然后安装Nginx和MySQL。接着,设置Node.js环境,下载Nodesource GPG密钥并安装Node.js 18.x。之后,使用`npm`安装Ghost-CLI,创建Ghost安装目录并进行安装。配置过程中需提供博客URL、数据库连接信息等。最后,测试访问前台首页和后台管理页面。确保DNS设置正确,并根据提示完成Ghost博客的配置。
在阿里云Ubuntu 20.04服务器中搭建一个 Ghost 博客
|
22天前
|
弹性计算
阿里云ECS使用体验
在申请高校学生免费体验阿里云ECS云服务器后的一些使用体验和感受。