GPDB-内核特性-UDPIFC超时重传
GreenPlum默认使用UDP协议进行数据传输。发生网络拥塞时,实现了超时重传以解决拥塞。
1、unack_queue_ring队列
超时重传的基础数据结构是unack_queue_ring队列。如图所示:
1)currentTime用来标记checkExpiration位于哪个时间周期。发送数据包时回更新该值:
sendBuffers第一次发送时,unack_queue_ring.currentTime更新为发送的时间(会调整到TIMER_SPAN周期内)
checkExpiration检查超时:now为调用该函数时时间。距离上次checkExpiration检测超时时间超过TIMER_CHECKING_PERIOD(gp_interconnect_timer_checking_period,默认20ms)才会再次进入checkExpiration,并更新为检查周期的起使时间:从checkExpiration函数中可知,进入checkExpiration时,距上次已经超过了5ms,则判定超时了,需要进行重发:
handleAckForDisorderPkt调用putIntoUnackQueueRing可能会更新currentTime值:
#define TIMER_SPAN (Gp_interconnect_timer_period * 1000ULL) //5ms
UDP interconnect timer周期,默认5ms
2)idx:slots的下标。currentTime对应的槽位。
3)numOutStanding:unack_queue_ring中未处理包的个数
4)numSharedOutStanding:拥塞窗口中使用共享带宽的未处理包的个数
5)ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM]:slots数组,大小2000。每个槽位都是一个链表,表示一个时间段内的所有ICBuffer包
2、超时重传机制
1)执行器启动时初始化lastExpirationCheckTime时间,可以认为最初是ExecutorStart的时间戳
2)SendChunkUDPIFC函数发送数据包。
(1)首先调用sendBuffers函数将发送队列conn->sndQueue中的所有ICBuffer数据包都发送。由于向该队列放的时候是向队列尾放,所以从队列头开始发送,先发送最老的包。
(2)从发送队列sndQueue中pop出一个ICBuffer,并放到未接收ack的队列unackQueue中。同样向队列尾放。
(3)会根据此时的时间戳方将ICBuffer放到unack_queue_ring中,最后发送走。将ICBuffer放到unack_queue_ring的操作见第3)步。
(4)判断此时距离上次超时检测是否超过50ms。
(5)超过50ms:pollAcks->poll超时时间是0,不阻塞立即返回。若有事件,则调用handleAcks处理接收的ack,否则直接调用checkExceptions进行超时检测
(6)未超过50ms:通过computeTimeout计算出超时时间,和ICBuffer的重发次数有关。
- unackQueue队列为空,即发送的包都收到ack了:timeout为20ms
- unackQueue队列头中的ICBuffer(最老的包)没有重发过,并且没有因无可用ICBuffer而checkExceptions检测过:timeout为0ms
- 其他情况超时时间为20ms
static inline int computeTimeout(MotionConn *conn, int retry) { if (icBufferListLength(&conn->unackQueue) == 0) return TIMER_CHECKING_PERIOD;//gp_interconnect_timer_checking_period:20ms ICBufferLink *bufLink = icBufferListFirst(&conn->unackQueue); ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink); if (buf->nRetry == 0 && retry == 0) return 0; if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) return TIMER_CHECKING_PERIOD;//20ms /* for capacity based flow control */ return TIMEOUT(buf->nRetry); }
3)通过PutIntoUnackQueueRing将ICBuffer放到unack_queue_ring.slogs[]中,需要先计算下超时时间expTime:
computeExpirationPeriod(curBuf->conn, curBuf->nRetry)://包重发的次数 uint32 factor = (retry <= 12 ? retry : 12);//factor次数不能超过12次 return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, (conn->rtt + (conn->dev << 2)) << (factor)))
其中:
#define MIN_EXPIRATION_PERIOD (Gp_interconnect_min_rto * 1000)/* 默认: 20ms */
#define MAX_EXPIRATION_PERIOD (1000 * 1000) /* 1s */
conn->rtt:表示往返时延
conn->dev:表示偏差
expTime超时时间随着重发次数的增加会变大,但最小为20ms,最大不能超过1s
然后,看下slots[]使用机制:
#define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN) static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now) { if (uqr->currentTime == 0)//ExecutorStart执行,第一次发送数据包 uqr->currentTime = now - (now % TIMER_SPAN); diff = now + expTime - uqr->currentTime;//现在+超时时间后,位于哪个超时段slots[] if (diff >= UNACK_QUEUE_RING_LENGTH) diff = UNACK_QUEUE_RING_LENGTH - 1; else if (diff < TIMER_SPAN) diff = TIMER_SPAN;//[5ms,10s) idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM; buf->unackQueueRingSlot = idx;//2000ms内,每5ms一个slot icBufferListAppend(&unack_queue_ring.slots[idx], buf); }
4)每隔20ms进行一次超时检测。超时检测的当前时间之前的还未收到ack的ICBuffer都需要重发。从最开始的时间段slots[unack_queue_ring.idx](对应unack_queue_ring.currentTime时间段)开始,每隔5ms看下是否超过现在时间,若没有超过,即表示距离上次检测开始:发送的包在超时时间内还没收到ack(因为会加上超时时间后再定位在哪个slots[]中),这时就需要重发了(即语义:超过超时时间还未收到ack,重发)。那么最近刚放进来的包,会不会又立即重发呢?当然不会,因为最近刚放进来的包,也是加了它的超时时间再定位到slots[]的,若在当前now时间内,则表示超过超时时间还未收到ack,若不再则不会重发。
3、总结
- 当发送一个包时,会计算一个超时时间expTime,该超时时间至少20ms,不超过1s,然后now-currentTime+expTime定位到超时时间段的slots[]数组中,如图所示,放到队列尾部。
- 当超时检测时间now1时,将now1所在slots[now1]之前以知道currentTime的全部重发,而now时间发送过的,因为在now1之后,则不重发。
- 当超时检测时间now2时,now发送的就需要重发了。
这样,至少20ms内没有接收到ack的都需要重发。