Cowboy 开源 WebSocket 网络库

简介:

Cowboy.WebSockets 是一个托管在 GitHub 上的基于 .NET/C# 实现的开源 WebSocket 网络库,其完整的实现了 RFC 6455 (The WebSocket Protocol) 协议标准,并部分实现了 RFC 7692 (Compression Extensions for WebSocket) 协议标准。

WebSocket 可理解为建立在 TCP 连接通道上的更进一步的握手,并确定了消息封装格式。

通过定义控制帧 (Control Frame) 和数据帧 (Data Frame) 来控制通道内的通信和数据传输,下图用使用 ABNF 格式描述了帧头部的格式。

Cowboy.WebSockets 中对于 WebSocket 的 Client/Server 分别做了实现,分别对应代码中的:

Cowboy.WebSockets 的内部实现是基于 Cowboy.Sockets 中的 TAP 模式的 AsyncTcpSocketServer 和 AsyncTcpSocketClient 。关于 Cowboy.Sockets 可以参考文章《C#高性能TCP服务的多种实现方式》。

可通过 NuGet 查找 Cowboy 来获取 nuget 包。

WebSocket 服务端应用

实现 AsyncWebSocketServerModule 抽象类,其中 ModulePath 对应着 "ws://host:port/path" 中的 path 部分。可以实现多个 Module,将多个 Module 注入到 AsyncWebSocketServerModuleCatalog 中,或者采用反射机制等自动发现 Module。

复制代码
  public class TestWebSocketModule : AsyncWebSocketServerModule
  {
      public TestWebSocketModule()
          : base(@"/test")
      {
      }
  
      public override async Task OnSessionStarted(AsyncWebSocketSession session)
      {
          Console.WriteLine(string.Format("WebSocket session [{0}] has connected.", session.RemoteEndPoint));
          await Task.CompletedTask;
      }
  
      public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
      {
          Console.Write(string.Format("WebSocket session [{0}] received Text --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendTextAsync(text);
      }
  
      public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("WebSocket session [{0}] received Binary --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public override async Task OnSessionClosed(AsyncWebSocketSession session)
      {
          Console.WriteLine(string.Format("WebSocket session [{0}] has disconnected.", session.RemoteEndPoint));
          await Task.CompletedTask;
      }
  }
复制代码

实例化 AsyncWebSocketServer,并将 AsyncWebSocketServerModuleCatalog 实例注入,即可启动 WebSocket 的服务端监听。

复制代码
  class Program
  {
      static AsyncWebSocketServer _server;
  
      static void Main(string[] args)
      {
          NLogLogger.Use();
  
          try
          {
              var catalog = new AsyncWebSocketServerModuleCatalog();
              catalog.RegisterModule(new TestWebSocketModule());
  
              var config = new AsyncWebSocketServerConfiguration();
              //config.SslEnabled = true;
              //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.pfx", "Cowboy");
              //config.SslPolicyErrorsBypassed = true;
  
              _server = new AsyncWebSocketServer(22222, catalog, config);
              _server.Listen();
  
              Console.WriteLine("WebSocket server has been started on [{0}].", _server.ListenedEndPoint);
              Console.WriteLine("Type something to send to clients...");
              while (true)
              {
                  try
                  {
                      string text = Console.ReadLine();
                      if (text == "quit")
                          break;
                      Task.Run(async () =>
                      {
                          //await _server.BroadcastText(text);
                          //Console.WriteLine("WebSocket server [{0}] broadcasts text -> [{1}].", _server.ListenedEndPoint, text);
                          await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                          Console.WriteLine("WebSocket server [{0}] broadcasts binary -> [{1}].", _server.ListenedEndPoint, text);
                      });
                  }
                  catch (Exception ex)
                  {
                      Console.WriteLine(ex.Message);
                  }
              }
  
              _server.Shutdown();
              Console.WriteLine("WebSocket server has been stopped on [{0}].", _server.ListenedEndPoint);
          }
          catch (Exception ex)
          {
              Logger.Get<Program>().Error(ex.Message, ex);
          }
  
          Console.ReadKey();
      }
  }
复制代码

WebSocket 客户端应用

客户端侧在实例化 AsyncWebSocketClient 时有两种方式:

  1. 实现 IAsyncWebSocketClientMessageDispatcher 接口;
  2. 直接构造函数注入接受各种事件的 Func<> 实现;
复制代码
  public interface IAsyncWebSocketClientMessageDispatcher
  {
      Task OnServerConnected(AsyncWebSocketClient client);
      Task OnServerTextReceived(AsyncWebSocketClient client, string text);
      Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);
      Task OnServerDisconnected(AsyncWebSocketClient client);
  
      Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);
      Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);
      Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
  }
复制代码

下面的 DEMO 采用了方式二。

复制代码
  class Program
  {
      static AsyncWebSocketClient _client;
  
      static void Main(string[] args)
      {
          NLogLogger.Use();
  
          Task.Run(async () =>
          {
              try
              {
                  var config = new AsyncWebSocketClientConfiguration();
                  //config.SslTargetHost = "Cowboy";
                  //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.cer"));
                  //config.SslPolicyErrorsBypassed = true;
  
                  //var uri = new Uri("ws://echo.websocket.org/");
                  //var uri = new Uri("wss://127.0.0.1:22222/test");
                  var uri = new Uri("ws://127.0.0.1:22222/test");
                  _client = new AsyncWebSocketClient(uri,
                      OnServerTextReceived,
                      OnServerBinaryReceived,
                      OnServerConnected,
                      OnServerDisconnected,
                      config);
                  await _client.Connect();
  
                  Console.WriteLine("WebSocket client has connected to server [{0}].", uri);
                  Console.WriteLine("Type something to send to server...");
                  while (_client.State == WebSocketState.Open)
                  {
                      try
                      {
                          string text = Console.ReadLine();
                          if (text == "quit")
                              break;
                          Task.Run(async () =>
                          {
                              //await _client.SendText(text);
                              //Console.WriteLine("Client [{0}] send text -> [{1}].", _client.LocalEndPoint, text);
                              await _client.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
                              Console.WriteLine("Client [{0}] send binary -> [{1}].", _client.LocalEndPoint, text);
                          }).Forget();
                      }
                      catch (Exception ex)
                      {
                          Console.WriteLine(ex.Message);
                      }
                  }
  
                  await _client.Close(WebSocketCloseCode.NormalClosure);
                  Console.WriteLine("WebSocket client has disconnected from server [{0}].", uri);
              }
              catch (Exception ex)
              {
                  Logger.Get<Program>().Error(ex.Message, ex);
              }
          }).Wait();
  
          Console.ReadKey();
      }
  
      private static async Task OnServerConnected(AsyncWebSocketClient client)
      {
          Console.WriteLine(string.Format("WebSocket server [{0}] has connected.", client.RemoteEndPoint));
          await Task.CompletedTask;
      }
  
      private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
      {
          Console.Write(string.Format("WebSocket server [{0}] received Text --> ", client.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await Task.CompletedTask;
      }
  
      private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("WebSocket server [{0}] received Binary --> ", client.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await Task.CompletedTask;
      }
  
      private static async Task OnServerDisconnected(AsyncWebSocketClient client)
      {
          Console.WriteLine(string.Format("WebSocket server [{0}] has disconnected.", client.RemoteEndPoint));
          await Task.CompletedTask;
      }
  }
复制代码

相关资料





本文转自匠心十年博客园博客,原文链接:http://www.cnblogs.com/gaochundong/p/cowboy_websockets.html,如需转载请自行联系原作者

目录
相关文章
|
2月前
|
C++
基于Reactor模型的高性能网络库之地址篇
这段代码定义了一个 InetAddress 类,是 C++ 网络编程中用于封装 IPv4 地址和端口的常见做法。该类的主要作用是方便地表示和操作一个网络地址(IP + 端口)
160 58
|
2月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
57 2
|
2月前
基于Reactor模型的高性能网络库之Tcpconnection组件
TcpConnection 由 subLoop 管理 connfd,负责处理具体连接。它封装了连接套接字,通过 Channel 监听可读、可写、关闭、错误等
75 1
|
2月前
|
负载均衡 算法 安全
基于Reactor模式的高性能网络库之线程池组件设计篇
EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。
107 3
|
2月前
基于Reactor模式的高性能网络库github地址
https://github.com/zyi30/reactor-net.git
45 0
|
2月前
基于Reactor模型的高性能网络库之Poller(EpollPoller)组件
封装底层 I/O 多路复用机制(如 epoll)的抽象类 Poller,提供统一接口支持多种实现。Poller 是一个抽象基类,定义了 Channel 管理、事件收集等核心功能,并与 EventLoop 绑定。其子类 EPollPoller 实现了基于 epoll 的具体操作,包括事件等待、Channel 更新和删除等。通过工厂方法可创建默认的 Poller 实例,实现多态调用。
194 60
|
2月前
基于Reactor模型的高性能网络库之Channel组件篇
Channel 是事件通道,它绑定某个文件描述符 fd,注册感兴趣的事件(如读/写),并在事件发生时分发给对应的回调函数。
155 60
|
2月前
|
安全 调度
基于Reactor模型的高性能网络库之核心调度器:EventLoop组件
它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。
168 57
|
2月前
基于Reactor模型的高性能网络库之时间篇
是一个用于表示时间戳(精确到微秒)**的简单封装类
125 57
|
2月前
|
JSON 网络安全 数据格式
Python网络请求库requests使用详述
总结来说,`requests`库非常适用于需要快速、简易、可靠进行HTTP请求的应用场景,它的简洁性让开发者避免繁琐的网络代码而专注于交互逻辑本身。通过上述方式,你可以利用 `requests`处理大部分常见的HTTP请求需求。
267 51

热门文章

最新文章