ZLMediaKit Source Code Study: UDP
Server Startup
All of ZLMediaKit's services are started in server/main.cpp. Below is the source code that starts the WebRTC service; it mainly does the following:
- Create the UDP server
- Set the socket-creation callback, mainly for connection migration, so that the session ends up bound to the right socket
- Start the UDP server
#if defined(ENABLE_WEBRTC)
    // WebRTC UDP server
    auto rtcSrv = std::make_shared<UdpServer>();
    rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
        if (!buf) {
            return Socket::createSocket(poller, false);
        }
        auto new_poller = WebRtcSession::queryPoller(buf);
        if (!new_poller) {
            // No WebRTC object matches this packet; drop it
            return Socket::Ptr();
        }
        // Create the socket object
        return Socket::createSocket(new_poller, false);
    });
    uint16_t rtcPort = mINI::Instance()[RTC::kPort];
#endif//defined(ENABLE_WEBRTC)
...
#if defined(ENABLE_WEBRTC)
    // WebRTC UDP server
    if (rtcPort) {
        rtcSrv->start<WebRtcSession>(rtcPort);
    }
#endif//defined(ENABLE_WEBRTC)
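The connection migration mentioned above is implemented by WebRtcSession::queryPoller: the first incoming packet is a STUN binding request carrying the ICE username negotiated over the signaling channel, and that username is used to locate the already-created WebRtcTransport so the peer socket is created on the poller that owns it. A minimal sketch of the idea follows; the manager class and member names used here are illustrative assumptions, not necessarily the exact ZLMediaKit API.

// Sketch: pick the event poller that already owns the WebRTC transport this packet targets.
// WebRtcTransportManager / getItem / getPoller are illustrative names, not confirmed API.
EventPoller::Ptr queryPollerSketch(const Buffer::Ptr &buffer) {
    std::unique_ptr<RTC::StunPacket> packet(
        RTC::StunPacket::Parse((const uint8_t *) buffer->data(), buffer->size()));
    if (!packet) {
        // Not a STUN packet, so we cannot tell which transport it belongs to
        return nullptr;
    }
    // The ICE username fragment identifies the transport created during SDP negotiation
    auto user_name = packet->GetUsername();
    auto transport = WebRtcTransportManager::Instance().getItem(user_name);
    // Returning this poller makes UdpServer create the peer socket on the same thread,
    // so later packets for this session are handled without cross-thread hops
    return transport ? transport->getPoller() : nullptr;
}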
UDP Packet Reception
We can set a breakpoint at UdpServer::createSession; when a UDP packet is received, execution arrives here with the following call stack:
toolkit::UdpServer::createSession UdpServer.cpp:216
toolkit::UdpServer::getOrCreateSession UdpServer.cpp:210
toolkit::UdpServer::onRead_l UdpServer.cpp:145
toolkit::UdpServer::onRead UdpServer.cpp:129
toolkit::UdpServer::<lambda>::operator()(const toolkit::Buffer::Ptr &, sockaddr *, int) const UdpServer.cpp:49
std::_Function_handler<void (const std::shared_ptr<toolkit::Buffer> &, sockaddr *, int), <lambda> >::_M_invoke(const std::_Any_data &, const std::shared_ptr<toolkit::Buffer> &, sockaddr *&&, int &&) std_function.h:300
std::function<void (const std::shared_ptr<toolkit::Buffer> &, sockaddr *, int)>::operator()(const std::shared_ptr<toolkit::Buffer> &, sockaddr *, int) const std_function.h:688
toolkit::Socket::onRead Socket.cpp:318
toolkit::Socket::<lambda>::operator()(int) const Socket.cpp:259
std::_Function_handler<void (int), <lambda> >::_M_invoke(const std::_Any_data &, int &&) std_function.h:300
std::function<void (int)>::operator()(int) const std_function.h:688
toolkit::EventPoller::runLoop EventPoller.cpp:304
std::__invoke_impl<void, void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:73
std::__invoke<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:95
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::_M_invoke<0ul, 1ul, 2ul, 3ul> thread:244
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::operator() thread:251
std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> > >::_M_run thread:195
<unknown> 0x00007ffff7b0ade4
start_thread 0x00007ffff7c1f609
clone 0x00007ffff77f8293
From the stack we can work out the UDP packet reception flow:
- toolkit::EventPoller::runLoop EventPoller.cpp:304: runs the event loop
- toolkit::Socket::<lambda>::operator()(int) const Socket.cpp:259: the read event fires
- toolkit::UdpServer::onRead_l UdpServer.cpp:145: handles the received UDP data
void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
    // Triggered when the udp server fd receives data; in most cases data should arrive on a
    // peer fd, so this function should not be a hot path
    bool is_new = false;
    if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
        if (session->getPoller()->isCurrentThread()) {
            // Data received on the session's own thread; handle it directly
            emitSessionRecv(session, buf);
        } else {
            // Data drifted to another thread; switch to the session's thread first
            WarnL << "udp packet incoming from other thread";
            std::weak_ptr<Session> weak_session = session;
            // The socket read buffer is shared by all sockets on this thread, so it cannot be
            // used across threads and must be copied first
            auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
            session->async([weak_session, cacheable_buf]() {
                if (auto strong_session = weak_session.lock()) {
                    emitSessionRecv(strong_session, cacheable_buf);
                }
            });
        }
#if !defined(NDEBUG)
        if (!is_new) {
            TraceL << "udp packet incoming from " << (is_server_fd ? "server fd" : "other peer fd");
        }
#endif
    }
}
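Both branches above end up calling emitSessionRecv, which hands the buffer to the session's onRecv while making sure an exception thrown inside the session does not take down the event loop. A minimal sketch of that helper, assuming ZLToolKit's usual shutdown-on-exception behavior:

// Sketch: deliver the buffer to the session; translate exceptions into a socket shutdown.
static void emitSessionRecvSketch(const Session::Ptr &session, const Buffer::Ptr &buf) {
    try {
        session->onRecv(buf);
    } catch (SockException &ex) {
        session->shutdown(ex);
    } catch (std::exception &ex) {
        session->shutdown(SockException(Err_shutdown, ex.what()));
    }
}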
- Check whether a session already exists for this peer; if not, create one
const Session::Ptr &UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
    {
        // Keep the critical section small
        std::lock_guard<std::recursive_mutex> lock(*_session_mutex);
        auto it = _session_map->find(id);
        if (it != _session_map->end()) {
            return it->second->session();
        }
    }
    is_new = true;
    return createSession(id, buf, addr, addr_len);
}
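The session map is keyed by PeerIdType, an identifier built from the sender's address, so every datagram coming from the same remote IP and port resolves to the same session. A conceptual sketch of such a makeSockId helper (the real encoding used by ZLToolKit may differ):

#include <netinet/in.h>
#include <string>

// Sketch: derive a per-peer key from the remote port and IP bytes.
static std::string makeSockIdSketch(sockaddr *addr, int /*addr_len*/) {
    std::string id;
    switch (addr->sa_family) {
        case AF_INET: {
            auto *v4 = reinterpret_cast<sockaddr_in *>(addr);
            id.assign(reinterpret_cast<char *>(&v4->sin_port), sizeof(v4->sin_port));
            id.append(reinterpret_cast<char *>(&v4->sin_addr), sizeof(v4->sin_addr));
            break;
        }
        case AF_INET6: {
            auto *v6 = reinterpret_cast<sockaddr_in6 *>(addr);
            id.assign(reinterpret_cast<char *>(&v6->sin6_port), sizeof(v6->sin6_port));
            id.append(reinterpret_cast<char *>(&v6->sin6_addr), sizeof(v6->sin6_addr));
            break;
        }
        default:
            break;
    }
    return id;
}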
- Create the session
const Session::Ptr &UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
    auto socket = createSocket(_poller, buf, addr, addr_len);
    if (!socket) {
        // Socket creation failed; drop the data received in this onRead event
        return s_null_session;
    }
    auto addr_str = string((char *) addr, addr_len);
    std::weak_ptr<UdpServer> weak_self = std::dynamic_pointer_cast<UdpServer>(shared_from_this());
    auto session_creator = [this, weak_self, socket, addr_str, id]() -> const Session::Ptr & {
        auto server = weak_self.lock();
        if (!server) {
            return s_null_session;
        }
        // If a UdpSession has already been created for this client, return it directly
        lock_guard<std::recursive_mutex> lck(*_session_mutex);
        auto it = _session_map->find(id);
        if (it != _session_map->end()) {
            return it->second->session();
        }
        socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());
        socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());
        // After connecting to the peer, dissolve the binding so that subsequent packets are not
        // received by the server socket or by other cloned servers
        SockUtil::dissolveUdpSock(_socket->rawFD());
        auto helper = _session_alloc(server, socket);
        auto session = helper->session();
        // Pass this server's configuration on to the session
        session->attachServer(*this);
        std::weak_ptr<Session> weak_session = session;
        socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return;
            }
            // Fast check whether the data belongs to this session; this is usually the case
            if (id == makeSockId(addr, addr_len)) {
                if (auto strong_session = weak_session.lock()) {
                    emitSessionRecv(strong_session, buf);
                }
                return;
            }
            // Data from another peer arrived on this fd; let the server dispatch it to the right session
            strong_self->onRead_l(false, id, buf, addr, addr_len);
        });
        socket->setOnErr([weak_self, weak_session, id](const SockException &err) {
            // Remove the session object when this function's scope ends,
            // to make sure its onError callback runs before removal,
            // and to guarantee removal even if onError throws
            onceToken token(nullptr, [&]() {
                // Remove the session
                auto strong_self = weak_self.lock();
                if (!strong_self) {
                    return;
                }
                // Erase this session from the shared map
                lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
                strong_self->_session_map->erase(id);
            });
            // Get a strong reference to the session
            if (auto strong_session = weak_session.lock()) {
                // Trigger the onError callback
                strong_session->onError(err);
            }
        });
        auto pr = _session_map->emplace(id, std::move(helper));
        assert(pr.second);
        return pr.first->second->session();
    };
    if (socket->getPoller()->isCurrentThread()) {
        // The socket was allocated on the current thread; create the session and handle the data directly
        return session_creator();
    }
    // The socket was allocated on another thread; copy the buffer first, then create the session
    // and handle the data on that thread
    auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
    socket->getPoller()->async([session_creator, cacheable_buf]() {
        // Create the session object on the socket's own thread
        auto session = session_creator();
        if (session) {
            // The data must not be dropped; let the session consume it
            emitSessionRecv(session, cacheable_buf);
        }
    });
    return s_null_session;
}
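The key trick in createSession is the classic "connected UDP socket" pattern: bindUdpSock binds the new peer socket to the same local IP and port as the server socket (which relies on SO_REUSEADDR/SO_REUSEPORT), and bindPeerAddr connect()s it to the remote address, so the kernel delivers all further datagrams from that peer to the peer fd instead of the server fd. A standalone sketch of the idea with plain POSIX sockets, not ZLToolKit's actual implementation:

#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Sketch: create a per-peer UDP fd that shares the server's local port and is
// connected to one remote endpoint, so the kernel demultiplexes by peer address.
static int make_peer_fd(const sockaddr_in &local, const sockaddr_in &peer) {
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        return -1;
    }
    int on = 1;
    // Both options are needed so several fds can bind the same local ip:port
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
    if (bind(fd, (const sockaddr *) &local, sizeof(local)) < 0 ||
        connect(fd, (const sockaddr *) &peer, sizeof(peer)) < 0) {
        close(fd);
        return -1;
    }
    return fd; // subsequent datagrams from `peer` arrive on this fd
}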
WebRTC Data Processing
We can set a breakpoint at WebRtcTransport::inputSockData, which is where WebRTC packets are processed:
WebRtcTransport::inputSockData WebRtcTransport.cpp:281
WebRtcSession::onRecv WebRtcSession.cpp:70
toolkit::emitSessionRecv UdpServer.cpp:134
toolkit::UdpServer::<lambda>::operator()(void) const UdpServer.cpp:304
std::_Function_handler<void (), <lambda> >::_M_invoke(const std::_Any_data &) std_function.h:300
std::function<void ()>::operator()() const std_function.h:688
toolkit::TaskCancelableImp<void ()>::operator()() const TaskExecutor.h:111
<lambda#1>::operator()(const std::shared_ptr<toolkit::TaskCancelableImp<void ()> > &) const EventPoller.cpp:237
toolkit::List<std::shared_ptr<toolkit::TaskCancelableImp<void ()> > >::for_each<<lambda#1> >(<lambda#1> &&) List.h:203
toolkit::EventPoller::onPipeEvent EventPoller.cpp:235
toolkit::EventPoller::<lambda>::operator()(int) const EventPoller.cpp:67
std::_Function_handler<void (int), <lambda> >::_M_invoke(const std::_Any_data &, int &&) std_function.h:300
std::function<void (int)>::operator()(int) const std_function.h:688
toolkit::EventPoller::runLoop EventPoller.cpp:304
std::__invoke_impl<void, void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:73
std::__invoke<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> invoke.h:95
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::_M_invoke<0ul, 1ul, 2ul, 3ul> thread:244
std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> >::operator() thread:251
std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (toolkit::EventPoller::*)(bool, bool), toolkit::EventPoller *, bool, bool> > >::_M_run thread:195
<unknown> 0x00007ffff7b0ade4
start_thread 0x00007ffff7c1f609
clone 0x00007ffff77f8293
Here the packet is inspected to determine which type it is and is then processed accordingly (see the sketch after the code below for how the types are told apart). The main packet types are:
- STUN: handed to the ICE server for ICE/connectivity processing
- DTLS: DTLS handshake processing
- RTP: audio/video media
- RTCP: session control
void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple) {
    // Check whether this is a STUN packet
    if (RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
        std::unique_ptr<RTC::StunPacket> packet(RTC::StunPacket::Parse((const uint8_t *) buf, len));
        if (!packet) {
            WarnL << "parse stun error" << std::endl;
            return;
        }
        _ice_server->ProcessStunPacket(packet.get(), tuple);
        return;
    }
    // Check whether this is DTLS
    if (is_dtls(buf)) {
        _dtls_transport->ProcessDtlsData((uint8_t *) buf, len);
        return;
    }
    // Check whether this is RTP
    if (is_rtp(buf)) {
        if (!_srtp_session_recv) {
            WarnL << "received rtp packet when dtls not completed from:" << getPeerAddress(tuple);
            return;
        }
        // SRTP decryption
        if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {
            // Handle the RTP packet
            onRtp(buf, len, _ticker.createdTime());
        }
        return;
    }
    // Check whether this is RTCP
    if (is_rtcp(buf)) {
        if (!_srtp_session_recv) {
            WarnL << "received rtcp packet when dtls not completed from:" << getPeerAddress(tuple);
            return;
        }
        // SRTCP decryption
        if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {
            // Handle the RTCP packet
            onRtcp(buf, len);
        }
        return;
    }
}
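The is_dtls / is_rtp / is_rtcp helpers used above demultiplex everything arriving on the shared UDP port by inspecting the first bytes of each packet, in the spirit of RFC 5764 / RFC 7983: DTLS records start with a byte in the range 20..63, RTP and RTCP both start with a byte in 128..191, and the two are told apart by the payload-type field (RTCP packet types 192..223 correspond to PT values 64..95 once the marker bit is masked off). A minimal sketch of such checks; ZLMediaKit's actual helpers may differ in detail:

#include <cstdint>

// Sketch: classify packets sharing the WebRTC UDP port, per RFC 5764 / RFC 7983.
static bool looks_like_dtls(const char *buf) {
    auto b0 = (uint8_t) buf[0];
    return b0 > 19 && b0 < 64;           // DTLS content types occupy 20..63
}

static bool looks_like_rtp_or_rtcp(const char *buf) {
    auto b0 = (uint8_t) buf[0];
    return b0 > 127 && b0 < 192;         // RTP/RTCP (version 2) starts in 128..191
}

static bool looks_like_rtcp(const char *buf) {
    auto pt = (uint8_t) buf[1] & 0x7F;   // payload-type field with the marker bit masked off
    return looks_like_rtp_or_rtcp(buf) && pt >= 64 && pt < 96;
}

static bool looks_like_rtp(const char *buf) {
    return looks_like_rtp_or_rtcp(buf) && !looks_like_rtcp(buf);
}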