亲爱的各位读者你们好,由于前段时间忙于部分项目的重构和优化,未能及时更新文章,不少读者催更,哈哈,我还是很开心能抽出时间给大家再来分享下kcp的相关技术内幕,以及之前完善自己的网络库增加了KCP的客户端服务器收发支持(结尾会分享封装的客户端服务器C++源码)。
KCP概述
对于游戏开发,尤其是MOBA游戏,或者全球唯一服架构类型的游戏,对于网络的要求比一般游戏要高。
大多数游戏公司使用的网络无非就是TCP,UDP。我们都知道 TCP 协议具有重传机制,也就是说,如果发送方认为发生了丢包现象,就重发这些数据包。很显然,我们需要一个方法来猜测是否发生了丢包。最简单的想法就是,接收方每收到一个包,就向发送方返回一个 ACK,表示自己已经收到了这段数据,反过来,如果发送方一段时间内没有收到 ACK,就知道很可能是数据包丢失了,紧接着就重发该数据包,直到收到 ACK 为止。因为即使是超时了,这个数据包也可能并没有丢,它只是绕了一条远路,来的很晚而已。毕竟 TCP 协议是位于传输层的协议,不可能明确知道数据链路层和物理层发生了什么。但这并不妨碍我们的超时重传机制,因为接收方会自动忽略重复的包。
超时和重传的概念其实就是这么简单,但内部的细节却是很多,我们最先想到的一个问题就是,到底多长时间才能算超时呢?
超时是怎么确定的?
假设我直接把超时时间设成一个固定值200ms,我们的电脑和很多服务器都有交互,这些服务器位于天南海北,国内国外,延迟差异巨大,打个比方:
假设你访问某国外网站,延迟有 130 ms,这就麻烦了,正常的数据包都可能被认为是超时,导致大量数据包被重发,可以想象,重发的数据包也很容易被误判为超时。。。雪崩效应的感觉
所以设置固定值是很不可靠的,我们要根据网络延迟,动态调整超时时间,延迟越大,超时时间越长。
因此很多公司都使用了基于UDP的可靠传输KCP
分析KCP的实现
为了提高udp可靠性,KCP是在udp协议上封装一层可靠性传输机制(类似tcp的ACK机制、重传机制、序号机制、重排机制、窗口机制),就做到了兼具tcp的安全性(流量控制和拥塞控制等)和udp的实时性,并且具备一定的灵活性(超时重传、ack等),所以KCP并不是网络协议,而是一种业务逻辑。
关于KCP的实现C,C++源码大家可以直接看下韦神的 :
GitHub - skywind3000/kcp: KCP - A Fast and Reliable ARQ Protocol
代码逻辑写的很优雅,通过几个简单的函数调用就可以实现可靠性传输,对于使用者不需要关心丢包,重传,粘包的问题。
这里我简单介绍下kcp里的几个名词:
- 用户数据:应用层发送的数据,比如一张图片2KB的数据。
- MTU:最大传输单元,即每次发送的最大数据。
- RTO:Retransmission TimeOut,重传超时时间。
- cwnd:ongestion window,拥塞窗口,表示发送方可发送多少个KCP数据包。与接收方窗口有关,与网络状况(拥塞控制)有关,与发送窗口大小有关。
- rwnd:receiver window,接收方窗口大小,表示接收方还可接收多少个KCP数据包。
- snd_queue:待发送KCP数据包队列。
- snd_nxt:下一个即将发送的kcp数据包序列号。
- snd_una:下一个待确认的序列号
- TCP的特点是可靠传输(累积确认、超时重传、选择确认)、流量控制(滑动窗口)
- 、拥塞控制(慢开始、拥塞避免、快重传、快恢复)、面向连接。KCP对这些参数
- 基本都可配,也没用建立/关闭连接的过程。
kcp协议头部
编辑
conv:连接号。UDP是无连接的,conv用于表示来自于哪个客户端,是对连接的一种替代,使用conv的好处,其实我相信很多文章里都没有写,这里我从商业项目使用的经验可以告诉读者,我们的游戏用户玩游戏的过程中经常会出现由4G,5G,wifi之间进行自由切换,而且也存在基站之间的信号切换,导致客户端的ip地址也可能发生变化,如果没有一个ID来唯一标识,那么在移动网络发生切换过程中触发了断线重连,这就导致每次切换,玩家都要走这个流程和逻辑,你觉得玩家体验还能好吗?因此我们一般使用conv来记录这个id对应哪个客户端,即使客户端网络环境发生变化,也不需要断线重连,每次KCP包的协议头部都携带conv,这样我们就知道当前发送包的是哪个客户端,当然有一些服务器的做法是将KCP的conv和TCP协议的用户进行一一映射绑定,在TCP和KCP之间自由切换,在这里mark下,后边我会介绍这种用法是怎么实现的。
cmd:命令字。如,IKCP_CMD_ACK确认命令,IKCP_CMD_WASK接收窗口大小询问命令,IKCP_CMD_WINS接收窗口大小告知命令。
frg:分片,用户数据可能会被分成多个KCP包,发送出去。
wnd:接收窗口大小,发送方的发送窗口不能超过接收方给出的数值。
ts:时间戳。
sn:序列号。
una:下一个可接收的序列号。其实就是确认号,类似tcp的ack。
len:数据长度。
data:用户数据。
KCP流程
注意收发数据时,数据要先放到缓存重排然后再进行收,如下是kcp数据接收和发送的过程简图:
编辑
接收数据时,kcp会先把数据放到recv_buf中进行缓存,注意recv_buf中的数据是已经按sn排好序的,但是不一定完整连续,因为传输过程中可能有丢失,recv_queue是从recv_buf中拷贝出的连续完整的数据分片。用户调用ikcp_recv()时是从recv_que中读取数据。还要注意一点,ikcp_recv中的len一定要是整个数据的长度,要一次性读完,这和recvfrom的size是一样的。
与接收过程类似,只不过顺序相反,用户调用ikcp_send()只是将数据放到了send_buf中,不过会自动进行数据分片,使得数据大小不超过mtu。然后,kcp会通过流量控制和拥塞控制将数据放到send_buf中再进行发送。
注意区分一下,不断调用ikcp_send(),数据会累积在send_queue中,如果对方没有接收,对方的数据则会累积在对方的recv_buf中。
说了这么多,其实kcp底层调用的,还是recvfrom()和sendto(),这一点不要忘记。
kcp接收和确认机制
KCP的接收过程是将UDP收到的数据进行解包,重新组装顺序的、可靠的数据后交付给用户。
kcp_input输入UDP收到的数据包。kcp包对前面的24个字节进行解压,包括conv、 frg、 cmd、 wnd、 ts、 sn、 una、 len。根据una,会删除snd_buf中,所有una之前的kcp数据包,因为这些数据包接收者已经确认。根据wnd更新接收端接收窗口大小。根据不同的命令字进行分别处理,这块我就不赘述了,韦神的代码写的很详细,直接看kcp_input函数。
KCP函数使用
创建 KCP对象:
// 初始化 kcp对象,conv为一个表示会话编号的整数,和tcp的 conv一样,通信双
// 方需保证 conv相同,相互的数据包才能够被认可,user是一个给回调函数的指针
ikcpcb *kcp = ikcp_create(conv, user);
设置传输回调函数(如UDP的send函数):
// KCP的下层协议输出函数,KCP需要发送数据时会调用它
// buf/len 表示缓存和长度
// user指针为 kcp对象创建时传入的值,用于区别多个 KCP对象
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user)
{
….
}
// 设置回调函数
kcp->output = udp_output;
循环调用 update:
// 以一定频率调用 ikcp_update来更新 kcp状态,并且传入当前时钟(毫秒单位)
// 如 10ms调用一次,或用 ikcp_check确定下次调用 update的时间不必每次调用
ikcp_update(kcp, millisec);
输入一个应用层数据包(如UDP收到的数据包):
//
收到一个下层数据包(比如UDP包)时需要调用:ikcp_input(kcp,received_udp_packet,received_udp_size);
处理了下层协议的输出/输入后 KCP协议就可以正常工作了,使用 ikcp_send 来向
远端发送数据。而另一端使用 ikcp_recv(kcp, ptr, size)来接收数据
编辑
UDP收到的包,不断通过kcp_input喂给KCP,KCP会对这部分数据(KCP协议数据)进行解包,重新封装成应用层用户数据,应用层通过kcp_recv获取。应用层通过kcp_send发送数据,KCP会把用户数据拆分kcp数据包,通过kcp_output,以UDP(send)的方式发送。
KCP的客户端/服务器封装
这里所提供的代码是我个人的C++网络库框架部分代码,其实KCP也是去年为了支持某项目才加入的网络模块,这里我给大家分享出来,如果有需要相关cpp实现代码,可以关注我公众号,私信发给我请备注说明( kcp源码 ):
我在这里封装了一个KcpChannel 类,主要是负责kcp对象的创建,接收udp的输入,输出数据并回调服务器和客户端的发送接口send_kcp_callback 。
KcpChannel.h
#pragma once //#include "inet_addr.h" #include "os.h" #include "inet_addr.h" #include <unordered_map> #include "ikcp.h" #include <log.h> #include <string> #include "config.h" #include <util.h> #include <functional> #include "event_handler.h" #define KCP_PKG_LIMIT 100000 typedef std::function<int(const char*, int, sockaddr_in&)> send_kcp_callback; inline uint32_t iclock() { return (uint32_t)(utils::Util::GetCurrentTimeMsec() & 0xfffffffful); } class KcpChannel { public: KcpChannel(uint32_t conv, send_kcp_callback cb, sockaddr_in remote); KcpChannel(uint32_t conv, send_kcp_callback cb, INET_Addr remote); static int kcp_output(const char* buf, int len, ikcpcb* kcp, void* ptr); bool init_kcp(); ~KcpChannel(); void reset(); int input(const char* buffer, const int len); void flush(); bool get_data(const char* recv_buf, int recv_len, char* out_buf, int& out_len); bool get_data(char*& out_buf, int& out_len); void update_remote(sockaddr_in remote); void update(uint64_t current_clock); bool try_send(); int send(const char* send_buffer, int len); public: uint32_t m_conv; ikcpcb* m_kcp; Event_Handler* m_kcp_handler; sockaddr_in m_remote; IUINT32 m_next_kcp_update_ = 0; send_kcp_callback m_send_callback; };
KcpChannel.cpp的相关实现如下:
#include "kcp_channel.h" #include "log.h" #include "config.h" KcpChannel::KcpChannel(uint32_t conv, send_kcp_callback cb, sockaddr_in remote) :m_conv(conv), m_send_callback(cb), m_remote(remote) { init_kcp(); } KcpChannel::KcpChannel(uint32_t conv, send_kcp_callback cb, INET_Addr remote) : m_conv(conv), m_send_callback(cb) { m_remote.sin_family = AF_INET; m_remote.sin_addr.s_addr = remote.get_addr(); m_remote.sin_port = remote.get_port(); init_kcp(); } int KcpChannel::kcp_output(const char* buf, int len, ikcpcb* kcp, void* ptr) { assert(NULL != ptr); KcpChannel* ch = (KcpChannel*)(ptr); string s; s.assign(buf, len); ch->m_send_callback(buf, len, ch->m_remote); return 0; } bool KcpChannel::init_kcp() { m_kcp = ikcp_create(m_conv, (void*)this); assert(NULL != m_kcp); m_kcp->output = &KcpChannel::kcp_output; //ikcp_setmtu(m_kcp, 128); ikcp_wndsize(m_kcp, 512, 512); // 启动快速模式 // 第二个参数 nodelay-启用以后若干常规加速将启动 // 第三个参数 interval为内部处理时钟,默认设置为 10ms // 第四个参数 resend为快速重传指标,设置为2 // 第五个参数 为是否禁用常规流控,这里禁止 //普通模式: //ikcp_nodelay(m_kcp, 0, 40, 0, 0); ikcp_nodelay(m_kcp, 1, 10, 2, 1); ikcp_nodelay(m_kcp, 1, 10, 2, 1); m_kcp->rx_minrto = 10; //超时重传最小时间 10ms m_kcp->fastresend = 2; return 0; } KcpChannel::~KcpChannel() { ikcp_update(m_kcp, iclock()); ikcp_flush(m_kcp); ikcp_release(m_kcp); } void KcpChannel::reset() { ikcp_update(m_kcp, iclock()); ikcp_flush(m_kcp); ikcp_release(m_kcp); init_kcp(); } int KcpChannel::input(const char* buffer, const int len) { int send_ret = ikcp_input(m_kcp, buffer, len); if (send_ret == 0) { update(iclock()); flush(); } return 0; } void KcpChannel::flush() { IUINT32 cwnd = min(m_kcp->snd_wnd, m_kcp->rmt_wnd); if (m_kcp->nocwnd == 0) { cwnd = min(m_kcp->cwnd, cwnd); } if (ikcp_waitsnd(m_kcp) > cwnd) ikcp_flush(m_kcp); } bool KcpChannel::get_data(const char* recv_buf, int recv_len, char* out_buf, int& out_len) { out_len = ikcp_recv(m_kcp, out_buf, RECV_BUFFER_LENGTH); if (out_len <= 0) { LOG(WARN)("SOCK_Dgram::handle_input, kcp recv error. packet len[%d]", out_len); return false; } return true; } bool KcpChannel::get_data(char*& out_buf, int& out_len) { int peeksize = ikcp_peeksize(m_kcp); if (peeksize <= 0) { return false; } out_buf = new char[peeksize]; out_len = ikcp_recv(m_kcp, out_buf, RECV_BUFFER_LENGTH); if (out_len <= 0) { LOG(WARN)("SOCK_Dgram::handle_input, kcp recv error. packet len[%d]", out_len); return false; } return true; } void KcpChannel::update_remote(sockaddr_in remote) { bool update = false; if (remote.sin_addr.s_addr != m_remote.sin_addr.s_addr) { m_remote.sin_addr.s_addr = remote.sin_addr.s_addr; update = true; } if (remote.sin_port != m_remote.sin_port) { m_remote.sin_port = remote.sin_port; update = true; } if (update) { reset(); } } void KcpChannel::update(uint64_t current_clock) { IUINT32 current = current_clock & 0xfffffffflu; if (current >= m_next_kcp_update_) { ikcp_update(m_kcp, current); m_next_kcp_update_ = ikcp_check(m_kcp, current); } } bool KcpChannel::try_send() { if (ikcp_waitsnd(m_kcp) > (int)(m_kcp->snd_wnd * 2)) { LOG(ERROR)("SOCK_KCP_Client::send_data error, kcp nsnd_buf is full, %d - %d", m_kcp->nsnd_buf + m_kcp->nsnd_que, m_kcp->snd_wnd * 2); return false; } return true; } int KcpChannel::send(const char* send_buffer, int len) { return ikcp_send(m_kcp, send_buffer, len); }
kcp客户端的源码比较简单,这里我只罗列kcp服务器端的源码头文件和cpp文件:
#pragma once #include "net_event.h" #include "event_handler.h" #include "inet_addr.h" #include "net_manager.h" #include "block_buffer.h" #include "ikcp.h" #include "kcp_channel.h" #include "os.h" class Net_Manager; //! @class SOCK_KCP //! @brief kcp通道处理类 class SOCK_KCP_Server : public Event_Handler { public: //! 构造函数 //! @param net_manager 网络管理器 SOCK_KCP_Server(Net_Manager* net_manager, Packet_Splitter* packet_splitter, const INET_Addr& local_addr, void* pUserData); //! 析构函数 virtual ~SOCK_KCP_Server(); public: //! 创建kcp通道 //! @param local_addr upd本地绑定地址 //! @param handler 该连接的事件处理函数指针 //! @return 结果, 0:成功, -1失败 int create_kcp_server(int netbufsize); //! 获取通道id //! @return 通道id virtual uint32_t get_id(); //! 获取socket句柄 //! @return socket句柄 virtual SOCKET get_handle(); //! 处理读 //! @return 处理结果 0:处理正常, -1: 连接被关闭, -2:连接异常 virtual int handle_input(); //! 处理写 //! @return 处理结果 0:处理正常, -1: 连接被关闭, -2:连接异常 virtual int handle_output(); //! 连接异常 virtual int handle_exception(); //! 连接关闭 virtual int handle_close(); //! 超时 virtual int handle_timeout(); //! 提交发送任务 //! @param send_task 待发送的任务 virtual int post_packet(Net_Packet* send_packet); int send(const char* buf, int len, sockaddr_in& remote_addr); KcpChannel* get_session(ISTDUINT32& conv); bool add_session(ISTDUINT32& conv, KcpChannel* sock_kcp); void remove_session(ISTDUINT32& conv); void update(); private: //! 通道id uint32_t m_id; //! socket句柄 SOCKET m_socket; //! 本地监听地址 INET_Addr m_local_addr; //! 网络管理器 Net_Manager* m_net_manager; //! 接受缓存 Block_Buffer_T<RECV_BUFFER_LENGTH> m_recv_buffer; //! 该连接的事件处理函数指针 void* m_UserData; std::unordered_map<ISTDUINT32, KcpChannel*> m_sessions; };
#include "sock_kcp_server.h" #include "net_manager.h" #include "log.h" #include "ikcp.h" #include <sstream> #include <util.h> #define KCP_SERVER_TIMEOUT 100 SOCK_KCP_Server::SOCK_KCP_Server(Net_Manager* net_manager, Packet_Splitter* packet_splitter, const INET_Addr& local_addr,void* pUserData) { m_id = net_manager->m_id_manager.acquire(ID_UDP); assert(m_id != 0); m_socket = INVALID_SOCKET; m_net_manager = net_manager; m_UserData = pUserData; m_local_addr = local_addr; timeout = utils::Util::GetCurrentTimeMsec() + KCP_SERVER_TIMEOUT; } SOCK_KCP_Server::~SOCK_KCP_Server() { if (0 != m_id) { m_net_manager->m_id_manager.release(m_id); } if (INVALID_SOCKET != m_socket) { close(m_socket); m_socket = INVALID_SOCKET; } for (auto it : m_sessions) { if (it.second != NULL) { delete it.second; it.second = NULL; } } m_sessions.clear(); } KcpChannel* SOCK_KCP_Server::get_session(ISTDUINT32& conv) { auto it = m_sessions.find(conv); if (it != m_sessions.end()) { return it->second; } return NULL; } void SOCK_KCP_Server::remove_session(ISTDUINT32& conv) { auto it = m_sessions.find(conv); if (it != m_sessions.end()) { delete it->second; it->second = NULL; m_sessions.erase(it); } } bool SOCK_KCP_Server::add_session(ISTDUINT32& conv, KcpChannel* sock_kcp) { auto it = m_sessions.find(conv); if (it != m_sessions.end()) { return false; } m_sessions.insert(std::make_pair(conv, sock_kcp)); return true; } void SOCK_KCP_Server::update() { auto i_c = iclock(); for (auto it = m_sessions.begin(); it != m_sessions.end(); it++) { it->second->update(i_c); it->second->flush(); } } int SOCK_KCP_Server::create_kcp_server(int netbufsize) { m_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (INVALID_SOCKET == m_socket) { LOG(ERROR)("SOCK_KCP_Server::create_tcp_server error, socket error, errno:%d", error_no()); return -1; } int nret = set_socket_nonblocking(m_socket); if (nret != 0) { LOG(INFO)("SOCK_KCP_Server::set_socket_nonblocking error, socket error, errno:%d", error_no()); return -1; } //设置网络底层收发缓冲区长度 nret = set_socket_bufsize(m_socket, true, netbufsize); int nsize = 0; nret = get_socket_bufsize(m_socket, true, nsize); if (nsize != netbufsize) { LOG(WARN)("setsockopt so_rcvbuf failed.to set size %d orgin size %d.", netbufsize, nsize); } nret = set_socket_bufsize(m_socket, false, netbufsize); nsize = 0; nret = get_socket_bufsize(m_socket, false, nsize); if (nsize != netbufsize) { LOG(WARN)("setsockopt so_sndbuf failed.to set size %d orgin size %d.", netbufsize, nsize); } int val = 1; setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (char*)&val, sizeof(val)); sockaddr_in local; local.sin_family = AF_INET; local.sin_addr.s_addr = m_local_addr.get_addr(); local.sin_port = m_local_addr.get_port(); int rc = ::bind(m_socket, (sockaddr*)&local, sizeof(local)); if (0 != rc) { int err = error_no(); LOG(ERROR)("SOCK_KCP_Server::create_tcp_server error, bind error, errno:%d", err); close(m_socket); m_socket = INVALID_SOCKET; return -1; } LOG(INFO)("create new udp server:%d, %d\n", local.sin_addr.s_addr, local.sin_port); return 0; } uint32_t SOCK_KCP_Server::get_id() { return m_id; } SOCKET SOCK_KCP_Server::get_handle() { return m_socket; } int SOCK_KCP_Server::handle_input() { int r = 0; sockaddr_in from; socklen_t fromlen = sizeof(from); int datasize = 0; //一般kcp的mtu是1400字节 IKCP_MTU_DEF // 用户获得收到的数据 uint32_t curclock = iclock(); while (true) { sockaddr_in from; socklen_t fromlen = sizeof(from); int rc = recvfrom(m_socket, m_recv_buffer.wr_ptr(), m_recv_buffer.space_length(), 0, (sockaddr*)&from, &fromlen); if (rc > 0) { m_recv_buffer.wr_ptr(rc); auto conv = ikcp_getconv(m_recv_buffer.rd_ptr()); auto kcp_session = get_session(conv); if (NULL == kcp_session) { kcp_session = new KcpChannel(conv, std::bind(&SOCK_KCP_Server::send, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), from);//INET_Addr remote_addr; add_session(conv, kcp_session); } else { kcp_session->update_remote(from);//防止IP,端口地址发生变化,比如切换网络环境 } int packet_len = 0; int send_ret = kcp_session->input(m_recv_buffer.rd_ptr(), m_recv_buffer.data_length()); if (send_ret == 0) { curclock = iclock(); kcp_session->flush(); kcp_session->update(curclock); } m_recv_buffer.rd_ptr(m_recv_buffer.data_length()); if ((m_recv_buffer.data_length() == 0) || (m_recv_buffer.space_length() == 0)) { m_recv_buffer.recycle(); } while (true) { char* packet_buffer = NULL; bool ret = kcp_session->get_data(packet_buffer, packet_len); if (ret == false) { break; } Net_Event* net_event = new Net_Event; net_event->net_event_type = TYPE_DATA; net_event->id = m_id; net_event->new_id = 0; net_event->local_addr = m_local_addr; net_event->UserData = m_UserData; net_event->remote_addr.set_addr(from.sin_addr.s_addr); net_event->remote_addr.set_port(from.sin_port); net_event->packet.write_data(packet_buffer, packet_len); net_event->packet.remote_addr = net_event->remote_addr; net_event->packet.movetobegin(); net_event->packet.m_cmd.id = conv;//需要kcp上层再返回给网络层 m_net_manager->put_event(net_event); delete packet_buffer; packet_buffer = NULL; kcp_session->update(iclock()); kcp_session->flush(); } } else if (rc == 0) { // close LOG(WARN)("SOCK_KCP_Server::handle_input, socket close by peer"); return 0;//-1; } else { if (NETMANAGER_EAGAIN == error_no()) { return 0; } else { // exception LOG(WARN)("SOCK_KCP_Server::handle_input error, recvfrom error, errno:%d", error_no()); return 0;//-2; } } } if (datasize > 0) { update(); } return 0; } int SOCK_KCP_Server::handle_output() { // 如果当前正在发送的任务非空则先发送 return 0; } int SOCK_KCP_Server::handle_exception() { if (INVALID_SOCKET != m_socket) { close(m_socket); m_socket = INVALID_SOCKET; } Net_Event* net_event = new Net_Event; net_event->net_event_type = TYPE_EXCEPTION; net_event->id = m_id; net_event->new_id = 0; //net_event->local_addr = m_local_addr; net_event->UserData = m_UserData; m_net_manager->put_event(net_event); return 0; } int SOCK_KCP_Server::handle_close() { if (INVALID_SOCKET != m_socket) { close(m_socket); m_socket = INVALID_SOCKET; } if (notify_close) { return 0; } Net_Event* net_event = new Net_Event; net_event->net_event_type = TYPE_CLOSE; net_event->id = m_id; net_event->new_id = 0; net_event->local_addr = m_local_addr; // net_event->local_addr = m_local_addr; net_event->UserData = m_UserData; m_net_manager->put_event(net_event); return 0; } int SOCK_KCP_Server::handle_timeout() { timeout = utils::Util::GetCurrentTimeMsec() + KCP_SERVER_TIMEOUT; auto i_c = iclock(); for (auto it = m_sessions.begin(); it != m_sessions.end(); it++) { it->second->update(i_c); it->second->flush(); while (true) { int packet_len = 0; char* out_buf = NULL; bool ret = it->second->get_data(out_buf, packet_len); if (ret == false) { break; } Net_Event* net_event = new Net_Event; net_event->net_event_type = TYPE_DATA; net_event->id = m_id; net_event->new_id = 0; net_event->local_addr = m_local_addr; net_event->UserData = m_UserData; net_event->packet.write_data(out_buf, packet_len); net_event->packet.remote_addr = net_event->remote_addr; net_event->packet.movetobegin(); net_event->packet.m_cmd.id = it->second->m_conv;//需要kcp上层再返回给网络层 m_net_manager->put_event(net_event); } } return -1; } int SOCK_KCP_Server::post_packet(Net_Packet* send_packet) { // sync if too many send package auto clock = iclock(); ISTDUINT32 conv = send_packet->m_cmd.id; if (send_packet->m_cmd.cmd == Net_Packet::NET_DELETE) { LOG(INFO)("SOCK_KCP_Server::delete kcp conv"); remove_session(conv); return 0; } auto kcp_session = get_session(conv); if (kcp_session == NULL) { LOG(ERROR)("SOCK_KCP_Server::send_data error"); delete send_packet; return 0; } if (!kcp_session->try_send()) { //放到缓冲队列中 TODO,算了还是丢弃掉吧 LOG(ERROR)("SOCK_KCP_Server::post_packet error, start drop package"); delete send_packet; return 0; } int data_len = 0; char* send_buffer = send_packet->get_rptr(data_len); if (kcp_session->send((const char*)send_buffer, data_len) < 0) { LOG(ERROR)("SOCK_KCP_Server::send_data error"); delete send_packet; return 0; } kcp_session->update(clock); kcp_session->flush(); delete send_packet; // 监控可写事件 int rc = reactor()->register_handler(this, MASK_WRITE); if (0 != rc) { // 设置reactor失败, 认为socket异常 LOG(ERROR)("SOCK_KCP_Server::send_data error, register_handler error"); reactor()->notify_close(m_id); return 0; } return 0; } int SOCK_KCP_Server::send(const char* buf, int len, sockaddr_in& remote_addr) { socklen_t tolen = sizeof(remote_addr); //TODO 如果当前有发送的则推送到队列中。否则直接发送 const char* pBuf = buf; int curdatalen = len; int rc = 0; while (curdatalen > 0) { rc = sendto(m_socket, pBuf, curdatalen, 0, (sockaddr*)&remote_addr, tolen); if (rc > 0) { curdatalen -= rc; //m_current_send_length += rc; pBuf += rc; } else { if (NETMANAGER_EAGAIN == error_no()) { return false; } else { // exception int nErrCode = error_no(); LOG(WARN)("SOCK_KCP_Server::handle_output error, sendto error, errno:%d", nErrCode); return -1; } } } return 0; }