基于消息与.Net Remoting的分布式处理架构

简介:

分布式处理在大型企业应用系统中,最大的优势是将负载分布。通过多台服务器处理多个任务,以优化整个系统的处理能力和运行效率。分布式处理的技术核心是完 成服务与服务之间、服务端与客户端之间的通信。在.Net 1.1中,可以利用Web Service或者.Net Remoting来实现服务进程之间的通信。本文将介绍一种基于消息的分布式处理架构,利用了.Net Remoting技术,并参考了CORBA Naming Service的处理方式,且定义了一套消息体制,来实现分布式处理。 

 一、消息的定义

       要实现进程间的通信,则通信内容的载体——消息,就必须在服务两端具有统一的消息标准定义。从通信的角度来看,消息可以分为两类:Request Messge和Reply Message。为简便起见,这两类消息可以采用同样的结构。

       消息的主体包括ID,Name和Body,我们可以定义如下的接口方法,来获得消息主体的相关属性:

C#语言
public  interface IMessage:ICloneable
{
     IMessageItemSequence GetMessageBody();
      string GetMessageID();
      string GetMessageName();

      void SetMessageBody(IMessageItemSequence aMessageBody);
      void SetMessageID( string aID);
      void SetMessageName( string aName);
}

    消息主体类Message实现了IMessage接口。在该类中,消息体Body为IMessageItemSequence类型。这个类型用于Get和Set消息的内容:Value和Item:

C#语言
public  interface IMessageItemSequence:ICloneable
{       

     IMessageItem GetItem( string aName);
      void SetItem( string aName,IMessageItem aMessageItem);        

      string GetValue( string aName);   
      void SetValue( string aName, string aValue);
}

       Value为string类型,并利用HashTable来存储Key和Value的键值对。而Item则为IMessageItem类型,同样的在 IMessageItemSequence的实现类中,利用HashTable存储了Key和Item的键值对。

       IMessageItem支持了消息体的嵌套。它包含了两部分:SubValue和SubItem。实现的方式和IMessageItemSequence相似。定义这样的嵌套结构,使得消息的扩展成为可能。一般的结构如下:

       IMessage——Name
                     ——ID
                     ——Body(IMessageItemSequence)
                            ——Value
                            ——Item(IMessageItem)
                                   ——SubValue
                                   ——SubItem(IMessageItem)
                                          ——……

       各个消息对象之间的关系如下:

 


       在实现服务进程通信之前,我们必须定义好各个服务或各个业务的消息格式。通过消息体的方法在服务的一端设置消息的值,然后发送,并在服务的另一端获得这些值。例如发送消息端定义如下的消息体:

C#语言:
IMessageFactory factory =  new MessageFactory();
IMessageItemSequence body = factory.CreateMessageItemSequence();
body.SetValue( "name1", "value1");
body.SetValue( "name2", "value2");

IMessageItem item = factory.CreateMessageItem();
item.SetSubValue( "subname1", "subvalue1");
item.SetSubValue( "subname2", "subvalue2");

IMessageItem subItem1 = factory.CreateMessageItem();
subItem1.SetSubValue( "subsubname11", "subsubvalue11");
subItem1.SetSubValue( "subsubname12", "subsubvalue12");
IMessageItem subItem2 = factory.CreateMessageItem();
subItem1.SetSubValue( "subsubname21", "subsubvalue21");
subItem1.SetSubValue( "subsubname22", "subsubvalue22");

item.SetSubItem( "subitem1",subItem1);
item.SetSubItem( "subitem2",subItem2);

body.SetItem( "item",item);

//Send Request Message
MyServiceClient service =  new MyServiceClient( "Client");
IMessageItemSequence reply = service.SendRequest( "TestService", "Test1",body);

       在接收消息端就可以通过获得body的消息体内容,进行相关业务的处理。 

 二、.Net Remoting服务

       在.Net中要实现进程间的通信,主要是应用Remoting技术。根据前面对消息的定义可知,实际上服务的实现,可以认为是对消息的处理。因此,我们可以对服务进行抽象,定义接口IService:

C#语言
public  interface IService
{
     IMessage Execute(IMessage aMessage);
}

        Execute()方法接受一条Request Message,对其进行处理后,返回一条Reply Message。在整个分布式处理架构中,可以认为所有的服务均实现该接口。但受到Remoting技术的限制,如果要实现服务,则该服务类必须继承自 MarshalByRefObject,同时必须在服务端被Marshal。随着服务类的增多,必然要在服务两端都要对这些服务的信息进行管理,这加大了 系统实现的难度与管理的开销。如果我们从另外一个角度来分析服务的性质,基于消息处理而言,所有服务均是对Request Message的处理。我们完全可以定义一个Request服务负责此消息的处理。

       然而,Request服务处理消息的方式虽然一致,但毕竟服务实现的业务,即对消息处理的具体实现,却是不相同的。对我们要实现的服务,可以分为两大类: 业务服务与Request服务。实现的过程为:首先,具体的业务服务向Request服务发出Request请求,Request服务侦听到该请求,然后 交由其侦听的服务来具体处理。

       业务服务均具有发出Request请求的能力,且这些服务均被Request服务所侦听,因此我们可以为业务服务抽象出接口IListenService:

C#语言:
public  interface IListenService
{
     IMessage OnRequest(IMessage aMessage);  
}

        Request服务实现了IService接口,并包含IListenService类型对象的委派,以执行OnRequest()方法:

C#语言
public  class  RequestListener:MarshalByRefObject,IService
{
      public RequestListener(IListenService listenService)
      {
         m_ListenService = listenService;
      }

      private IListenService m_ListenService;

      #region IService Members

      public IMessage Execute(IMessage aMessage)
      {
          return  this.m_ListenService.OnRequest(aMessage);
      }       

      #endregion
      public  override  object InitializeLifetimeService()
      {
          return  null;
      }
}

       在RequestListener服务中,继承了MarshalByRefObject类,同时实现了IService接口。通过该类的构造函数,接收IListService对象。

       由于Request消息均由Request服务即RequestListener处理,因此,业务服务的类均应包含一个RequestListener的 委派,唯一的区别是其服务名不相同。业务服务类实现IListenService接口,但不需要继承MarshalByRefObject,因为被 Marshal的是该业务服务内部的RequestListener对象,而非业务服务本身:

C#语言
public  abstract  class  Service:IListenService
{
      public Service( string serviceName)
      {
         m_ServiceName = serviceName;  
         m_RequestListener =  new RequestListener( this); 
      }       

      #region IListenService Members
      public IMessage OnRequest(IMessage aMessage)
      {
          //……
      }  

      #endregion

      private  string m_ServiceName;
      private RequestListener m_RequestListener;     
}

       Service类是一个抽象类,所有的业务服务均继承自该类。最后的服务架构如下:

 

       我们还需要在Service类中定义发送Request消息的行为,通过它,才能使业务服务被RequestListener所侦听。 

C#语言
public IMessageItemSequence SendRequest( stringaServiceName, string                                        aMessageName,IMessageItemSequence aMessageBody)
{

     IMessage message = m_Factory.CreateMessage();
     message.SetMessageName(aMessageName);
     message.SetMessageID( "");
     message.SetMessageBody(aMessageBody);

     IService service = FindService(aServiceName);
     IMessageItemSequence replyBody = m_Factory.CreateMessageItemSequence();
      if (service !=  null)
      {
          IMessage replyMessage = service.Execute(message);
          replyBody = replyMessage.GetMessageBody();          
      }
      else
      {          
          replyBody.SetValue( "result", "Failure");          
      }
      return replyBody;
}

       注意SendRequest()方法的定义,其参数包括服务名,消息名和被发送的消息主体。而在实现中最关键的一点是FindService()方法。我 们要查找的服务正是与之对应的RequestListener服务。不过,在此之前,我们还需要先将服务Marshal:

C#语言
public  void Initialize()
{                                          
    RemotingServices.Marshal( this.m_RequestListener, this.m_ServiceName +   ".RequestListener");
}

       我们Marshal的对象,是业务服务中的Request服务对象m_RequestListener,这个对象在Service的构造函数中被实例化:

C#语言:
m_RequestListener =  new RequestListener( this);

       注意,在实例化的时候是将this作为IListenService对象传递给RequestListener。因此,此时被Marshal的服务对象, 保留了业务服务本身即Service的指引。可以看出,在Service和RequestListener之间,采用了“双重委派”的机制。

       通过调用Initialize()方法,初始化了一个服务对象,其类型为RequestListener(或IService),其服务名 为:Service的服务名 + ".RequestListener"。而该服务正是我们在SendRequest()方法中要查找的Service:

C#语言:
IService service = FindService(aServiceName);

       下面我们来看看FindService()方法的实现:

C#语言
protected IService FindService( string aServiceName)
{
     lock ( this.m_Services)
     {
         IService service = (IService)m_Services[aServiceName];
          if (service !=  null)
          {
              return service;
          }
          else
          {
             IService tmpService = GetService(aServiceName);
             AddService(aServiceName,tmpService);
              return tmpService;
          }
     }
}

        可以看到,这个服务是被添加到m_Service对象中,该对象为SortedList类型,服务名为Key,IService对象为Value。如果没有找到,则通过私有方法GetService()来获得:

C#语言:
private IService GetService( string aServiceName)
{
    IService service = (IService)Activator.GetObject( typeof(RequestListener),
          "tcp://localhost:9090/" + aServiceName +  ".RequestListener");
     return service;
}

        在这里,Channel、IP、Port应该从配置文件中获取,为简便起见,这里直接赋为常量。

       再分析SendRequest方法,在找到对应的服务后,执行了IService的Execute()方法。此时的IService为 RequestListener,而从前面对RequestListener的定义可知,Execute()方法执行的其实是其侦听的业务服务的 OnRequest()方法。

       我们可以定义一个具体的业务服务类,来分析整个消息传递的过程。该类继承于Service抽象类:

C#语言
public  class  MyService:Service
{
      public MyService( string aServiceName): base(aServiceName)
      {}          
}

        假设把进程分为服务端和客户端,那么对消息处理的步骤如下:
 1、 在客户端调用MyService的SendRequest()方法发送Request消息;
 2、 查找被Marshal的服务,即RequestListener对象,此时该对象应包含对应的业务服务对象MyService;
 3、 在服务端调用RequestListener的Execute()方法。该方法则调用业务服务MyService的OnRequest()方法。

在这些步骤中,除了第一步在客户端执行外,其他的步骤均是在服务端进行。

 三、业务服务对于消息的处理

       前面实现的服务架构,已经较为完整地实现了分布式的服务处理。但目前的实现,并未体现对消息的处理。我认为,对消息的处理,等价与具体的业务处理。这些业 务逻辑必然是在服务端完成。每个服务可能会处理单个业务,也可能会处理多个业务。并且,服务与服务之间仍然存在通信,某个服务在处理业务时,可能需要另一 个服务的业务行为。也就是说,每一种类的消息,处理的方式均有所不同,而这些消息的唯一标识,则是在SendRequest()方法已经有所体现的 aMessageName。

       虽然,处理的消息不同,所需要的服务不同,但是根据我们对消息的定义,我们仍然可以将这些消息处理机制抽象为一个统一的格式;在.Net中,体现这种机制的莫过于委托delegate。我们可以定义这样的一个委托:

C#语言
public  delegate  void RequestHandler( string aMessageName,IMessageItemSequence aMessageBody, ref IMessageItemSequence aReplyMessageBody);

       在RequestHandler委托中,它代表了这样一族方法:接收三个入 参,aMessageName,aMessageBody,aReplyMessageBody,返回值为void。其中,aMessageName代表 了消息名,它是消息的唯一标识;aMessageBody是待处理消息的主体,业务所需要的所有数据都存储在aMessageBody对象中。 aReplyMessageBody是一个引用对象,它存储了消息处理后的返回结果,通常情况下,我们可以 用<"result","Success">或<"result", "Failure">来代表处理的结果是成功还是失败。

       这些委托均在服务初始化时被添加到服务类的SortedList对象中,键值为aMessageName。所以我们可以在抽象类中定义如下方法:     

C#语言
protected  abstract  void AddRequestHandlers();
protected  void AddRequestHandler( string aMessageName,RequestHandler handler)
{
     lock ( this.m_EventHandlers)
     {
          if (! this.m_EventHandlers.Contains(aMessageName))
          {
              this.m_EventHandlers.Add(aMessageName,handler);
          }
     }
}

protected RequestHandler FindRequestHandler( string aMessageName)
{
     lock ( this.m_EventHandlers)
     {
         RequestHandler handler = (RequestHandler)m_EventHandlers[aMessageName];
          return handler;
     }
}

       AddRequestHandler()用于添加委托对象与aMessageName的键值对,而FindRequestHandler()方法用于查找 该委托对象。而抽象方法AddRequestHandlers()则留给Service的子类实现,简单的实现如MyService的 AddRequestHandlers()方法:

C#语言
public  class  MyService:Service
{
      public MyService( string aServiceName): base(aServiceName)
      {}

      protected  override  void AddRequestHandlers()
      {
          this.AddRequestHandler( "Test1", new RequestHandler(Test1));
          this.AddRequestHandler( "Test2", new RequestHandler(Test2));
      }

      private  void Test1( string aMessageName,IMessageItemSequence aMessageBody, ref  IMessageItemSequence aReplyMessageBody)
      {
         Console.WriteLine( "MessageName:{0}\n",aMessageName);
         Console.WriteLine( "MessageBody:{0}\n",aMessageBody);
         aReplyMessageBody.SetValue( "result", "Success");
      }

      private  void Test2( string aMessageName,IMessageItemSequence aMessageBody, ref   IMessageItemSequence aReplyMessageBody)
      {
         Console.WriteLine( "Test2" + aMessageBody.ToString());
      }
}

       Test1和Test2方法均为匹配RequestHandler委托签名的方法,然后在AddRequestHandlers()方法中,通过调用 AddRequestHandler()方法将这些方法与MessageName对应起来,添加到m_EventHandlers中。

       需要注意的是,本文为了简要的说明这种处理方式,所以简化了Test1和Test2方法的实现。而在实际开发中,它们才是实现具体业务的重要方法。而利用这种方式,则解除了服务之间依赖的耦合度,我们随时可以为服务添加新的业务逻辑,也可以方便的增加服务。

       通过这样的设计,Service的OnRequest()方法的最终实现如下所示:

C#语言
public IMessage OnRequest(IMessage aMessage)
{
     string messageName = aMessage.GetMessageName();
     string messageID = aMessage.GetMessageID();
    IMessage message = m_Factory.CreateMessage();

    IMessageItemSequence replyMessage = m_Factory.CreateMessageItemSequence();
    RequestHandler handler = FindRequestHandler(messageName);
    handler(messageName,aMessage.GetMessageBody(), ref replyMessage);

    message.SetMessageName(messageName);
    message.SetMessageID(messageID);
    message.SetMessageBody(replyMessage);

     return message;
}

       利用这种方式,我们可以非常方便的实现服务间通信,以及客户端与服务端间的通信。例如,我们分别在服务端定义MyService(如前所示)和TestService:

C#语言:
public  class  TestService:Service
{
      public TestService( string aServiceName): base(aServiceName)
      {}

      protected  override  void AddRequestHandlers()
      {
          this.AddRequestHandler( "Test1", new RequestHandler(Test1));         
      }

      private  void Test1( string aMessageName,IMessageItemSequence aMessageBody, ref  IMessageItemSequence aReplyMessageBody)
      {           
         aReplyMessageBody = SendRequest( "MyService",aMessageName,aMessageBody);
         aReplyMessageBody.SetValue( "result2", "Success");
      }

}

       注意在TestService中的Test1方法,它并未直接处理消息aMessageBody,而是通过调用SendRequest()方法,将其传递到MyService中。

       对于客户端而言,情况比较特殊。根据前面的分析,我们知道除了发送消息的操作是在客户端完成外,其他的具体执行都在服务端实现。所以诸如 MyService和TestService等服务类,只需要部署在服务端即可。而客户端则只需要定义一个实现Service的空类即可:

C#语言:
public  class  MyServiceClient:Service
{
     public MyServiceClient( string aServiceName): base(aServiceName)
      {}

      protected  override  void AddRequestHandlers()
      {}
}

       MyServiceClient类即为客户端定义的服务类,在AddRequestHandlers()方法中并不需要实现任何代码。如果我们在 Service抽象类中,将AddRequestHandlers()方法定义为virtual而非abstract方法,则这段代码在客户端服务中也可 以省去。另外,客户端服务类中的aServiceName可以任意赋值,它与服务端的服务名并无实际联系。至于客户端具体会调用哪个服务,则由 SendRequest()方法中的aServiceName决定:

C#语言:
IMessageFactory factory =  new MessageFactory();
IMessageItemSequence body = factory.CreateMessageItemSequence();
//……
MyServiceClient service =  new MyServiceClient( "Client");
IMessageItemSequence reply = service.SendRequest( "TestService", "Test1",body);

        对于service.SendRequest()的执行而言,会先调用TestService的Test1方法;然后再通过该方法向MyService发送,最终调用MyService的Test1方法。

       我们还需要另外定义一个类,负责添加服务,并初始化这些服务:

C#语言
public  class  Server
{
      public Server()
      {
         m_Services =  new ArrayList();
      }
      private ArrayList m_Services;    
      public  void AddService(IListenService service)
      {
          this.m_Services.Add(service);
      }

      public  void Initialize()
      {  

         IDictionary tcpProp =  new Hashtable();
         tcpProp[ "name"] =  "tcp9090";
         tcpProp[ "port"] = 9090;

         TcpChannel channel =  new TcpChannel(tcpProp, new BinaryClientFormatterSinkProvider(),
                                                      new BinaryServerFormatterSinkProvider());           
         ChannelServices.RegisterChannel(channel);
          foreach (Service service  in m_Services)
          {
              service.Initialize();
          }           
      }
}

       同理,这里的Channel,IP和Port均应通过配置文件读取。最终的类图如下所示:

 

       在服务端,可以调用Server类来初始化这些服务:

C#语言
static  void Main( string[] args)
{  
    MyService service =  new MyService( "MyService");
    TestService service1 =  new TestService( "TestService");


    Server server =  new Server();
    server.AddService(service);
    server.AddService(service1);

    server.Initialize();
    Console.ReadLine();
}

 四、结论

       利用这个基于消息与.Net Remoting技术的分布式架构,可以将企业的业务逻辑转换为对消息的定义和处理。要增加和修改业务,就体现在对消息的修改上。服务间的通信机制则完全 交给整个架构来处理。如果我们将每一个委托所实现的业务(或者消息)理解为Contract,则该结构已经具备了SOA的雏形。当然,该架构仅仅处理了消 息的传递,而忽略了对底层事件的处理(类似于Corba的Event Service),这个功能我想留待后面实现。

       唯一遗憾的是,我缺乏验证这个架构稳定性和效率的环境。应该说,这个架构是我们在企业项目解决方案中的一个实践。但是解决方案则是利用了CORBA中间 件,在Unix环境下实现并运行。本架构仅仅是借鉴了核心的实现思想和设计理念,从而完成的在.Net平台下的移植。由于Unix与Windows Server的区别,其实际的优势还有待验证。







本文转自wayfarer51CTO博客,原文链接:http://blog.51cto.com/wayfarer/279909,如需转载请自行联系原作者

相关文章
|
12天前
|
存储 Prometheus Cloud Native
分布式系统架构6:链路追踪
本文深入探讨了分布式系统中的链路追踪理论,涵盖追踪与跨度的概念、追踪系统的模块划分及数据收集的三种方式。链路追踪旨在解决复杂分布式系统中请求流转路径不清晰的问题,帮助快速定位故障和性能瓶颈。文中介绍了基于日志、服务探针和边车代理的数据收集方法,并简述了OpenTracing、OpenCensus和OpenTelemetry等链路追踪协议的发展历程及其特点。通过理解这些概念,可以更好地掌握开源链路追踪框架的使用。
67 41
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
4月前
|
存储 JSON 数据库
Elasticsearch 分布式架构解析
【9月更文第2天】Elasticsearch 是一个分布式的搜索和分析引擎,以其高可扩展性和实时性著称。它基于 Lucene 开发,但提供了更高级别的抽象,使得开发者能够轻松地构建复杂的搜索应用。本文将深入探讨 Elasticsearch 的分布式存储和检索机制,解释其背后的原理及其优势。
328 5
|
22天前
|
设计模式 存储 算法
分布式系统架构5:限流设计模式
本文是小卷关于分布式系统架构学习的第5篇,重点介绍限流器及4种常见的限流设计模式:流量计数器、滑动窗口、漏桶和令牌桶。限流旨在保护系统免受超额流量冲击,确保资源合理分配。流量计数器简单但存在边界问题;滑动窗口更精细地控制流量;漏桶平滑流量但配置复杂;令牌桶允许突发流量。此外,还简要介绍了分布式限流的概念及实现方式,强调了限流的代价与收益权衡。
66 11
|
24天前
|
设计模式 监控 Java
分布式系统架构4:容错设计模式
这是小卷对分布式系统架构学习的第4篇文章,重点介绍了三种常见的容错设计模式:断路器模式、舱壁隔离模式和重试模式。断路器模式防止服务故障蔓延,舱壁隔离模式通过资源隔离避免全局影响,重试模式提升短期故障下的调用成功率。文章还对比了这些模式的优缺点及适用场景,并解释了服务熔断与服务降级的区别。尽管技术文章阅读量不高,但小卷坚持每日更新以促进个人成长。
48 11
|
26天前
|
消息中间件 存储 安全
分布式系统架构3:服务容错
分布式系统因其复杂性,故障几乎是必然的。那么如何让系统在不可避免的故障中依然保持稳定?本文详细介绍了分布式架构中7种核心的服务容错策略,包括故障转移、快速失败、安全失败等,以及它们在实际业务场景中的应用。无论是支付场景的快速失败,还是日志采集的安全失败,每种策略都有自己的适用领域和优缺点。此外,文章还为技术面试提供了解题思路,助你在关键时刻脱颖而出。掌握这些策略,不仅能提升系统健壮性,还能让你的技术栈更上一层楼!快来深入学习,走向架构师之路吧!
59 11
|
28天前
|
自然语言处理 负载均衡 Kubernetes
分布式系统架构2:服务发现
服务发现是分布式系统中服务实例动态注册和发现机制,确保服务间通信。主要由注册中心和服务消费者组成,支持客户端和服务端两种发现模式。注册中心需具备高可用性,常用框架有Eureka、Zookeeper、Consul等。服务注册方式包括主动注册和被动注册,核心流程涵盖服务注册、心跳检测、服务发现、服务调用和注销。
75 12
|
1月前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
1月前
|
存储 算法 安全
分布式系统架构1:共识算法Paxos
本文介绍了分布式系统中实现数据一致性的重要算法——Paxos及其改进版Multi Paxos。Paxos算法由Leslie Lamport提出,旨在解决分布式环境下的共识问题,通过提案节点、决策节点和记录节点的协作,确保数据在多台机器间的一致性和可用性。Multi Paxos通过引入主节点选举机制,优化了基本Paxos的效率,减少了网络通信次数,提高了系统的性能和可靠性。文中还简要讨论了数据复制的安全性和一致性保障措施。
47 1