kcp 是一个快速可靠ARQ协议,相比于tcp,以 10%-20% 带宽浪费的代价换取了快 30%-40% 的传输速度。kcp 可以看做应用层协议,底层采用 udp 传输
kcp 协议栈
1、kcp 的协议特性
1.1、RTO 不翻倍
RTO(Retransmission TimeOut)
,重传超时时间。tcp x 2,kcp x 1.5,提高传输速度
kcp RTO
1.2、选择重传
tcp 丢包时会全部重传从该包开始以后的数据,而 kcp 选择性重传,只重传真正丢失的数据包。
1.3、快速重传
tcp 重传模式
- 超时重传:超过规定的时间 RTO 则重传
- 快速重传:收到三个
冗余ACK
,不去等待 RTO ,直接重传
这里指的是收到fastresend
个失序报文后,不等待超时,直接重传,减少丢包等待时间。
1.4、非延迟 ACK
tcp 为充分利用带宽,延迟发送 ACK,RTT 时间较大,延长了丢包时的判断过程。而 kcp 的 ACK 是否延迟发送可以调节。
kcp 非延迟应答
1.5、ACK + UNA
ARQ (自动重传请求,Automatic Repeat-reQuest
)模型响应有两种方式
UNA
:此编号前所有包已收到,tcpACK
:该编号包已收到
只用 UNA 将导致全部重传,只用 ACK 则丢失成本太高,以往协议都是二选其一。而 kcp 协议中,除去单独的 ACK 包(精确)外,所有包都有 UNA 信息。
1.6、非退让流控
KCP正常模式同TCP一样使用公平退让法则,即发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让、慢启动这四要素决定。但传送及时性要求很高的小数据时,可选择仅用前两项来控制发送频率。以牺牲部分公平性及带宽利用率之代价,换取了流畅传输的效果。
KCP 实时性好,但带宽利用率较低,因为
- 非退让流控,不断尝试发送数据,有效包不多
- 每个包应答,占用一定的带宽
2、kcp 实现
UDP 收到的报文通过kcp_input
传递给 KCP,KCP 会对数据进行解包,重新封装成应用层用户数据,应用层通过kcp_recv
获取。应用层通过kcp_send
发送数据,KCP 会把用户数据拆分kcp报文,通过kcp_output
,以UDP 的方式发送。
KCP 源码流程图
kcp 源码流程图
KCP 源码架构
kcp 源码架构图
2.1、kcp 数据结构
kcp 报文结构:
kcp 报文
conv
:会话编号,通信双方必须一致。cmd
:报文类型
IKCP_CMD_ACK
确认命令IKCP_CMD_PUSH
数据推送命令IKCP_CMD_WASK
接收窗口询问大小命令IKCP_CMD_WINS
接收窗口大小告知命令
wnd
: 己方可用接收窗口大小,接收窗口大小 - 接收队列大小frg
:segmen t分片。0,最后一个分片。3 2 1 0
sn
:segment 报文的序列号。ts
:发送时间戳,用于计算RTO
和RTT
una
:待接收的序列号,其实确认号,表示该序列号之前的所有报文都收到了,可以删除len
:数据长度,DATA
的长度DATA
: 用户数据
kcp 使用的 Segment 定义如下
struct IKCPSEG { struct IQUEUEHEAD node; // 用来串接多个 KCP segment,即前向后向指针 IUINT32 conv; // 会话编号 IUINT32 cmd; // 报文类型 IUINT32 frg; // 分片 IUINT32 wnd; // 可用接收窗口大小(接收窗口大小-接收队列大小) IUINT32 ts; // 发送时刻的时间戳 IUINT32 sn; // 分片 segment 的序号 IUINT32 una; // 待接收消息序号 IUINT32 len; // 数据长度 IUINT32 resendts; // 下次超时重传该报文的时间戳 IUINT32 rto; // 重传超时时间 IUINT32 fastack; // 收到ack时该分片被跳过的次数,用于快速重传 IUINT32 xmit; // 记录了该报文被传输了几次 char data[1]; // 实际传输的数据 payload };
每一个 KCP 用户都需要调用 ikcp_create
创建一个 kcp 控制块 ikcpcb
。ikcpcb
结构用来实现整个 KCP 协议。
struct IKCPCB { IUINT32 conv; // 标识会话 IUINT32 mtu; // 最大传输单元,默认数据为1400,最小为50 IUINT32 mss; // 最大分片大小,不大于mtu IUINT32 state; // 连接状态(0xffffffff表示断开连接) IUINT32 snd_una; // 第一个未确认的包 IUINT32 snd_nxt; // 下一个待分配包的序号 IUINT32 rcv_nxt; // 待接收消息序号.为了保证包的顺序,接收方会维护一个接收窗口,接收窗口有一个起始序号rcv_nxt 以及尾序号rcv_nxt + rcv_wnd(接收窗口大小) IUINT32 ts_recent; IUINT32 ts_lastack; IUINT32 ssthresh; // 拥塞窗口的阈值 IINT32 rx_rttval; // RTT的变化量,代表连接的抖动情况 IINT32 rx_srtt; // smoothed round trip time,平滑后的RTT; IINT32 rx_rto; // 收ACK接收延迟计算出来的重传超时时间 IINT32 rx_minrto; // 最小重传超时时间 IUINT32 snd_wnd; // 发送窗口大小 IUINT32 rcv_wnd; // 接收窗口大小,本质上而言如果接收端一直不去读取数据则rcv_queue就会满(达到rcv_wnd) IUINT32 rmt_wnd; // 远端接收窗口大小 IUINT32 cwnd; // 拥塞窗口大小, 动态变化 IUINT32 probe; // 探查变量, IKCP_ASK_TELL表示告知远端窗口大小。IKCP_ASK_SEND表示请求远端告知窗口大小; IUINT32 current; IUINT32 interval; // 内部flush刷新间隔,对系统循环效率有非常重要影响, 间隔小了cpu占用率高, 间隔大了响应慢 IUINT32 ts_flush; // 下次flush刷新的时间戳 IUINT32 xmit; // 发送segment的次数, 当segment的xmit增加时,xmit增加(重传除外) IUINT32 nrcv_buf; // 接收缓存中的消息数量 IUINT32 nsnd_buf; // 发送缓存中的消息数量 IUINT32 nrcv_que; // 接收队列中消息数量 IUINT32 nsnd_que; // 发送队列中消息数量 IUINT32 nodelay; // 是否启动无延迟模式。无延迟模式rtomin将设置为0,拥塞控制不启动; IUINT32 updated; //是否调用过update函数的标识; IUINT32 ts_probe; // 下次探查窗口的时间戳; IUINT32 probe_wait; // 探查窗口需要等待的时间; IUINT32 dead_link; // 最大重传次数,被认为连接中断; IUINT32 incr; // 可发送的最大数据量; struct IQUEUEHEAD snd_queue; //发送消息的队列 struct IQUEUEHEAD rcv_queue; //接收消息的队列, 确认过用户可读取 struct IQUEUEHEAD snd_buf; //发送消息的缓存 struct IQUEUEHEAD rcv_buf; //接收消息的缓存 IUINT32 *acklist; //待发送的ack的列表 当收到一个数据报文时,将其对应的 ACK 报文的 sn 号以及时间戳 ts //同时加入到acklist 中,即形成如 [sn1, ts1, sn2, ts2 …] 的列表 IUINT32 ackcount; // 记录 acklist 中存放的 ACK 报文的数量 IUINT32 ackblock; // acklist 数组的可用长度,当 acklist 的容量不足时,需要进行扩容 void *user; // 指针,可以任意放置代表用户的数据,也可以设置程序中需要传递的变量; char *buffer; // 存储字节流信息 int fastresend; // 触发快速重传的重复ACK个数; int fastlimit; int nocwnd; // 取消拥塞控制 int stream; // 是否采用流传输模式 int logmask; // 日志的类型,如IKCP_LOG_IN_DATA,方便调试 int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);//发送消息的回调函数 void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); // 写日志的回调函数 };
2.2、kcp 报文发送
KCP 中,数据发送流程分为:
- 上层应用调用
ikcp_send
将数据写入snd_queue
- 下层函数
ikcp_flush
决定将多少数据从snd_queue
移动到snd_buf
,进行发送
kcp 报文发送
ikcp_send
ikcp_send
ikcp_send
的功能:把用户发送的数据根据MSS
分片成 kcp 的数据包格式,插入待发送队列
分片方式
- 流模式:检测每个发送队列⾥的分片是否达到
MSS
,没有达到则用新的数据填充分片。 - 消息模式:将用户数据的每个分片设置 sn 和 frag,将分片后的数据存入发送队列,接收方通过 sn 和 frag 解包。即使⼀个分片的数据量可能不能达到MSS,也会作为⼀个包发送出去。
int ikcp_send(ikcpcb *kcp, const char *buffer, int len) { // 1、如果KCP开启流模式 if (kcp->stream != 0) { if (!iqueue_is_empty(&kcp->snd_queue)) { // 取出 snd_queue 中的最后一个报文,将其填充到 mss 的长度,设置frg为0 IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node); // 旧分片内数据长度小于mss if (old->len < kcp->mss) { int capacity = kcp->mss - old->len; // 还能容纳的数据长度 int extend = (len < capacity)? len : capacity; // 需要填充的长度 seg = ikcp_segment_new(kcp, old->len + extend); // 新建segment assert(seg); if (seg == NULL) { return -2; } // 新分片添加到发送队列尾部 iqueue_add_tail(&seg->node, &kcp->snd_queue); // 拷贝旧分片的数据到新分片 memcpy(seg->data, old->data, old->len); // 将buffer中的数据也拷贝到新分片 if (buffer) { memcpy(seg->data + old->len, buffer, extend); buffer += extend; // buffer指向剩余数据的开头 } seg->len = old->len + extend; seg->frg = 0; len -= extend; // 更新len为剩余数据长度 iqueue_del_init(&old->node); // 删除old ikcp_segment_delete(kcp, old); } } if (len <= 0) { return 0; } } // 2、计算数据需要分成多少段报文 if (len <= (int)kcp->mss) count = 1; // mss 1376 + head 24 = mtu 1400 else count = (len + kcp->mss - 1) / kcp->mss; if (count >= (int)IKCP_WND_RCV) return -2; // 超过对方的初始接收窗口 if (count == 0) count = 1; // fragment // 3、将数据全部新建 segment 插入发送队列尾部,队列计数递增, frag 递减 for (i = 0; i < count; i++) { int size = len > (int)kcp->mss ? (int)kcp->mss : len; seg = ikcp_segment_new(kcp, size); assert(seg); if (seg == NULL) { return -2; } if (buffer && len > 0) { // 仍有待发送的数据 memcpy(seg->data, buffer, size); } seg->len = size; // 分片编号,逆序。流模式情况下分片编号不用填写 seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; iqueue_init(&seg->node); iqueue_add_tail(&seg->node, &kcp->snd_queue); // 加入到 snd_queue 中 kcp->nsnd_que++; if (buffer) { buffer += size; } len -= size; } }
应用层调用 ikcp_send
之后将用户数据置入 snd_queue
中,当 kcp 调用 ikcp_flush
时才将数据从 snd_queue
中 移入到 snd_buf
中,然后调用 kcp->output()
发送。
ikcp_flush
ikcp_flush
ikcp_flush
的实现,主要可以分为如下几个部分:
检查 kcp->update 是否更新,未更新直接返回。kcp->update 由 ikcp_update
更新,上层应用需要每隔一段时间(10-100ms)调用 ikcp_update
来驱动 KCP 发送数据;
// 'ikcp_update' haven't been called. if (kcp->updated == 0) return;
准备将 acklist
中记录的 ACK 报文发送出去,即从 acklist
中填充 ACK 报文的 sn
和 ts
字段;
// flush acknowledges // 逐一获取 acklist 中的 sn 和 ts,编码成 segment,以流的方式凑够 MTU 发送 count = kcp->ackcount; // 需要应答的分片数量 for (i = 0; i < count; i++) { size = (int)(ptr - buffer); // 超过 MTU 大小直接发送 if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; // 新建分片 } ikcp_ack_get(kcp, i, &seg.sn, &seg.ts); // 应答包 ptr = ikcp_encode_seg(ptr, &seg); // 编码segment协议头 } kcp->ackcount = 0;
检查当前是否需要对远端窗口进行探测。由于 KCP 流量控制依赖于远端通知其可接受窗口的大小,一旦远端接受窗口 kcp->rmt_wnd
为0,那么本地将不会再向远端发送数据,因此就没有机会从远端接受 ACK 报文,从而没有机会更新远端窗口大小。在这种情况下,KCP 需要发送窗口探测报文到远端,待远端回复窗口大小后,后续传输可以继续:
// probe window size (if remote window size equals zero) // 1、远端窗口大小为0,需要发送窗口探测报文 if (kcp->rmt_wnd == 0) { // 初始化探测间隔和下一次探测时间 if (kcp->probe_wait == 0) { kcp->probe_wait = IKCP_PROBE_INIT; // 默认7秒探测 kcp->ts_probe = kcp->current + kcp->probe_wait; // 下一次探测时间 } else { //远端窗口为0,发送过探测请求,但是已经超过下次探测的时间 // 检测是否到了探测时间 if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { // 更新探测间隔probe_wait if (kcp->probe_wait < IKCP_PROBE_INIT) kcp->probe_wait = IKCP_PROBE_INIT; kcp->probe_wait += kcp->probe_wait / 2; if (kcp->probe_wait > IKCP_PROBE_LIMIT) kcp->probe_wait = IKCP_PROBE_LIMIT; // 更新下次探测时间ts_probe kcp->ts_probe = kcp->current + kcp->probe_wait; // 更新探测变量probe为IKCP_ASK_SEND,发送探测消息 kcp->probe |= IKCP_ASK_SEND; } } } // 2、远端窗口正常,则不需要发送窗口探测 else { kcp->ts_probe = 0; // 更新下次探测时间为0 kcp->probe_wait = 0; // 更新探测窗口等待时间为0 }
将窗口探测报文和窗口回复报文发送出去
// flush window probing commands if (kcp->probe & IKCP_ASK_SEND) { seg.cmd = IKCP_CMD_WASK; // 窗口探测[询问对方窗口size] size = (int)(ptr - buffer); if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; } ptr = ikcp_encode_seg(ptr, &seg); } // flush window probing commands if (kcp->probe & IKCP_ASK_TELL) { seg.cmd = IKCP_CMD_WINS; // 窗口告知[告诉对方我方窗口size] size = (int)(ptr - buffer); if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; } ptr = ikcp_encode_seg(ptr, &seg); } kcp->probe = 0; //清空标识
计算本次发送可用的窗口大小,这里 KCP 采用了可以配置的策略,正常情况下,KCP 的窗口大小由发送窗口 snd_wnd
,远端接收窗口 rmt_wnd
以及根据流控计算得到的 kcp->cwnd
共同决定;但是当开启了 nocwnd
模式时,窗口大小仅由前两者决定;
// calculate window size // 若没有流控,取发送窗口和远端接收窗口最小值 cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); // 若存在流控,则取当前拥塞窗口、发送窗口和远端接收窗口三者最小值 if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
将缓存在 snd_queue
中的数据移到 snd_buf
中等待发送
// move data from snd_queue to snd_buf // 从snd_queue移动到snd_buf的数量不能超出对方的接收能力,发送符合拥塞范围的分片 while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { IKCPSEG *newseg; if (iqueue_is_empty(&kcp->snd_queue)) break; newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); iqueue_del(&newseg->node); iqueue_add_tail(&newseg->node, &kcp->snd_buf); // 添加到发送缓存 kcp->nsnd_que--; kcp->nsnd_buf++; //设置数据分片的属性 newseg->conv = kcp->conv; newseg->cmd = IKCP_CMD_PUSH; newseg->wnd = seg.wnd; // 告知对方当前的接收窗口 newseg->ts = current; // 当前时间 newseg->sn = kcp->snd_nxt++; // 序号 newseg->una = kcp->rcv_nxt; // 告诉对方可以发送的下一个包序号 newseg->resendts = current; // 当前发送的时间 newseg->rto = kcp->rx_rto; // 超时重传的时间 newseg->fastack = 0; // 是否快速重传 newseg->xmit = 0; // 重传次数 }
在发送数据之前,先设置快重传的次数和重传间隔;KCP 允许设置快重传的次数,即 fastresend
参数。例如设置 fastresend
为2,并且发送端发送了1,2,3,4,5几个包,收到远端的ACK: 1, 3, 4, 5,当收到ACK3时,KCP知道2被跳过1次,收到ACK4时,知道2被“跳过”了2次,此时可以认为2号丢失,不用等超时,直接重传2号包;每个报文的 fastack
记录了该报文被跳过了几次,由函数 ikcp_parse_fastack
更新。于此同时,KCP 也允许设置 nodelay
参数,当激活该参数时,每个报文的超时重传时间将由 x2 变为 x1.5,即加快报文重传:
// calculate resent // 是否设置快重传次数 resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; // 是否开启nodelay rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
将 snd_buf
中的数据发送出去
// flush data segments // 发送snd buf的分片,只要数据还在snd_buf 说明对方还没有应答 // 1、新的报文,正常发送 // 2、超时重传 // 3、快速重传(如果有) for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node); int needsend = 0; // 1、如果该报文是第一次传输,那么直接发送 if (segment->xmit == 0) { needsend = 1; segment->xmit++; // 分片发送次数 + 1 segment->rto = kcp->rx_rto; // 超时时间间隔 segment->resendts = current + segment->rto + rtomin; // 下一次要发送的时间 } // 2、当前时间达到了该报文的重传时间,但并没有新的ack到达,出现丢包, 重传 else if (_itimediff(current, segment->resendts) >= 0) { needsend = 1; segment->xmit++; kcp->xmit++; // 根据 nodelay 参数更新重传时间 if (kcp->nodelay == 0) { segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); } else { IINT32 step = (kcp->nodelay < 2)? ((IINT32)(segment->rto)) : kcp->rx_rto; segment->rto += step / 2; //报文超时等待时间更新,控制RTO=1.5 } segment->resendts = current + segment->rto; //下一次发送的时间 lost = 1; // 丢包,反应到拥塞控制策略去了 } // 3、该报文的的被跳过次数超过设置的快速重传次数,需要重传 else if (segment->fastack >= resent) { if ((int)segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0) { needsend = 1; segment->xmit++; segment->fastack = 0; // 重置该分片被跳过的次数 segment->resendts = current + segment->rto; change++; // 标识快速重传的发生 } } // 需要发送数据 if (needsend) { int need; segment->ts = current; segment->wnd = seg.wnd; // 己方可用接收窗口大小 segment->una = kcp->rcv_nxt; // 待接收的下一个包序号 size = (int)(ptr - buffer); need = IKCP_OVERHEAD + segment->len; // 小包封装成大包发送 if (size + need > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; } // 把segment封装成线性buffer发送 头部+数据 ptr = ikcp_encode_seg(ptr, segment); if (segment->len > 0) { memcpy(ptr, segment->data, segment->len); ptr += segment->len; } if (segment->xmit >= kcp->dead_link) { kcp->state = (IUINT32)-1; } } } // flash remain segments size = (int)(ptr - buffer); // 剩余的数据 // 最终只要有数据要发送,一定发出去 if (size > 0) { ikcp_output(kcp, buffer, size); }
根据设置的 lost
和 change
更新窗口大小;注意 快重传和丢包时的窗口更新算法不一致,这一点类似于 TCP 协议的拥塞控制和快恢复算法
// update ssthresh //如果发生了快速重传,拥塞窗口阈值降低为当前未确认包数量的一半或最小值 if (change) { IUINT32 inflight = kcp->snd_nxt - kcp->snd_una; kcp->ssthresh = inflight / 2; if (kcp->ssthresh < IKCP_THRESH_MIN) kcp->ssthresh = IKCP_THRESH_MIN; kcp->cwnd = kcp->ssthresh + resent; // 动态调整拥塞控制窗口 kcp->incr = kcp->cwnd * kcp->mss; } // 如果发生了丢包,阈值减半, cwd 窗口保留为 1 if (lost) { kcp->ssthresh = cwnd / 2; if (kcp->ssthresh < IKCP_THRESH_MIN) kcp->ssthresh = IKCP_THRESH_MIN; kcp->cwnd = 1; // 动态调整拥塞控制窗口 kcp->incr = kcp->mss; } if (kcp->cwnd < 1) { kcp->cwnd = 1; kcp->incr = kcp->mss; }
2.3、kcp 报文接收
kcp 报文接收
ikcp_recv
应用层接收函数为 ikcp_recv
,主要做三件事
- 读取组好包的数据 rcv_queue -> 用户 buffer
- 将接收缓存 rcv_buf 的分片转移到接收队列 rcv_queue
- 如果有接收空间则将
kcp->probe |= IKCP_ASK_TELL
; 以在update的时候告知对方可以发送数据了。
首先检测一下本次接收数据之后,是否需要进行窗口恢复。在前面的内容中解释过,KCP 协议在远端窗口为0的时候将会停止发送数据,此时如果远端调用 ikcp_recv
将数据从 rcv_queue
中移动到应用层 buffer 中之后,表明其可以再次接受数据,为了能够恢复数据的发送,远端可以主动发送 IKCP_ASK_TELL
来告知窗口大小;
if (kcp->nrcv_que >= kcp->rcv_wnd) recover = 1; // 标记可以开始窗口恢复
开始将 rcv_queue
中的数据根据分片编号 frg
merge 起来,然后拷贝到用户的 buffer 中。
// merge fragment // 将属于同一个消息的各分片重组完整数据,并删除rcv_queue中segment,nrcv_que减少 // 经过 ikcp_send 发送的数据会进行分片,分片编号为倒序序号,因此frg为0的数据包标记着完整接收到了一次 send 发送过来的数据 for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) { int fragment; seg = iqueue_entry(p, IKCPSEG, node); p = p->next; if (buffer) { memcpy(buffer, seg->data, seg->len); // 把queue的数据就放入用户buffer buffer += seg->len; } len += seg->len; fragment = seg->frg; if (ikcp_canlog(kcp, IKCP_LOG_RECV)) { ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn); } if (ispeek == 0) { iqueue_del(&seg->node); ikcp_segment_delete(kcp, seg); // 删除节点 kcp->nrcv_que--; // nrcv_que接收队列-1 } // frg = 0,完整的数据接收到, 本次数据接收结束 if (fragment == 0) // break; }
下一步将 rcv_buf
中的数据转移到 rcv_queue
中,这个过程根据报文的 sn
编号来确保转移到 rcv_queue
中的数据一定是按序的:
// move available data from rcv_buf -> rcv_queue // 将 rcv_buf 中的数据转移到 rev_queue // 根据报文的sn来确保转移到 rcv_queue 中的数据一定是按序的 while (! iqueue_is_empty(&kcp->rcv_buf)) { seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); // 1、根据 sn 确保数据是按序转移到 rcv_queue 中 // 2、接收队列nrcv_que < 接收窗口rcv_wnd; if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) { iqueue_del(&seg->node); kcp->nrcv_buf--; iqueue_add_tail(&seg->node, &kcp->rcv_queue); kcp->nrcv_que++; // 接收队列 有多少个分片 + 1 kcp->rcv_nxt++; // 接收序号 + 1 } else { break; } }
最后进行窗口恢复。此时如果 recover 标记为1,表明在此次接收之前,可用接收窗口为0,如果经过本次接收之后,可用窗口大于0,将主动发送 IKCP_ASK_TELL
数据包来通知对方已可以接收数据:
// fast recover // nrcv_que小于rcv_wnd, 说明接收端有空间继续接收数据了 if (kcp->nrcv_que < kcp->rcv_wnd && recover) { // ready to send back IKCP_CMD_WINS in ikcp_flush // tell remote my window size kcp->probe |= IKCP_ASK_TELL; }
ikcp_input
ikcp_input
ikcp_recv
仅为上层调用的接口,kcp 协议需要从底层接受数据到 rcv_buf
中,这是通过函数 ikcp_input
实现。ikcp_input
中的所有功能都在一个外层的循环中实现:
首先将接收到的数据包进行解码,并进行基本的数据包长度和类型校验;kcp 协议只会接收到前文中所介绍的四种数据包;
调用 ikcp_parse_una
来确定已经发送的数据包有哪些被对方接收到。kcp 中所有的报文类型均带有 una
信息。发送端发送的数据都会缓存在 snd_buf
中,直到接收到对方确认信息之后才会删除。当接收到 una
信息后,表明 sn
小于 una
的数据包都已经被对方接收到,因此可以直接从 snd_buf
中删除。同时调用 ikcp_shrink_buf
来更新 kcp 控制块的 snd_una
数值。
// 删除小于snd_buf中小于una的segment ikcp_parse_una(kcp, una); // 更新snd_una为snd_buf中seg->sn或kcp->snd_nxt ,更新下一个待应答的序号 ikcp_shrink_buf(kcp);
处理 IKCP_CMD_ACK
报文
if (cmd == IKCP_CMD_ACK) { if (_itimediff(kcp->current, ts) >= 0) { // 根据应答判断rtt //更新rx_srtt,rx_rttval,计算kcp->rx_rto ikcp_update_ack(kcp, _itimediff(kcp->current, ts)); } //遍历snd_buf中(snd_una, snd_nxt),将sn相等的删除,直到大于sn ikcp_parse_ack(kcp, sn); // 将已经ack的分片删除 ikcp_shrink_buf(kcp); // 更新控制块的 snd_una if (flag == 0) { flag = 1; //快速重传标记 maxack = sn; // 记录最大的 ACK 编号 latest_ts = ts; } else { if (_itimediff(sn, maxack) > 0) { maxack = sn; // 记录最大的 ACK 编号 latest_ts = ts; } }
处理 IKCP_CMD_PUSH
报文
else if (cmd == IKCP_CMD_PUSH) { //接收到具体的数据包 if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) { // 对该报文的确认 ACK 报文放入 ack 列表中 ikcp_ack_push(kcp, sn, ts); // 判断接收的数据分片编号是否符合要求,即:在接收窗口(滑动窗口)范围之内 if (_itimediff(sn, kcp->rcv_nxt) >= 0) { // 是要接受起始的序号 seg = ikcp_segment_new(kcp, len); seg->conv = conv; seg->cmd = cmd; seg->frg = frg; seg->wnd = wnd; seg->ts = ts; seg->sn = sn; seg->una = una; seg->len = len; if (len > 0) { memcpy(seg->data, data, len); } // 将该报文插入到 rcv_buf 链表中 ikcp_parse_data(kcp, seg); } } }
对于接收到的 IKCP_CMD_WASK
报文,直接标记下次将发送窗口通知报文;而对于报文 IKCP_CMD_WINS
无需做任何特殊操作;
else if (cmd == IKCP_CMD_WASK) { // ready to send back IKCP_CMD_WINS in ikcp_flush // tell remote my window size // 如果是探测包,添加相应的标识位 kcp->probe |= IKCP_ASK_TELL; } else if (cmd == IKCP_CMD_WINS) { // do nothing,如果是 tell me 远端窗口大小,什么都不做 }
据记录的最大的 ACK 编号 maxack
来更新 snd_buf
中的报文的 fastack
,这个过程在介绍 ikcp_flush
中提到过,对于 fastack
大于设置的 resend
参数时,将立马进行快重传;
最后,根据接收到报文的 una
和 kcp 控制块的 una
参数进行流控;