C#使用Socket实现分布式事件总线,不依赖第三方MQ

简介: `CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。

使用 Socket 实现的分布式事件总线,支持 CQRS,不依赖第三方 MQ。

CodeWF.EventBus.Socket 是一个轻量级的、基于 Socket 的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。

Command

Command

Query

Query

特性

  • 轻量级:不依赖任何外部 MQ 服务,减少了系统复杂性和依赖。

  • 高性能:基于 Socket 的直接通信,提供低延迟、高吞吐量的消息传递。

  • 灵活性:支持自定义事件类型和消息处理器,易于集成到现有系统中。

  • 可扩展性:支持多客户端连接,适用于分布式系统环境。

通信协议

通过TCP 协议进行数据交互,协议包结构如下:

0.0.8@2x

安装

通过NuGet包管理器安装CodeWF.EventBus.Socket

Install-Package CodeWF.EventBus.Socket

服务端使用

运行事件服务

在服务端代码中,创建并启动EventServer实例以监听客户端连接和事件:

using CodeWF.EventBus.Socket;

// 创建事件服务器实例
IEventServer eventServer = new EventServer();

// 启动事件服务器,监听指定IP和端口
eventServer.Start("127.0.0.1", 9100);

停止事件服务

当不再需要事件服务时,调用Stop方法以优雅地关闭服务器:

eventServer.Stop();

客户端使用

连接事件服务

在客户端代码中,创建EventClient实例并连接到事件服务器:

using CodeWF.EventBus.Socket;

// 创建事件客户端实例
IEventClient eventClient = new EventClient();

// 连接到事件服务器,使用eventClient.ConnectStatus检查连接状态
eventClient.Connect("127.0.0.1", 9100));

订阅事件

订阅特定类型的事件,并指定事件处理函数:

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmailCommand);

private void ReceiveNewEmail(NewEmailCommand command)
{
   
    // 处理新邮件通知
    Console.WriteLine($"收到新邮件,主题是{message.Subject}");
}

发布命令(Command)

发布事件到指定的主题,供已订阅的客户端处理:

// 发布新邮件通知事件
eventClient.Publish("event.email.new", new NewEmailCommand {
    Subject = "恭喜您中Github一等奖", Content = "我们很开心,您在2024年7月...", SendTime = new DateTime(2024, 7, 27) });

查询(Query)

查询指定主题,需要有接收查询端订阅相同的主题(即生产者),收到请求后,再以相同的主题发布查询结果:

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
   
    // 执行查询请求,准备查询结果
    var response = new EmailQueryResponse {
    Emails = EmailManager.QueryEmail(request.Subject) };

    // 以相同的主题,发布查询结果
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
   
        Logger.Info($"Response query result: {response}");
    }
    else
    {
   
        Logger.Error($"Response query failed: {errorMessage}");
    }
}

其他端可使用相同的主题查询(即消费者):

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() {
    Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
   
    Logger.Info($"Query event.email.query, result: {response}");
}
else
{
   
    Logger.Error(
        $"Query event.email.query failed: [{errorMessage}]");
}

取消订阅事件

不再需要接收某类事件时,可以取消订阅:

eventClient.Unsubscribe<NewEmailNotification>("event.email.new", ReceiveNewEmail);

断开事件服务

完成事件处理或需要断开与服务器的连接时,调用Disconnect方法:

eventClient.Disconnect();
Console.WriteLine("断开与事件服务的连接");

注意事项

  • 确保服务端和客户端使用的地址和端口号一致,并且端口未被其他服务占用。
  • 在生产环境中,服务端应配置为监听公共 IP 地址或适当的网络接口。
  • 考虑到网络异常和服务重启等情况,客户端可能需要实现重连逻辑。
  • 根据实际需求,可以扩展EventServerEventClient类以支持更复杂的功能,如消息加密、认证授权等。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
C# 开发者
C# 一分钟浅谈:Socket 编程基础
【10月更文挑战第7天】本文介绍了Socket编程的基础知识、基本操作及常见问题,通过C#代码示例详细展示了服务器端和客户端的Socket通信过程,包括创建、绑定、监听、连接、数据收发及关闭等步骤,帮助开发者掌握Socket编程的核心技术和注意事项。
120 3
C# 一分钟浅谈:Socket 编程基础
|
1天前
|
网络协议 C# 开发工具
C#中简单Socket编程
1. 先运行服务器代码。服务器将开始监听指定的IP和端口,等待客户端连接。 1. 然后运行客户端代码。客户端将连接到服务器并发送消息。 1. 服务器接收到消息后,将回应客户端,并在控制台上显示接收到的消息。 1. 客户端接收到服务器的回应消息,并在控制台上显示。
30 15
|
24天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
165 7
|
3月前
|
API C# Windows
【C#】在winform中如何实现嵌入第三方软件窗体
【C#】在winform中如何实现嵌入第三方软件窗体
142 0
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
147 2
|
6月前
|
SQL 开发框架 前端开发
在C#开发中使用第三方组件LambdaParser、DynamicExpresso、Z.Expressions,实现动态解析/求值字符串表达式
在C#开发中使用第三方组件LambdaParser、DynamicExpresso、Z.Expressions,实现动态解析/求值字符串表达式
|
5月前
|
C#
C# WPF 将第三方DLL嵌入 exe
C# WPF 将第三方DLL嵌入 exe
100 0
|
7月前
|
存储 关系型数据库 Java
技术经验解读:三种分布式事务LCN、Seata、MQ
技术经验解读:三种分布式事务LCN、Seata、MQ
233 0
|
2月前
|
C# 开发者
C# 一分钟浅谈:Code Contracts 与契约编程
【10月更文挑战第26天】本文介绍了 C# 中的 Code Contracts,这是一个强大的工具,用于通过契约编程增强代码的健壮性和可维护性。文章从基本概念入手,详细讲解了前置条件、后置条件和对象不变量的使用方法,并通过具体代码示例进行了说明。同时,文章还探讨了常见的问题和易错点,如忘记启用静态检查、过度依赖契约和性能影响,并提供了相应的解决建议。希望读者能通过本文更好地理解和应用 Code Contracts。
41 3
|
12天前
|
存储 安全 编译器
学懂C#编程:属性(Property)的概念定义及使用详解
通过深入理解和使用C#的属性,可以编写更清晰、简洁和高效的代码,为开发高质量的应用程序奠定基础。
56 12