C# 高性能 TCP 服务的多种实现方式

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
应用实时监控服务-用户体验监控,每月100OCU免费额度
简介:

哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是《猴赛雷,C# 编写 TCP 服务的花样姿势!》

本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容:

在 .NET/C# 中对于 Socket 的支持均是基于 Windows I/O Completion Ports 完成端口技术的封装,通过不同的 Non-Blocking 封装结构来满足不同的编程需求。以上方式均已在 Cowboy.Sockets 中有完整实现,并且 APM 和 TAP 方式已经在实际项目中应用。Cowboy.Sockets 还在不断的进化和完善中,如有任何问题请及时指正。

虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个 Loop 即可描述:Accept Loop和 Read Loop,如下图所示。(这里提及的 "Loop" 指的是一种循环方式,而非特指 while/for 等关键字。)

  • 在任何 TCP Server 的实现中,一定存在一个 Accept Socket Loop,用于接收 Client 端的 Connect 请求以建立 TCP Connection。
  • 在任何 TCP Server 的实现中,一定存在一个 Read Socket Loop,用于接收 Client 端 Write 过来的数据。

如果 Accept 循环阻塞,则会导致无法快速的建立连接,服务端 Pending Backlog 满,进而导致 Client 端收到 Connect Timeout 的异常。如果 Read 循环阻塞,则显然会导致无法及时收到 Client 端发过来的数据,进而导致 Client 端 Send Buffer 满,无法再发送数据。

从实现细节的角度看,能够导致服务阻塞的位置可能在:

  1. Accept 到新的 Socket,构建新的 Connection 需要分配各种资源,分配资源慢;
  2. Accept 到新的 Socket,没有及时触发下一次 Accept;
  3. Read 到新的 Buffer,判定 Payload 消息长度,判定过程长;
  4. Read 到新的 Buffer,发现 Payload 还没有收全,继续 Read,则 "可能" 会导致一次 Buffer Copy;
  5. Payload 接收完毕,进行 De-Serialization 转成可识别的 Protocol Message,反序列化慢;
  6. 由 Business Module 来处理相应的 Protocol Message,处理过程慢;

1-2 涉及到 Accept 过程和 Connection 的建立过程,3-4 涉及到 ReceiveBuffer 的处理过程,5-6 涉及到应用逻辑侧的实现。

Java 中著名的 Netty 网络库从 4.0 版本开始对于 Buffer 部分做了全新的尝试,采用了名叫 ByteBuf 的设计,实现 Buffer Zero Copy 以减少高并发条件下 Buffer 拷贝带来的性能损失和 GC 压力。DotNettyOrleans ,Helios 等项目正在尝试在 C# 中进行类似的 ByteBuf 的实现。

APM 方式:TcpSocketServer

TcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 APM 的 BeginXXX 和 EndXXX 接口实现。

TcpSocketServer 中的 Accept Loop 指的就是,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

每一个建立成功的 Connection 由 TcpSocketSession 来处理,所以 TcpSocketSession 中会包含 Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer 通过暴露 Event 来实现 Connection 的建立与断开和数据接收的通知。

  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

使用也是简单直接,直接订阅事件通知。

复制代码
  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }
  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }
  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }
  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }
复制代码

TAP 方式:AsyncTcpSocketServer

AsyncTcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 TAP 的 async/await 的 XXXAsync 接口实现。

然而,实际上 XXXAsync 并没有创建什么神奇的效果,其内部实现只是将 APM 的方法转换成了 TAP 的调用方式。

复制代码
  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task<Socket> AcceptSocketAsync()
  {
      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]
  public Task<TcpClient> AcceptTcpClientAsync()
  {
      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }
复制代码

AsyncTcpSocketServer 中的 Accept Loop 指的就是,

  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

每一个建立成功的 Connection 由 AsyncTcpSocketSession 来处理,所以 AsyncTcpSocketSession 中会包含 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

为了将 async/await 异步到底,AsyncTcpSocketServer 所暴露的接口也同样是 Awaitable 的。

复制代码
  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }
复制代码

使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。

复制代码
  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
复制代码

当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。

复制代码
  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}
复制代码

SAEA 方式:TcpSocketSaeaServer

SAEA 是 SocketAsyncEventArgs 的简写。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

也就是说,优点就是无需为每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的推荐步骤如下:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
  6. Reuse the context for another operation, put it back in the pool, or discard it.

重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。

TcpSocketSaeaServer 即是对 SocketAsyncEventArgs 的应用和封装,并实现了 Pooling 技术。TcpSocketSaeaServer 中的重点是 SaeaAwaitable 类,SaeaAwaitable 中内置了一个 SocketAsyncEventArgs,并通过 GetAwaiter 返回 SaeaAwaiter 来支持 async/await 操作。同时,通过 SaeaExtensions 扩展方法对来扩展 SocketAsyncEventArgs 的 Awaitable 实现。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool 则是一个 QueuedObjectPool<SaeaAwaitable> 的衍生实现,用于池化 SaeaAwaitable 实例。同时,为了减少 TcpSocketSaeaSession 的构建过程,也实现了 SessionPool 即 QueuedObjectPool<TcpSocketSaeaSession>。

TcpSocketSaeaServer 中的 Accept Loop 指的就是,

复制代码
  while (IsListening)
  {
      var saea = _acceptSaeaPool.Take();
  
      var socketError = await _listener.AcceptAsync(saea);
      if (socketError == SocketError.Success)
      {
          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }
复制代码

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,

复制代码
  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);
  
      var socketError = await _socket.ReceiveAsync(saea);
      if (socketError != SocketError.Success)
          break;
  
      var receiveCount = saea.Saea.BytesTransferred;
      if (receiveCount == 0)
          break;
  }
复制代码

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。

  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起来也是简单直接:

复制代码
  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
复制代码

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue
  • RIOCreateCompletionQueue
  • RIOCreateRequestQueue
  • RIODequeueCompletion
  • RIODeregisterBuffer
  • RIONotify
  • RIOReceive
  • RIOReceiveEx
  • RIORegisterBuffer
  • RIOResizeCompletionQueue
  • RIOResizeRequestQueue
  • RIOSend
  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,

复制代码
_listener.OnAccepted = (acceptedSocket) =>
{
    Task.Run(async () =>
    {
        await Process(acceptedSocket);
    })
    .Forget();
};
复制代码

通过 TcpSocketRioSession 来处理 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
      if (receiveCount == 0)
          break;
  }

测试代码一如既往的类似:

复制代码
  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketRioSession session)
      {
          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
复制代码

参考资料










本文转自匠心十年博客园博客,原文链接:http://www.cnblogs.com/gaochundong/p/csharp_tcp_service_models.html,如需转载请自行联系原作者
相关实践学习
通过云拨测对指定服务器进行Ping/DNS监测
本实验将通过云拨测对指定服务器进行Ping/DNS监测,评估网站服务质量和用户体验。
目录
相关文章
|
3月前
|
网络协议 网络性能优化 C#
C# 一分钟浅谈:UDP 与 TCP 协议区别
【10月更文挑战第8天】在网络编程中,传输层协议的选择对应用程序的性能和可靠性至关重要。本文介绍了 TCP 和 UDP 两种常用协议的基础概念、区别及应用场景,并通过 C# 代码示例详细说明了如何处理常见的问题和易错点。TCP 适用于需要可靠传输和顺序保证的场景,而 UDP 适用于对延迟敏感且可以容忍一定数据丢失的实时应用。
60 1
|
5月前
|
开发者 C# C++
揭秘:如何轻松驾驭Uno Platform,用C#和XAML打造跨平台神器——一步步打造你的高性能WebAssembly应用!
【8月更文挑战第31天】Uno Platform 是一个跨平台应用程序框架,支持使用 C# 和 XAML 创建多平台应用,包括 Web。通过编译为 WebAssembly,Uno Platform 可实现在 Web 上运行高性能、接近原生体验的应用。本文介绍如何构建高效的 WebAssembly 应用:首先确保安装最新版本的 Visual Studio 或 VS Code 并配置 Uno Platform 开发环境;接着创建新的 Uno Platform 项目;然后通过安装工具链并使用 Uno WebAssembly CLI 编译应用;最后添加示例代码并测试应用。
161 0
|
5月前
|
开发者 Apache 程序员
揭秘Apache Wicket:页面生命周期背后的神秘力量!
【8月更文挑战第31天】李工是一位热爱Web开发的程序员,近日在技术博客上分享了他对Apache Wicket框架的学习心得,特别是页面生命周期的理解。他认为掌握Wicket页面生命周期对于开发富交互式Web应用至关重要。他通过一个简单的计数器应用示例,详细解释了Wicket的组件化设计理念以及页面和组件在生命周期中的变化。
59 0
|
8月前
|
网络协议 C# C++
BytesIO | C# 超简洁的TCP服务端开发(完整源码+视频教程)
本章将继续利用BytesIO开发TCP的服务端,简洁明了依然是主旋律,我们要在三十行代码内除了实现一个TCP服务端以外,使其支持聊天室(消息转发)、连接数限制、心跳超时检测等功能。 现在,一起跟着视频敲一敲吧!
550 0
BytesIO | C# 超简洁的TCP服务端开发(完整源码+视频教程)
|
8月前
|
网络协议 C# C++
BytesIO | 零基础轻松看懂 C# TCP客户端(完整源码+视频教程)
零基础轻松看懂 C# TCP客户端(完整源码+视频教程) 如果非IT行业的女朋友都能学会的话,应该就算0基础入门的教学视频了吧! 超简单的C# TCP开发入门,短短的代码,完整的功能,掏出你的VS码一个试试手吧!
193 0
BytesIO | 零基础轻松看懂 C# TCP客户端(完整源码+视频教程)
|
8月前
|
网络协议 Unix Linux
【Unity 3D】C#中Socket及TCP三次握手与四次挥手详解(超详细 图文解释)
【Unity 3D】C#中Socket及TCP三次握手与四次挥手详解(超详细 图文解释)
270 0
|
域名解析 网络协议
IP协议, TCP协议 和DNS 服务分别是干什么的?
大家好,我是阿萨。昨天讲解了网络四层协议[TCP/IP协议族分为哪4层?]今天我们学习下IP 协议, TCP 协议和DNS 协议分别是干什么的。
321 0
IP协议, TCP协议 和DNS 服务分别是干什么的?
|
存储 开发框架 安全
在 C# 中使用 Span<T> 和 Memory<T> 编写高性能代码
在 C# 中使用 Span 和 Memory 编写高性能代码 .NET 中支持的内存类型 .NET Core 2.1 中新增的类型 访问连续内存: Span 和 Memory Span 介绍 C# 中的 Span Span 和 Arrays Span 和 ReadOnlySpan Memory 入门 ReadOnlyMemory Span 和 Memory 的优势 连续和非连续内存缓冲区 不连续的缓冲区: ReadOnly 序列 实际场景 Benchmarking 基准测试 安装 NuGet 包 Benchmarking Span 执行基准测试 解读基准测试结果 Span 限制 结论
462 0
|
网络协议 网络架构
1.2.1计算机网络(分层结构、协议、接口、服务、ISO/OSI、TCP/IP)
分层结构、协议、接口、服务 1.为什么要分层? 2.怎么分层? 3.正式认识分层结构 4.概况总结 参考模型 ISO/OSI参考模型--怎么 来的? IS0/OSI参考模型 1.应用层 2.表示层​ 3.会话层 4.传输层 5.网络层 6.数据链路层 7.物理层 OSI参考模型与TCP/IP参考模型 OSI参考模型与TCP/IP参考模型相同点 OSI参考模型与TCP/IP参考模型不同点 5层参考模型
1.2.1计算机网络(分层结构、协议、接口、服务、ISO/OSI、TCP/IP)