libjingle源码解析(5)-【PseudoTcp】建立UDP之上的TCP(3):对成块数据流的处理

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介:

PseudoTcp对成块数据流的处理

上一篇谈论了TCPPTCP交互数据流的处理方法。这一篇谈论另一个数据流--成块数据流。成块数据流主要采用滑动窗口协议和慢启动算法来控制成块数据的流量。

滑动窗口

    滑动窗口允许发送方在停止并等待确认前可以连续发送多个分组。因此发送方不必每发一个就停下来等待,这样可以加速数据的传输。这个Nagle算法冲突么?不会,因为成块数据流的分组都是满载传输的,根据Nagle算法,当等待发送数据的大小和窗口大小都大于MSS时,会立即发送。

    如果发送方一直传输数据会出现经常丢包的现象,特别是快的发送方发给慢的接收方。当接收方还没有处理数据,发送方就接连发来了数据会填满接收方的缓冲区,从而后续的数据将被丢弃,为了减少网络上丢包的次数,用一种机制来限制发送方传输数据。

因此出现了滑动窗口,如下图:


    滑动窗口分为4个部分:

        上图1~3为发送并确认的数据段

        上图4~6为已经发送,但是没有被确认的数据段

        上图7~9为可用的窗口,即滑动窗口,发送方还可以发送的数据段空间

        上图10以上为不能够发送。


    当接收方确认数据后,滑动窗口两边不断的向右移动。

        窗口合拢:当发送方发送数据并等待确认时,滑动窗口的左边向右移动。

        窗口张开:当接收方收到数据并确认且释放缓冲区数据时,右边向右移动。 

        窗口收缩:当接收方的缓冲区大小变小时,右边向左移动,但不建议使用这种方式。

    滑动窗口时通过窗口大小来更新。当接收方收到数据后,重新计算接收缓冲区的大小,并通告发送方。如果通告窗口大小为0,则发送方不能再发送数据,等到窗口大小为非0,这样可以有效的避免因接收方缓冲区满导致的分组的丢失。

    那么PTCP是怎么实现的呢?

    PTCP通过m_rbuf_len来标示接收缓冲区大小。如果缓冲区大小小于65536时,m_rwnd_scale0m_rcv_wnd标示窗口大小,而大于65535时,通过如下算法来调整m_rbuf_lenm_rwnd_scale。调整后根据缓冲区中可用空间来更新窗口大小m_rcv_wnd 。为什么选择65535为界限呢?因为在PTCP的头部中window字段的长度为16bit,只能支持窗口打小范围0~65535(包含65535)。

[cpp]   view plain copy
  1. void  
  2. PseudoTcp::resizeReceiveBuffer(uint32 new_size) {  
  3.   uint8 scale_factor = 0;  
  4.   //处理大于65536字节的缓冲区,更新scale_factor  
  5.   while (new_size > 0xFFFF) {  
  6.     ++scale_factor;  
  7.     new_size >>= 1;  
  8.   }  
  9.   new_size <<= scale_factor;//当缓冲区大小大于65535时,大小会被调整  
  10.   bool result = m_rbuf.SetCapacity(new_size);//更新缓冲区  
  11.   m_rbuf_len = new_size;//更新缓冲区大小  
  12.   m_rwnd_scale = scale_factor;//更新窗口扩大因子  
  13.   m_ssthresh = new_size;  
  14.   size_t available_space = 0;  
  15.   m_rbuf.GetWriteRemaining(&available_space);  
  16.   m_rcv_wnd = available_space;//更新可用窗口大小  
  17. }  

     当PTCP三次握手时,通过PTCP选项TCP_OPT_WND_SCALE来通告对方m_rwnd_scale的大小。

[cpp]   view plain copy
  1. void  
  2. PseudoTcp::queueConnectMessage() {  
  3.   talk_base::ByteBuffer buf(talk_base::ByteBuffer::ORDER_NETWORK);  
  4.   buf.WriteUInt8(CTL_CONNECT);  
  5.   if (m_support_wnd_scale) {//判断窗口扩大选项是否开启  
  6.     buf.WriteUInt8(TCP_OPT_WND_SCALE);//增加窗口扩大选项  
  7.     buf.WriteUInt8(1);  
  8.     buf.WriteUInt8(m_rwnd_scale);//窗口扩大扩大因子  
  9.   }  
  10.   m_snd_wnd = buf.Length();  
  11.   queue(buf.Data(), buf.Length(), true);  
  12. }  

    PTCP接收窗口扩大因子对应的控制包之后,通过parseOptions方法来解析此包如下:

[cpp]   view plain copy
  1. void  
  2. PseudoTcp::parseOptions(const char* data, uint32 len) {  
  3.   std::set<uint8> options_specified;  
  4.   talk_base::ByteBuffer buf(data, len);  
  5.   while (buf.Length()) {  
  6.     uint8 kind = TCP_OPT_EOL;  
  7.     buf.ReadUInt8(&kind);  
  8.     if (kind == TCP_OPT_EOL) {//判断是否到了缓冲区末  
  9.       break;  
  10.     } else if (kind == TCP_OPT_NOOP) {//空选项  
  11.       continue;  
  12.     }  
  13.     UNUSED(len);  
  14.     uint8 opt_len = 0;  
  15.     buf.ReadUInt8(&opt_len);  
  16.     if (opt_len <= buf.Length()) {  
  17.       applyOption(kind, buf.Data(), opt_len);//更新选项对应的值  
  18.       buf.Consume(opt_len);  
  19.     } else {  
  20.       return;  
  21.     }  
  22.     options_specified.insert(kind);  
  23.   }  
  24.   if (options_specified.find(TCP_OPT_WND_SCALE) == options_specified.end()) {  
  25.     if (m_rwnd_scale > 0) {  
  26.       resizeReceiveBuffer(DEFAULT_RCV_BUF_SIZE);//如果对端不支持窗口扩大因子,且本端的缓冲区大小超过了65535,则改为60K,因为必须两端都支持窗口扩大因子才能使用m_swnd_scale。  
  27.       m_swnd_scale = 0;  
  28.     }  
  29.   }  
  30. }  

    接收方调整窗口大小,如下:

    窗口合拢:当接收方收到数据时,会从窗口大小里减去把接收缓冲区消耗的数据大小。

[cpp]   view plain copy
  1. bool PseudoTcp::process(Segment& seg) {  
  2.     ......  
  3.       uint32 nOffset = seg.seq - m_rcv_nxt;  
  4.       talk_base::StreamResult result = m_rbuf.WriteOffset(seg.data, seg.len,  
  5.                                                           nOffset, NULL);  
  6.       ASSERT(result == talk_base::SR_SUCCESS);  
  7.       UNUSED(result);  
  8.       if (seg.seq == m_rcv_nxt) {//如果当前收到的分组恰好是下一个需要的分组  
  9.         m_rbuf.ConsumeWriteBuffer(seg.len);//消耗接收缓冲区  
  10.         m_rcv_nxt += seg.len;//更新下一个需要的分组  
  11.         m_rcv_wnd -= seg.len;//更新窗口大小,减去刚才消耗的缓冲区  
  12.         bNewData = true;  
  13.         RList::iterator it = m_rlist.begin();  
  14.         while ((it != m_rlist.end()) && (it->seq <= m_rcv_nxt)) {  
  15.           if (it->seq + it->len > m_rcv_nxt) {  
  16.             sflags = sfImmediateAck; // (Fast Recovery)  
  17.             uint32 nAdjust = (it->seq + it->len) - m_rcv_nxt;  
  18.             m_rbuf.ConsumeWriteBuffer(nAdjust);  
  19.             m_rcv_nxt += nAdjust;//之前收到的分组包含了下一个需要的seq number,调整m_rcv_nxt   
  20.             m_rcv_wnd -= nAdjust;//m_rcv_nxt增加了,且接收缓冲区被填充了,窗口大小也随之更新。  
  21.           }  
  22.           it = m_rlist.erase(it);  
  23.         }  
  24.       } else {//拿到的分组不是所需要的,但是有效的分组  
  25.         RSegment rseg;  
  26.         rseg.seq = seg.seq;  
  27.         rseg.len = seg.len;  
  28.         RList::iterator it = m_rlist.begin();  
  29.         while ((it != m_rlist.end()) && (it->seq < rseg.seq)) {  
  30.           ++it;  
  31.         }  
  32.         m_rlist.insert(it, rseg);//更新接收分组列表,当收到下一个所需要的分组时,重组恢复所用。  
  33.       }  
  34. ......  
  35. }  

    窗口张开当应用层调用Recv来获取PTCP接收的数据时,PTCP会把此部分数据清除,腾空缓冲区并扩大窗口大小。

[cpp]   view plain copy
  1. int PseudoTcp::Recv(char* buffer, size_t len) {  
  2.  ......  
  3.   talk_base::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL);  
  4. ......  
  5.   size_t available_space = 0;  
  6.   m_rbuf.GetWriteRemaining(&available_space);//获取接收缓冲区可用空间  
  7.   if (uint32(available_space) - m_rcv_wnd >=  
  8.       talk_base::_min<uint32>(m_rbuf_len / 2, m_mss)) {  
  9.     bool bWasClosed = (m_rcv_wnd == 0); // !?! Not sure about this was closed business  
  10.     m_rcv_wnd = available_space;//更新窗口大小,此为窗口张开过程  
  11.     if (bWasClosed) {  
  12.       attemptSend(sfImmediateAck);//如果窗口大小从0变为有可用空间时,立即通告对方可以继续发送数据。  
  13.     }  
  14.   }  
  15.   return read;  
  16. }  

    通告窗口大小给对方:

[cpp]   view plain copy
  1. IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32 seq, uint8 flags,  
  2.                                                 uint32 offset, uint32 len) {  
  3.   ASSERT(HEADER_SIZE + len <= MAX_PACKET);  
  4.   uint32 now = Now();  
  5.   uint8 buffer[MAX_PACKET];  
  6.   long_to_bytes(m_conv, buffer);  
  7.   long_to_bytes(seq, buffer + 4);  
  8.   long_to_bytes(m_rcv_nxt, buffer + 8);  
  9.   buffer[12] = 0;  
  10.   buffer[13] = flags;  
  11.   short_to_bytes(static_cast<uint16>(m_rcv_wnd >> m_rwnd_scale), buffer + 14);//这里会把窗口扩大因子也算进去  
  12. ......  
  13. }  

    当发送方收到接收方发送的窗口大小后,可发送大小计算为窗口大小减去已经发送但未被确认的数据大小。

[cpp]   view plain copy
  1. void PseudoTcp::attemptSend(SendFlags sflags) {  
  2. ......  
  3.     uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd);//接收方窗口大小  
  4.     uint32 nInFlight = m_snd_nxt - m_snd_una;//已经发送但未被确认的数据大小  
  5. uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;//发送方可发送数据大小  
  6. ......  
  7. }  

慢启动

    当接收方和发送方之间存在多个路由器和速率较慢的链路时,一些中间的路由器必须缓存分组。一开始发送方向接收方发送多个分组,可能会把缓存填满,这会严重降低TCP的吞吐量。

    TCP通过慢启动算法解决上述问题:首先设置拥塞窗口cwnd1,当发送方每收到一个ACK拥塞窗口加1个报文段。发送方取拥塞窗口和通告窗口的最小值为发送上限。拥塞窗口是发送方使用的流量控制,而通告窗口时接收方使用的流量控制。

    发送方首先发送一个报文段,当收到ACK时,cwnd变为2,可以发送2个报文段,当收到2ACKcwnd变为4,发送方可以发送4个报文段,依次类推,慢启动算法是指数增长的。

    PTCP实现慢启动算法如下:

    Cwnd初始值为2MSS,当收到ACKcwnd增加一个MSS

[cpp]   view plain copy
  1. Bool PseudoTcp::process(Segment& seg) {  
  2. ......  
  3.   // Check if this is a valuable ack  
  4.   if ((seg.ack > m_snd_una) && (seg.ack <= m_snd_nxt)) {  
  5. if (m_dup_acks >= 3) {  
  6. ......  
  7. }else{  
  8.       m_dup_acks = 0;  
  9.       // Slow start, congestion avoidance  
  10.       if (m_cwnd < m_ssthresh) {  
  11.         m_cwnd += m_mss;//当收到有效的ACK时,cwnd增加一个MSS。  
  12.       } else {  
  13.         m_cwnd += talk_base::_max<uint32>(1, m_mss * m_mss / m_cwnd);  
  14.       }  
  15. }  
  16.   }  
  17. ......  
  18. }  


    当发送方发送数据时,取窗口大小为通告窗口(m_snd_wnd和拥塞窗口(cwnd)的最小值,然后减去已经发送的未被确认的大小为当前可发送数据大小(nUseable )。

[cpp]   view plain copy
  1. void PseudoTcp::attemptSend(SendFlags sflags) {  
  2. ......  
  3.   while (true) {  
  4.     uint32 cwnd = m_cwnd;  
  5.     if ((m_dup_acks == 1) || (m_dup_acks == 2)) { // Limited Transmit  
  6.       cwnd += m_dup_acks * m_mss;  
  7.     }  
  8.     uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd);//取窗口大小为通告窗口和拥塞窗口的最小值  
  9.     uint32 nInFlight = m_snd_nxt - m_snd_una;  
  10.     uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;//减去已经发送的未被确认的大小为当前可发送数据大小  
  11.     size_t snd_buffered = 0;  
  12.     m_sbuf.GetBuffered(&snd_buffered);  
  13.     uint32 nAvailable =  
  14.         talk_base::_min(static_cast<uint32>(snd_buffered) - nInFlight, m_mss);//已经缓存的数据中可发送数据大小  
  15. ......  
  16. }  
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
96 2
|
16天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
16天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
16天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
18天前
|
监控 网络协议 网络性能优化
不再困惑!一文搞懂TCP与UDP的所有区别
本文介绍网络基础中TCP与UDP的区别及其应用场景。TCP是面向连接、可靠传输的协议,适用于HTTP、FTP等需要保证数据完整性的场景;UDP是无连接、不可靠但速度快的协议,适合DNS、RIP等对实时性要求高的应用。文章通过对比两者在连接方式、可靠性、速度、流量控制和数据包大小等方面的差异,帮助读者理解其各自特点与适用场景。
|
27天前
|
存储 网络协议 安全
用于 syslog 收集的协议:TCP、UDP、RELP
系统日志是从Linux/Unix设备及网络设备生成的日志,可通过syslog服务器集中管理。日志传输支持UDP、TCP和RELP协议。UDP无连接且不可靠,不推荐使用;TCP可靠,常用于rsyslog和syslog-ng;RELP提供可靠传输和反向确认。集中管理日志有助于故障排除和安全审计,EventLog Analyzer等工具可自动收集、解析和分析日志。
111 2
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
60 12
|
1月前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
2月前
|
网络协议
网络通信的基石:TCP/IP协议栈的层次结构解析
在现代网络通信中,TCP/IP协议栈是构建互联网的基础。它定义了数据如何在网络中传输,以及如何确保数据的完整性和可靠性。本文将深入探讨TCP/IP协议栈的层次结构,揭示每一层的功能和重要性。
82 5
|
2月前
|
网络协议 网络性能优化 数据处理
深入解析:TCP与UDP的核心技术差异
在网络通信的世界里,TCP(传输控制协议)和UDP(用户数据报协议)是两种核心的传输层协议,它们在确保数据传输的可靠性、效率和实时性方面扮演着不同的角色。本文将深入探讨这两种协议的技术差异,并探讨它们在不同应用场景下的适用性。
86 4

推荐镜像

更多