ESFramework介绍之(28)―― Udp组件

简介: ESFramework对Tcp和Udp协议都提供了完整的支持,在ESFramework介绍之(21)-- Tcp组件接口ITcp介绍 和 ESFramework介绍之(23)―― AgileTcp 两篇文章中介绍了Tcp组件,相对于Tcp来说,Udp要简单许多,所以我在这里打算用一篇文章来介绍它。
    ESFramework对Tcp和Udp协议都提供了完整的支持,在ESFramework介绍之(21)-- Tcp组件接口ITcp介绍ESFramework介绍之(23)―― AgileTcp 两篇文章中介绍了Tcp组件,相对于Tcp来说,Udp要简单许多,所以我在这里打算用一篇文章来介绍它。需要首先提出的是,ESFramework.Network命名空间下的所有直接类(即,不包括ESFramework.Network.Tcp和ESFramework.Network.Udp命名空间下的类)即可用于Tcp也可用于Udp。这中可复用性,将Tcp和Udp的区别仅仅隔离在最底层的通信组件中。

    由于Udp组件相对简单,所以其实现策略与Tcp组件稍有不同,最主要的不同之处在于,Udp组件不需要分配器组件,而是直接将接收到消息转交给处理器工厂。大家应该还记得这个图:

    
    当将其应用于Udp时,我们可以认为,消息分派器组件已经直接内嵌于Udp组件中了,所以上图对于Udp还是适用的。
    下面我们首先看看Udp组件的接口定义IEsbUdp:

    对应的代码如下:
    public   interface  IEsbUdp :IEsbUdpSender
    {
        
int   Port{ set  ;}
        IContractHelper       ContractHelper{
set  ;} 
        INetMessageHook    NetMessageHook{
set  ;}
        IDataDealerFactory  DataDealerFactory{
set  ;}

        
void  Initialize() ;
        
void  Start() ;    
        
void  Stop() ;    
    
        
event  CbInvalidMsg InvalidMsgRecieved ;
        
event  CbServiceCommitted ServiceCommitted ;
    }    

    
public   delegate   void  CbServiceCommitted( string  userID ,NetMessage msg) ;
    
public   delegate   void  CbInvalidMsg( byte [] data ,IPEndPoint remoteIPE) ;

    
public   interface  IEsbUdpSender
    {
        
void  HookAndSendNetMessage(IPEndPoint remoteIpe ,NetMessage msg) ; 
    }


    对于IEsbUdp接口有如下几点说明:
(1)同Tcp组件一样,Udp组件仍然可以使用Hook,由NetMessageHook属性体现。
(2)IEsbUdp有DataDealerFactory属性注入,而没有依赖与分派器组件,这个上面已经提到了。
(3)由于Udp不保证数据传输的正确、完整性,所以当接收到一个无效的消息时会触发InvalidMsgRecieved事件。而ServiceCommitted事件的含义与Tcp组件是完全一致的。
(4)IEsbUdp接口继承了IEsbUdpSender接口,IEsbUdpSender与Tcp组件中的IHookSender接口的目的是一致的。它用于保证在udp组件之外通过udp组件(有点绕口)发送消息时都能经过Hook链而不会有漏网之Message。
    
    通过接口我们了解了Udp组件的职责后,我们就可以实现这个组件了,ESFramework中默认的IEsbUdp接口的实现是EsbUdp,它通过一个循环的接收线程来专门接收消息,这个循环中的流程如下:
(1)接收一个Udp消息。
(2)检查这个消息的消息头的完整/正确性,如果不正确、则触发InvalidMsgRecieved事件。否则,进入下一步。
(3)解析消息头,并从中获取消息主体长度,然后判断消息主体是否完整,如果不完整、则触发InvalidMsgRecieved事件。否则,进入下一步。
(4)调用Hook链CaptureRecievedMsg。
(5)通过异步的方式,将消息的具体处理工作放到后台线程池的某个线程中处理。
(6)回到第一步,继续接收下个消息。

    之所以第五步将消息放到后台线程中处理,是为了避免阻塞接收线程,因为处理消息可能需要大量的cpu时间。我们继续看后台线程处理消息的流程:
(1)根据消息类型从处理器工厂获取对应的消息处理器。
(2)调用消息处理器处理消息
(3)调用Hook链CaptureBeforeSendMsg
(4)发送回复消息。
(5)触发ServiceCommitted事件。

    上面两个就是Udp组件的核心流程了,由于Udp是基于非连接的,所以不需要在Udp组件中进行像Tcp那样复杂的连接管理,当然,Udp协议仍然可以进行“伪在线”用户管理,这通过另外一个组件IUserManager做到,这个组件将在后面的文章中介绍。
    最后,给出EsbUdp的实现源码:

img_1c53668bcee393edac0d7b3b3daff1ae.gif img_405b18b4b6584ae338e0f6ecaf736533.gif EsbUdp
    public class EsbUdp :IEsbUdp
    {
        
private UdpClient  udpClient ;
        
private int        port = 10000;
        
private IContractHelper contractHelper ;
        
private INetMessageHook    netMessageHook = new EmptyNetMessageHook();
        
private IDataDealerFactory dataDealerFactory ;

        
private volatile bool goToStop = true ;
        
private ManualResetEvent stopEvent = new ManualResetEvent(false) ;
        
public event CbServiceCommitted ServiceCommitted ;

        
public EsbUdp()
        {            
            
this.netMessageHook = new EmptyNetMessageHook() ;
        }

        
public void Initialize()
        {
            
this.udpClient = new UdpClient(this.port) ;        
        }

        
#region IEsbUdp 成员
        
public event CbInvalidMsg InvalidMsgRecieved ;

        
#region property
        
public int Port
        {
            
set
            {
                
this.port = value ;
            }
        }

        
public IContractHelper ContractHelper
        {
            
set
            {
                
this.contractHelper = value ;
            }
        }

        
public IDataDealerFactory DataDealerFactory
        {
            
set
            {
                
this.dataDealerFactory = value ;
            }
        }

        
public INetMessageHook NetMessageHook
        {
            
set
            {
                
if(value != null)
                {
                    
this.netMessageHook = value ;
                }
                
else
                {
                    
this.netMessageHook = new EmptyNetMessageHook() ;
                }
            }
        }
        
#endregion

        
#region Method
        
public void Start()
        {
            
this.goToStop = false ;
            
this.stopEvent.Reset() ;
            CbSimple cb 
= new CbSimple(this.Worker) ;
            cb.BeginInvoke(
null ,null) ;
        }

        
public void Stop()
        {
            
this.goToStop = true ;
            
//发送eof消息给自己,使Receive不再阻塞
            byte[] eof = new byte[4] ;
            
this.udpClient.Send(eof ,4 ,new IPEndPoint(IPAddress.Parse("127.0.0.1") ,this.port)) ;

            
this.stopEvent.WaitOne() ;
        }

        
public void HookAndSendNetMessage(IPEndPoint remoteIpe ,NetMessage msg) 
        {
            
byte[] data = this.netMessageHook.CaptureBeforeSendMsg(msg).ToStream() ;
            
this.udpClient.Send(data ,data.Length ,remoteIpe) ;
        }    

        
#region Worker
        
private void Worker()
        {
            
while(! this.goToStop)
            {
                IPEndPoint remoteIPE 
= null ;
                
byte[] data = this.udpClient.Receive(ref remoteIPE ) ;
                
if(data.Length < this.contractHelper.MessageHeaderLength)
                {
                    
this.ActivateInvalidMsg(data ,remoteIPE) ;
                    
continue ;
                }

                IMessageHeader header 
= this.contractHelper.ParseMessageHeader(data ,0) ;
                
if(((this.contractHelper.MessageHeaderLength + header.MessageBodyLength) > data.Length) || (! this.contractHelper.ValidateMessageToken(header)))
                {
                    
this.ActivateInvalidMsg(data ,remoteIPE) ;
                    
continue ;
                }

                
byte[] body = null ;
                
if(header.MessageBodyLength > 0)
                {
                    body 
= new byte[header.MessageBodyLength] ;
                    
for(int i=0 ;i<body.Length ;i++)
                    {
                        body[i] 
= data[this.contractHelper.MessageHeaderLength + i] ;
                    }
                }

                NetMessage msg 
= new NetMessage(header ,body) ;
                msg.Tag 
= remoteIPE ;
                NetMessage hookedMsg 
= this.netMessageHook.CaptureRecievedMsg(msg) ;
                
                
if(this.goToStop)
                {
                    
break ;
                }

                CbMessageDealing cb 
= new CbMessageDealing(this.MessageDealing) ;
                cb.BeginInvoke(hookedMsg ,
null ,null) ;                
            }

            
this.stopEvent.Set() ;
        }

        
//异步处理消息
        private void MessageDealing(NetMessage msg)
        {
            IDataDealer dealer 
= this.dataDealerFactory.CreateDealer(msg.Header.ServiceKey ,msg.Header.TypeKey) ;
            NetMessage resMsg 
= dealer.DealRequestMessage(msg) ;

            
if(resMsg != null)
            {        
                NetMessage hookedResMsg 
= this.netMessageHook.CaptureBeforeSendMsg(resMsg) ;
                
byte[] bRes = hookedResMsg.ToStream() ;
                
this.udpClient.Send(bRes,bRes.Length ,(IPEndPoint)msg.Tag) ;
                
if(this.ServiceCommitted != null)
                {
                    
this.ServiceCommitted(resMsg.Header.UserID ,hookedResMsg) ;
                }
            }
        }

        
private void ActivateInvalidMsg(byte[] data ,IPEndPoint remoteIPE)
        {
            
if(this.InvalidMsgRecieved != null)
            {
                
this.InvalidMsgRecieved(data ,remoteIPE) ;
            }
        }
        
#endregion
        
#endregion

        
#endregion

    }

    
public delegate void CbMessageDealing(NetMessage msg ) ;

    关于ESFramework对Udp的支持,还有几个重要的主题需要介绍,除了上面提到的IUserManager组件,还有IUdpServerAgent组件(还记得ESFramework介绍之(7)-- 服务端代理IServerAgent),另外还有非常重要的一块--基于Udp的NAPT穿透。如果要了解这些内容,请继续关注本系列的文章。


上一篇:ESFramework介绍之(27)-- 支持OverdueMessage

转到  :ESFramework 可复用的通信框架(序) 



目录
相关文章
|
网络协议 网络性能优化
三十九、传输层概述和UDP协议
三十九、传输层概述和UDP协议
三十九、传输层概述和UDP协议
|
网络协议 Java 算法
ESFramework介绍之(23)―― AgileTcp
前面已经介绍了ITcp接口,而AgileTcp就是ESFramework给出的ITcp的参考实现。在之前,我曾经讲解过模拟完成端口的Tcp组件实现和异步Tcp组件实现,在它们的基础之上,我更改了处理策略,而形成了AgileTcp,目的是更清晰的结构、更高的效率。
812 0
|
网络协议
ESFramework介绍之(35)―― IMessageTransceiver
(本文适用于ESFramework V0.3+)        在ESFramework介绍之(7)-- 服务器代理IServerAgent 一文中,我们详细的介绍了IServerAgent,我们已经知道,客户端与服务器之间的所有通信都可经过IServerAgent,包括要转发的P2P消息。
907 0
|
网络协议 数据库 存储
ESFramework介绍之(18)―― Tcp用户管理器组件
当我们的应用中客户端与AS之间是通过Tcp进行通信的时候,通常,应用也要求管理所有在线的用户。这种管理至少包含以下几点:(1) 当用户上线时,记录上线时间(2) 当用户请求服务时,记录请求服务的时间、服务的类型、本次服务下载的数据量(3) 当用户下线时,记录下线时间。
650 0
|
网络协议 C++
ESFramework网络通信框架介绍之(1)――网络通信消息协议接口IContract
一.ESFramework网络通信框架与字节流        通过网络通信的系统之间(如客户端与服务端的通信)要想正常交互,它们必须有“共同的语言”,这种语言就是消息协议。遵守消息协议的消息才能被我们的系统所理解。
889 0
|
网络协议 API
ESFramework介绍之(21)-- Tcp组件接口ITcp介绍
写了这么多篇介绍ESFramework的文章才想起来还有一些很基础的内容没有介绍,前面介绍的一些组件、框架基本上是与协议无关的(比如无论是Tcp还是Udp甚至是Remoting、WebService都可以通用),然而到了应用的最底层,我们总需要选择一种通信协议,.
834 0
|
容器
ESFramework介绍之(29)―― 插件公共设施 AddinUtil
(本文适用于 ESFramework V0.2+)    不知你是否还记得,前面我们讲过,ESFramework规定了插件有如下特点: (1)一个插件是一个独立的物理单元。它可以独立的提供一项完整的服务(功能),而不需要依赖于其它插件。
774 0