分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

简介:

一、分布式消息总线以及基于Socket的实现

     在前面的分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载一文之中给大家分享和介绍了一个极其简单也非常容易上的基于.NET Socket Tcp 技术实现的分布消息总线,也是一个简单的发布订阅框架:

image

    并且以案例的形式为大家演示了如何使用这个分布式消息总线架构发布订阅架构模式的应用程序,在得到各位同仁的反馈的同时,大家也非常想了解订阅者离线的情况,即支持离线构发布订阅框架。

二、离线架构

     不同于订阅者、发布者都同时在线的情况,支持订阅者离线,架构将有所变化,如下图所示:

image

     也会比原先的结构将更加复杂,其中需要处理以下两个关键点:

     1)订阅者的持久化存储。

     2)订阅者离线之后其所订阅消息的持久存储。

三、解决方案

     为解决消息总线的离线支持机制,我们在Socket 框架之中增加了一个接口ISubscribeStorager

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5:  
   6: namespace EAS.Messages
   7: {   
   8:     /// <summary>
   9:     /// 消息订阅存储接口。
  10:     /// </summary>
  11:     public interface ISubscribeStorager
  12:     {
  13:         /// <summary>
  14:         /// 持久化订阅。
  15:         /// </summary>
  16:         /// <param name="subscriber">订阅者。</param>
  17:         /// <param name="topic">消息主题。</param>
  18:         void Subscribe(string subscriber, string topic);
  19:  
  20:         /// <summary>
  21:         /// 持久化退订。
  22:         /// </summary>
  23:         /// <param name="subscriber">订阅者。</param>
  24:         /// <param name="topic">消息主题。</param>
  25:         void Unsubscribe(string subscriber, string topic);
  26:  
  27:         /// <summary>
  28:         /// 装载订阅信息。
  29:         /// </summary>
  30:         /// <returns>系统之中的订阅清单。</returns>
  31:         List<SubscribeItem> LoadSubscribes();
  32:  
  33:         /// <summary>
  34:         /// 写入消息。
  35:         /// </summary>
  36:         /// <param name="subscriber">订阅者。</param>
  37:         /// <param name="message">消息对象。</param>
  38:         void Write(string subscriber, QueueMessage message);
  39:  
  40:         /// <summary>
  41:         /// 读消息。
  42:         /// </summary>
  43:         /// <param name="subscriber">订阅者。</param>
  44:         /// <param name="message">消息对象。</param>
  45:         /// <returns>成功读取返回true,否则返回false。</returns>
  46:         bool Read(string subscriber, out QueueMessage message);
  47:     }
  48: }

     ISubscribeStorager共提供持久化订阅持久化消息存储共五个函数,其中:

     LoadSubscribes:服务端初始化时读取所有的离线订阅关系,即那个订阅都订阅那那个主题。

     Subscribe:持久化订阅者,当订阅才上线订阅消息时,持久化订阅关系,供离线检测之用。

     Unsubscribe:持久化取消订阅,当订阅者退订消息时,从持久化订阅关系之中删除。

     Write:当订阅者离线时,把订阅消息写入持久化存储。

     Read:当离线订阅者上线时,从持久存储之中读取一条消息向其发送。

     ISubscribeStorager:可以选择自己实现这个接口,以建立满足自己规则的离线存储机制,当然在AgileEAS.NET SOA 中间件之中提供了两种离线存储机制,存储于数据库和存储于MSMQ,下面向大家介绍一下这两种内置实现。

四、两种内置离线存储机制

     在AgileEAS.NET SOA 中间件平台之中提供了两个ISubscribeStorager的实现,基于数据库的离线订阅存储实现EAS.Messages.DbSubscribeStorager和基于MSMQ的离线订阅存储实现EAS.Messages.MsmqSubscribeStorager

     EAS.Messages.DbSubscribeStorager存储订阅关系在messageSubscribe.Config文件之中,消息存储在关系数据库SOA_SUBSCRIBEEVENTS表之中,使用前必须要建立相应的表结构,以下是SQL Server的DDL脚本:

   1: CREATE TABLE [SOA_SUBSCRIBEEVENT](
   2:     [GUID] [varchar](36) NOT NULL,
   3:     [SUBSCRIBER] [nvarchar](128) NOT NULL,
   4:     [TOPIC] [nvarchar](128) NOT NULL,
   5:     [BODY] [image] NULL,
   6:     [FCTIME] [datetime] NOT NULL,
   7:  CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED 
   8: (
   9:     [GUID] ASC
  10: )
  11: ) 

      目前理论上支持SQLServer 、Mysql、ORACLE、Sqlite四种数据库结构,具体建表脚本请自行参考相应资料书写,也可以使用AgileEAS.NET SOA中间件所提供的数据库初始化工具创建。

      EAS.Messages.MsmqSubscribeStorager存储订阅关系在messageSubscribe.Config文件之中,消息存储Msmq消息队列之中,使用之前请确保机器上安装了MSMQ消息对列。

五、关于自定义实现ISubscribeStorager

     有兴趣的朋友可以自定义实现接口ISubscribeStorager,这样就可以按自己的规则进行存储,比如把离线消息存储到mongodb、Redis、或者直接存储在文件之中,或者其他更多的实现规则,在此就不一一介绍,如有相关兴趣,请联系作者,如确有必要需要给在家介绍一下如何实现,将会另开一文本介绍如何自定义实现ISubscribeStorager接口。

六、改进在线例子支持离线

     还是跟上次一样,以案例为在家展示一下怎么进行离线消息,就不重新开始例子,对原有例子做一些改进,改进后例子如下:

image

     其中在原有项目的基础上增加了:Demo.Subscriber1和Demo.Subscriber2项目,其项目配置代码、配置文件基本上同Demo.Subscriber一样,其中唯一的差别在于,Demo.Subscriber1和Demo.Subscriber2向服务器提交订阅的时候都增加一个另friendName参数,其使用IMessageBus接口的以下订阅函数:

   1: /// <summary>
   2: /// 订阅消息。
   3: /// </summary>
   4: /// <param name="subscriber">订阅者。</param>
   5: /// <param name="friendName">订阅者名称,用于处理离线订阅。</param>
   6: /// <param name="topic">主题。</param>
   7: /// <param name="notifyHandler">订阅通知。</param>
   8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);

                Demo.Publisher项目为发布者代码。

                Demo.Subscriber项目为订阅者代码。

                Demo.Server项目为服务端代码。

     Demo.Subscriber1项目之中,其Program.cs代码如下:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Windows.Forms;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Subscriber1
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Subscriber1");
  16:  
  17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
  18:             System.Console.ReadLine();
  19:         }
  20:  
  21:         static void MessageNotify(object m)
  22:         {
  23:             Demo.Messages.Message message = m as Demo.Messages.Message;
  24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
  25:         }
  26:     }
  27: }

     其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在订阅消息的时候给了一个friendName为Subscriber1,Demo.Subscriber2与Demo.Subscriber1项目的唯一的差别就是此处为Subscriber2.

     我们使用内置的EAS.Messages.DbSubscribeStorager,则不需要修改服务端的代码,只需要修改服务端的配置文件如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--数据库连接-->
  12:       <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread">
  13:         <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" />
  14:       </object>
  15:       <!--数据访问器-->
  16:       <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread">
  17:         <property name="DbProvider" type="object" value="DbProvider"/>
  18:         <property name="Language" type="object" value="TSqlLanguage"/>
  19:       </object>
  20:       <!--ORM访问器-->
  21:       <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread">
  22:         <property name="DataAccessor" type="object" value="DataAccessor"/>
  23:       </object>
  24:       <!--查询语言-->
  25:       <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/>
  26:       <!--消息持久化-->
  27:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/>
  28:       <!--日志管理-->
  29:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  30:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  31:       </object>
  32:     </objects>
  33:   </eas>
  34: </configuration>

     在配置文件的IOC配置之中我们配置了消息存储对象以及其所依赖的数据库访问对象、Linq查询语言表达式,另外需要说明的是,我们需要把配置文件之中所涉及的EAS.Data.dll、EAS.SOA.BootStrap.dll复制到编译输出Publish,这两个文件可以从AgileEAS.NET SOA 中间件平台发布包之中寻找,本案例的下载压碎包之中会包括这两个文件。

     有关于基于Msmq的配置,只需要修改配置文件如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--消息持久化-->
  12:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/>
  13:       <!--日志管理-->
  14:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  15:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  16:       </object>
  17:     </objects>
  18:   </eas>
  19: </configuration>

     到此为止,所有代码均已完成,是不是很简单,接下来,我们跑起来验证一下效果。

七、验证效果

     我们在编译输入目录Publish下先启动Demo.Server.exe,再各启动Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再启动一个Demo.Publisher.exe,在Demo.Publisher.exe控制台按回车键:

N_[]_C%GTD_$0[KL}}B~E$A

目前程序三个订阅者都是在线的,Demo.Publisher发布了三条消息,三个订阅者都收到了三条消息,那么我们关闭Demo.Subscriber2之后再由Demo.Publisher发布两条消息:

R(OM90FW6B0WYO6R)0MQ7D4

然后我们再启动Demo.Subscriber2,看是否还能收到其离线之后由Demo.Publisher发布的两条消息:

ZE8XA`V4PE)5%F~QF}NO]0N

OK,到此为止。

八、源代码下载

     本程序的源代码已上传到服务器,请通过http://112.74.65.50/downloads/eas/Demo.Pub_Sub_Offline.rar进行下载,如果在开发过程之中想要了解更多有关Socket通信框架以及更多AgileEAS.NET SOA中间件平台的技术资源,请通过AgileEAS.NET SOA 网站:http://www.smarteas.net最新下载栏目进行下载。   

九、问题反馈

     麻烦大家在通过视频进行学习的时候能及时把问题反馈给楼主,或者有什么需要改进的一些建议都请向楼主直接反馈,以下是联系方式:

AgileEAS.NET SOA 网站:http://www.smarteas.net

官方博客:http://eastjade.cnblogs.com

楼主QQ:47920381,AgileEAS.NET

QQ群:113723486(AgileEAS SOA 平台)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平台交流)/上限1000人

邮件:james@agilelab.cn,mail.james@qq.com,

电话:18629261335。



    本文转自魏琼东博客园博客,原文链接:http://www.cnblogs.com/eastjade/p/3926651.html,如需转载请自行联系原作者

相关文章
|
16天前
|
消息中间件 开发框架 监控
NET任务调度框架Hangfire使用指南
Hangfire 是一个用于 .NET 应用程序的开源任务调度框架,支持长时间运行任务、定时任务等。通过简单的安装配置,即可将任务从主线程分离,提升应用性能。支持多种数据库,提供丰富的任务类型如立即执行、延迟执行和周期性任务,并有可视化管理界面 Hangfire Dashboard。还支持安全性配置及扩展插件,如 Hangfire.HttpJob,适合各种复杂场景下的任务调度需求。
41 1
NET任务调度框架Hangfire使用指南
|
1月前
|
开发框架 安全 .NET
在数字化时代,.NET 技术凭借跨平台兼容性、丰富的开发工具和框架、高效的性能及强大的安全稳定性,成为软件开发的重要支柱
在数字化时代,.NET 技术凭借跨平台兼容性、丰富的开发工具和框架、高效的性能及强大的安全稳定性,成为软件开发的重要支柱。它不仅加速了应用开发进程,提升了开发质量和可靠性,还促进了创新和业务发展,培养了专业人才和技术社区,为软件开发和数字化转型做出了重要贡献。
27 5
|
1月前
|
传感器 人工智能 供应链
.NET开发技术在数字化时代的创新作用,从高效的开发环境、强大的性能表现、丰富的库和框架资源等方面揭示了其关键优势。
本文深入探讨了.NET开发技术在数字化时代的创新作用,从高效的开发环境、强大的性能表现、丰富的库和框架资源等方面揭示了其关键优势。通过企业级应用、Web应用及移动应用的创新案例,展示了.NET在各领域的广泛应用和巨大潜力。展望未来,.NET将与新兴技术深度融合,拓展跨平台开发,推动云原生应用发展,持续创新。
35 4
|
1月前
|
开发框架 .NET C#
.NET 技术凭借高效开发环境、强大框架支持及跨平台特性,在软件开发中占据重要地位
.NET 技术凭借高效开发环境、强大框架支持及跨平台特性,在软件开发中占据重要地位。从企业应用到电子商务,再到移动开发,.NET 均展现出卓越性能,助力开发者提升效率与项目质量,推动行业持续发展。
32 4
|
1月前
|
消息中间件 监控 数据可视化
基于.NET开源、功能强大且灵活的工作流引擎框架
基于.NET开源、功能强大且灵活的工作流引擎框架
|
1月前
|
网络协议 Unix Linux
精选2款C#/.NET开源且功能强大的网络通信框架
精选2款C#/.NET开源且功能强大的网络通信框架
|
1月前
|
开发框架 JavaScript 前端开发
2024年全面且功能强大的.NET快速开发框架推荐,效率提升利器!
2024年全面且功能强大的.NET快速开发框架推荐,效率提升利器!
|
3月前
|
开发框架 前端开发 JavaScript
ASP.NET MVC 教程
ASP.NET 是一个使用 HTML、CSS、JavaScript 和服务器脚本创建网页和网站的开发框架。
51 7
|
3月前
|
存储 开发框架 前端开发
ASP.NET MVC 迅速集成 SignalR
ASP.NET MVC 迅速集成 SignalR
82 0
|
4月前
|
开发框架 前端开发 .NET
ASP.NET MVC WebApi 接口返回 JOSN 日期格式化 date format
ASP.NET MVC WebApi 接口返回 JOSN 日期格式化 date format
60 0