ZLMediaKit源码学习——UDP

简介: ZLMediaKit源码学习——UDP

ZLMediaKit源码学习——UDP

服务器启动

  ZLMediaKit所有服务的启动都是在server/main.cpp中启动,下面是启动webrtc的源码,其主要执行以下:

  1. 创建UDP服务
  2. 设置socket连接时的回调,主要为了连接迁移,将会话绑定到socket中
  3. 启动UDP
#if defined(ENABLE_WEBRTC)
        //webrtc udp服务器
        auto rtcSrv = std::make_shared<UdpServer>();
        rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
   
            if (!buf) {
   
                return Socket::createSocket(poller, false);
            }
            auto new_poller = WebRtcSession::queryPoller(buf);
            if (!new_poller) {
   
                //该数据对应的webrtc对象未找到,丢弃之
                return Socket::Ptr();
            }
            // 创建socket对象
            return Socket::createSocket(new_poller, false);
        });
        uint16_t rtcPort = mINI::Instance()[RTC::kPort];
#endif//defined(ENABLE_WEBRTC)

...


#if defined(ENABLE_WEBRTC)
            //webrtc udp服务器
            if (rtcPort) {
    rtcSrv->start<WebRtcSession>(rtcPort); }
#endif//defined(ENABLE_WEBRTC)

UDP收包

  UdpServer::createSession我们可以在这里打个断点,接收到UDP包时会调到这边

toolkit::UdpServer::createSession UdpServer.cpp:216
toolkit::UdpServer::getOrCreateSession UdpServer.cpp:210
toolkit::UdpServer::onRead_l UdpServer.cpp:145
toolkit::UdpServer::onRead UdpServer.cpp:129
toolkit::UdpServer::<lambda>::operator()(const toolkit::Buffer::Ptr &, sockaddr *, int) const UdpServer.cpp:49
std::_Function_handler<void (const std::shared_ptr<toolkit::Buffer> &, sockaddr *, int), <lambda> >::_M_invoke(const std::_Any_data &, const std::shared_ptr<toolkit::Buffer> &, sockaddr *&&, int &&) std_function.h:300
std::function<void (const std::shared_ptr<toolkit::Buffer> &, sockaddr *, int)>::operator()(const std::shared_ptr<toolkit::Buffer> &, sockaddr *, int) const std_function.h:688
toolkit::Socket::onRead Socket.cpp:318
toolkit::Socket::<lambda>::operator()(int) const Socket.cpp:259
std::_Function_handler<void (int), <lambda> >::_M_invoke(const std::_Any_data &, int &&) std_function.h:300
std::function<void (int)>::operator()(int) const std_function.h:688
toolkit::EventPoller::runLoop EventPoller.cpp:304
std::__invoke_impl<void, void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:73
std::__invoke<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:95
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::_M_invoke<0ul, 1ul, 2ul, 3ul> thread:244
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::operator() thread:251
std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> > >::_M_run thread:195
<unknown> 0x00007ffff7b0ade4
start_thread 0x00007ffff7c1f609
clone 0x00007ffff77f8293

  从堆栈我们可以分析出UDP包接收流程如下:

  1. toolkit::EventPoller::runLoop EventPoller.cpp:304:进行事件轮询
  2. toolkit::Socket::::operator()(int) const Socket.cpp:259:触发读事件
  3. toolkit::UdpServer::onRead_l UdpServer.cpp:145: 接收到UDP的处理
void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
   
    // udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数
    bool is_new = false;
    if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
   
        if (session->getPoller()->isCurrentThread()) {
   
            //当前线程收到数据,直接处理数据
            emitSessionRecv(session, buf);
        } else {
   
            //数据漂移到其他线程,需要先切换线程
            WarnL << "udp packet incoming from other thread";
            std::weak_ptr<Session> weak_session = session;
            //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下
            auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
            session->async([weak_session, cacheable_buf]() {
   
                if (auto strong_session = weak_session.lock()) {
   
                    emitSessionRecv(strong_session, cacheable_buf);
                }
            });
        }

#if !defined(NDEBUG)
        if (!is_new) {
   
            TraceL << "udp packet incoming from " << (is_server_fd ? "server fd" : "other peer fd");
        }
#endif
    }
}
  1. 判断当前会话是否存在,不存在则创建
const Session::Ptr &UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
   
    {
   
        //减小临界区
        std::lock_guard<std::recursive_mutex> lock(*_session_mutex);
        auto it = _session_map->find(id);
        if (it != _session_map->end()) {
   
            return it->second->session();
        }
    }
    is_new = true;
    return createSession(id, buf, addr, addr_len);
}
  1. 创建会话
const Session::Ptr &UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
   
    auto socket = createSocket(_poller, buf, addr, addr_len);
    if (!socket) {
   
        //创建socket失败,本次onRead事件收到的数据直接丢弃
        return s_null_session;
    }

    auto addr_str = string((char *) addr, addr_len);
    std::weak_ptr<UdpServer> weak_self = std::dynamic_pointer_cast<UdpServer>(shared_from_this());
    auto session_creator = [this, weak_self, socket, addr_str, id]() -> const Session::Ptr & {
   
        auto server = weak_self.lock();
        if (!server) {
   
            return s_null_session;
        }

        //如果已经创建该客户端对应的UdpSession类,那么直接返回
        lock_guard<std::recursive_mutex> lck(*_session_mutex);
        auto it = _session_map->find(id);
        if (it != _session_map->end()) {
   
            return it->second->session();
        }

        socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());
        socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());
        //在connect peer后再取消绑定关系, 避免在 server 的 socket 或其他cloned server中收到后续数据包.
        SockUtil::dissolveUdpSock(_socket->rawFD());

        auto helper = _session_alloc(server, socket);
        auto session = helper->session();
        // 把本服务器的配置传递给 Session
        session->attachServer(*this);

        std::weak_ptr<Session> weak_session = session;
        socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
   
            auto strong_self = weak_self.lock();
            if (!strong_self) {
   
                return;
            }

            //快速判断是否为本会话的的数据, 通常应该成立
            if (id == makeSockId(addr, addr_len)) {
   
                if (auto strong_session = weak_session.lock()) {
   
                    emitSessionRecv(strong_session, buf);
                }
                return;
            }

            //收到非本peer fd的数据,让server去派发此数据到合适的session对象
            strong_self->onRead_l(false, id, buf, addr, addr_len);
        });
        socket->setOnErr([weak_self, weak_session, id](const SockException &err) {
   
            // 在本函数作用域结束时移除会话对象
            // 目的是确保移除会话前执行其 onError 函数
            // 同时避免其 onError 函数抛异常时没有移除会话对象
            onceToken token(nullptr, [&]() {
   
                // 移除掉会话
                auto strong_self = weak_self.lock();
                if (!strong_self) {
   
                    return;
                }
                //从共享map中移除本session对象
                lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
                strong_self->_session_map->erase(id);
            });

            // 获取会话强应用
            if (auto strong_session = weak_session.lock()) {
   
                // 触发 onError 事件回调
                strong_session->onError(err);
            }
        });

        auto pr = _session_map->emplace(id, std::move(helper));
        assert(pr.second);
        return pr.first->second->session();
    };

    if (socket->getPoller()->isCurrentThread()) {
   
        //该socket分配在本线程,直接创建session对象,并处理数据
        return session_creator();
    }

    //该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据
    auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
    socket->getPoller()->async([session_creator, cacheable_buf]() {
   
        //在该socket所在线程创建session对象
        auto session = session_creator();
        if (session) {
   
            //该数据不能丢弃,给session对象消费
            emitSessionRecv(session, cacheable_buf);
        }
    });
    return s_null_session;
}

Webrtc数据处理

  我们在WebRtcTransport::inputSockData打个断点,此处是webrtc报文的处理的地方:

WebRtcTransport::inputSockData WebRtcTransport.cpp:281
WebRtcSession::onRecv WebRtcSession.cpp:70
toolkit::emitSessionRecv UdpServer.cpp:134
toolkit::UdpServer::<lambda>::operator()(void) const UdpServer.cpp:304
std::_Function_handler<void (), <lambda> >::_M_invoke(const std::_Any_data &) std_function.h:300
std::function<void ()>::operator()() const std_function.h:688
toolkit::TaskCancelableImp<void ()>::operator()() const TaskExecutor.h:111
<lambda#1>::operator()(const std::shared_ptr<toolkit::TaskCancelableImp<void ()> > &) const EventPoller.cpp:237
toolkit::List<std::shared_ptr<toolkit::TaskCancelableImp<void ()> > >::for_each<<lambda#1> >(<lambda#1> &&) List.h:203
toolkit::EventPoller::onPipeEvent EventPoller.cpp:235
toolkit::EventPoller::<lambda>::operator()(int) const EventPoller.cpp:67
std::_Function_handler<void (int), <lambda> >::_M_invoke(const std::_Any_data &, int &&) std_function.h:300
std::function<void (int)>::operator()(int) const std_function.h:688
toolkit::EventPoller::runLoop EventPoller.cpp:304
std::__invoke_impl<void, void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:73
std::__invoke<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:95
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::_M_invoke<0ul, 1ul, 2ul, 3ul> thread:244
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::operator() thread:251
std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> > >::_M_run thread:195
<unknown> 0x00007ffff7b0ade4
start_thread 0x00007ffff7c1f609
clone 0x00007ffff77f8293

  此处会对报文进行分析,判断报文属于那种类型并对其及逆行处理,报文类型主要有:

  1. Stun:处理
  2. Dtls:处理DTLS握手
  3. RTP:音视频
  4. RTCP:会话控制
void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple) {
   
    // 校验是否是stun
    if (RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
   
        std::unique_ptr<RTC::StunPacket> packet(RTC::StunPacket::Parse((const uint8_t *) buf, len));
        if (!packet) {
   
            WarnL << "parse stun error" << std::endl;
            return;
        }
        _ice_server->ProcessStunPacket(packet.get(), tuple);
        return;
    }
    // 校验是否是DTLS
    if (is_dtls(buf)) {
   
        _dtls_transport->ProcessDtlsData((uint8_t *) buf, len);
        return;
    }
    // 校验是否是RTP
    if (is_rtp(buf)) {
   
        if (!_srtp_session_recv) {
   
            WarnL << "received rtp packet when dtls not completed from:" << getPeerAddress(tuple);
            return;
        }
        // SRTP解密
        if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {
   
            // 处理RTP报文
            onRtp(buf, len, _ticker.createdTime());
        }
        return;
    }
    // 校验是否是RTCP
    if (is_rtcp(buf)) {
   
        if (!_srtp_session_recv) {
   
            WarnL << "received rtcp packet when dtls not completed from:" << getPeerAddress(tuple);
            return;
        }
        // SRTP解密
        if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {
   
            // 处理RTCP报文
            onRtcp(buf, len);
        }
        return;
    }
}
相关文章
|
7月前
|
存储 缓存 网络协议
dpdk课程学习之练习笔记二(arp, udp协议api测试)
dpdk课程学习之练习笔记二(arp, udp协议api测试)
194 0
|
7月前
|
网络协议 Linux 数据处理
网络编程【网络编程基本概念、 网络通信协议、IP地址 、 TCP协议和UDP协议】(一)-全面详解(学习总结---从入门到深化)
网络编程【网络编程基本概念、 网络通信协议、IP地址 、 TCP协议和UDP协议】(一)-全面详解(学习总结---从入门到深化)
196 3
|
5月前
|
网络协议 网络架构
【网络编程入门】TCP与UDP通信实战:从零构建服务器与客户端对话(附简易源码,新手友好!)
在了解他们之前我们首先要知道网络模型,它分为两种,一种是OSI,一种是TCP/IP,当然他们的模型图是不同的,如下
229 1
|
7月前
|
存储 网络协议 关系型数据库
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
108 0
|
Python
【从零学习python 】73. UDP网络程序-发送数据
【从零学习python 】73. UDP网络程序-发送数据
70 0
网络直播源码UDP协议搭建:为平台注入一份力量
在实时网络直播源码平台中,UDP协议的实时、高速的传输速度尤为重要,UDP协议的特性使其成为低延迟、高质量流媒体传输技术的理想选择,也让网络直播源码平台成为一个优质、更为用户提供更好体验的平台
网络直播源码UDP协议搭建:为平台注入一份力量
|
Python
【从零学习python 】74. UDP网络程序:端口问题与绑定信息详解
【从零学习python 】74. UDP网络程序:端口问题与绑定信息详解
228 0
|
网络协议 安全 网络性能优化
网络进阶学习:重要网络协议(tcp协议,udp协议,http协议)
网络进阶学习:重要网络协议(tcp协议,udp协议,http协议)
136 0
|
缓存 网络协议 算法
计算机网络学习26:TCP/UDP对比区别、TCP流量控制、拥塞控制、超时重传时间的选择、可靠传输的实现
UDP: User Datagram Protocol 用户数据报协议 TCP: Transmission Control Protocol 传输控制协议 同时这里指的连接是指逻辑连接,而不是物理连接。
计算机网络学习26:TCP/UDP对比区别、TCP流量控制、拥塞控制、超时重传时间的选择、可靠传输的实现
|
网络协议 网络性能优化 网络安全
网络协议报文理解刨析篇二(再谈Http和Https), 加上TCP/UDP/IP协议分析(理解着学习), 面试官都惊讶你对网络的见解(2)
网络协议报文理解刨析篇二(再谈Http和Https), 加上TCP/UDP/IP协议分析(理解着学习), 面试官都惊讶你对网络的见解(2)
网络协议报文理解刨析篇二(再谈Http和Https), 加上TCP/UDP/IP协议分析(理解着学习), 面试官都惊讶你对网络的见解(2)