ESFramework介绍之(23)―― AgileTcp

简介: 前面已经介绍了ITcp接口,而AgileTcp就是ESFramework给出的ITcp的参考实现。在之前,我曾经讲解过模拟完成端口的Tcp组件实现和异步Tcp组件实现,在它们的基础之上,我更改了处理策略,而形成了AgileTcp,目的是更清晰的结构、更高的效率。

    前面已经介绍了ITcp接口,而AgileTcp就是ESFramework给出的ITcp的参考实现。在之前,我曾经讲解过模拟完成端口的Tcp组件实现和异步Tcp组件实现,在它们的基础之上,我更改了处理策略,而形成了AgileTcp,目的是更清晰的结构、更高的效率。这个策略会在后面讲到。

    Tcp组件主要控制着系统与终端用户的所有消息的进出,ITcp接口描述了这个组件的外貌,告诉外部如何使用Tcp组件、如何与Tcp组件交互。而从实现的角度来看,我们必须理清Tcp组件的职责:
(1) 管理所有已经建立的Tcp连接
(2) 管理与每个连接相对应接收缓冲区
(3) 管理所有的工作者线程
(4) 处理长度大于接收缓冲区的消息

    我们来看看如何满足这些职责。
    由于每个连接都对应着一个接收缓冲区,所以可以将它们封装在一起形成ContextKey(连接上下文):
   

img_1c53668bcee393edac0d7b3b3daff1ae.gif img_405b18b4b6584ae338e0f6ecaf736533.gif ContextKey
    public class ContextKey
    {        
        
private byte[]  buffer ;          //封装接收缓冲区
        private ISafeNetworkStream netStream = null ;            
        
private volatile bool      isDataManaging = false ;
        
        
public ContextKey(ISafeNetworkStream net_Stream ,int buffSize)
        {
            
this.netStream = net_Stream ;            
            
this.buffer    = new byte[buffSize] ;            
        }

        
#region NetStream  
        
public ISafeNetworkStream NetStream
        {
            
get
            {
                
return this.netStream ;
            }
        }            

        
public byte[] Buffer
        {
            
get
            {
                
return this.buffer ;
            }            
        }        

        
public bool IsDataManaging
        {
            
get
            {
                
return this.isDataManaging ;
            }
            
set
            {
                
this.isDataManaging = value ;
            }
        }

        
private bool firstMessageExist = false ;
        
public  bool FirstMessageExist 
        {
            
get
            {
                
return this.firstMessageExist ;
            }
            
set
            {
                
this.firstMessageExist = value ;
            }
        }
        
#endregion            
    }    

    ContextKey中封装的是ISafeNetworkStream而不是NetworkStream,原因可参见这里
    IsDataManaging属性表明工作线程是否正在处理本连接对应的缓冲区中的数据,FirstMessageExist属性用于标志接收到的第一条消息,因为系统可能需要对接收到的第一条消息做一些特殊的处理。

    任何时刻,可能都有成千上万个连接存活着;任何时刻,都可能有新的连接建立、或已有的连接被释放。所有这些ContextKey对象需要被管理起来,这就是上下文管理器IContextKeyManager:

    public   interface  IContextKeyManager
    {
        
void  InsertContextKey(ContextKey context_key) ;
        
void  DisposeAllContextKey() ;        
        
void  RemoveContextKey( int  streamHashCode) ;
        ISafeNetworkStream GetNetStream(
int  streamHashCode) ;

        
int             ConnectionCount { get  ;}
        ICollection ContextKeyList{
get  ;}

        
event  CbSimpleInt StreamCountChanged ;        
    }    

    说到上下文管理器,先要讲讲如何标志每个不同的上下文了?使用连接字,连接字是Tcp连接的Hashcode,它们与连接一一对应,并且不会重复。所以在源码中你经常会看到名为“streamHashCode”的参数和变量。由于Tcp连接与streamHashCode一一对应,所以GetNetStream方法的实现就非常简单。不知道你是否记得,RoundedMessage类中有个ConnectID字段,它就是连接字,与streamHashCode意义一样。根据此字段,你可以清楚的知道这个RoundedMessage来自于哪个连接。

    关于工作者线程,很幸运的是,我们可以直接使用.NET提供的后台线程池,而不必要再去手动管理,这可以省却一些麻烦。当然你也可以使用ThreadPool类,甚至你可以从头开始实现自己的线程池组件,这也是不困难的。
    
    我经常被问到,接收缓冲区应该开辟多大?这取决于你的应用,但是有一点是错不了的――缓冲区的大小至少要大于消息头Header的大小,否则麻烦就多了。根据我的经验,一般缓冲区的大小至少应该能容纳所有接收消息中的60%-80%。对于大于缓冲区大小的消息,ESFramework采用的策略是使用缓冲区池IBufferPool。

    public   interface  IBufferPool
    {
        
byte [] RentBuffer( int  minSize) ;
        
void    GivebackBuffer( byte [] buffer) ;
    }

    通过上面的介绍我们已经知道如何满足Tcp组件的职责,现在我们来看看更细的实现策略:
(1) 使用Checker线程。
    使用Checker线程是AgileTcp组件的区别于模拟完成端口的Tcp组件实现和异步Tcp组件的主要特色。当AgileTcp启动时,Checker线程也随之启动,这个线程的主要工作就是检查已经存在的每个连接上是否有数据要接收(还记得Select网络模型),这可以通过NetworkStream.DataAvailable属性知道。如果发现某个连接上有待接收的数据,就将其放到工作者线程中去处理,并设置前面提到的ContextKey.IsDataManaging属性,然后再判断下个连接,如此循环下去。

        private   void  TaskChecker()
        {
            
while ( !   this .stop)
            {
                
foreach (ContextKey key  in   this .contextKeyManager.ContextKeyList)
                {
                    
if ( this .stop)
                    {
                        
break  ;
                    }

                    
if (( !  key.IsDataManaging &&  key.NetStream.DataAvailable)
                    {                        
                        key.IsDataManaging 
=   true  ;    
                        CbContextKey cb 
=   new  CbContextKey( this .DataManaging) ;
                        cb.BeginInvoke(key ,
null  , null  ) ;
                    }                    
                }

                System.Threading.Thread.Sleep(
50 ) ;
            }
        }

(2) 将消息头的解析置于Tcp组件之中
    将消息头解析置于Tcp组件之中这个方案我层考虑了非常久,原因是,这会破坏Tcp组件的单纯性,使得Tcp组件与协议(Contract)有所关联。最终采用这个策略的第一个理由是清晰,第二个理由是效率。清晰在于简化了ContextKey结构,避免了使用消息分裂器这样复杂的算法组件(如果大家看过我以前关于通信方案的文章,一定会得到这样的答案)。效率在于,当在此解析了Header之后,后面所有的处理器都可以使用这个Header对象了,而不用在自己去解析。这也是NetMessage类中有个Header字段的原因。

(3) 针对于某个连接,只有当上一个消息处理完并将回复发送后(如果有回复的话),才会去接收下一个消息。
    这个策略会使很多事情变得简单,而且不会影响任何有用的特性。由于不会在处理消息的时候去接收下一个消息,所以可以直接处理接收缓冲区中的数据,而不需要将数据从接收缓冲区拷贝到另外的地方去处理。这又对效率提高有所帮助。

    综上所述,我们可以总结工作者线程要做的事情:首先,从连接上接收MessageHeaderSize个字节,解析成Header,然后在接收Header. MessageBodyLength个字节,即是Body,接着构造成RoundedMessage对象交给消息分配器去处理,最后将得到的处理结果发送出去。代码如下所示:

img_1c53668bcee393edac0d7b3b3daff1ae.gif img_405b18b4b6584ae338e0f6ecaf736533.gif DataManaging
        private void DataManaging(ContextKey key)
        {    
            
int streamHashCode = key.NetStream.GetHashCode() ;    
            
int headerLen = this.contractHelper.MessageHeaderLength ;
            
            
while((key.NetStream.DataAvailable) && (! this.stop))
            {
                
byte[] rentBuff = null ;//每次分派的消息中,最多有一个rentBuff

                
try
                {
                    
#region 构造 RoundedMessage
                    NetHelper.RecieveData(key.NetStream ,key.Buffer ,
0 ,headerLen) ;
                    IMessageHeader header 
= this.contractHelper.ParseMessageHeader(key.Buffer ,0) ;    
                    
if(! this.contractHelper.ValidateMessageToken(header))
                    {
                        
this.DisposeOneConnection(streamHashCode ,DisconnectedCause.MessageTokenInvalid) ;
                        
return ;
                    }

                    RoundedMessage requestMsg 
= new RoundedMessage() ;
                    requestMsg.ConnectID      
= streamHashCode ;
                    requestMsg.Header         
= header ;
                    
                    
if(! key.FirstMessageExist)
                    {
                        requestMsg.IsFirstMessage 
= true ;
                        key.FirstMessageExist     
= true ;
                    }

                    
if((headerLen + header.MessageBodyLength) > this.maxMessageSize)
                    {
                        
this.DisposeOneConnection(streamHashCode ,DisconnectedCause.MessageSizeOverflow) ;
                        
return ;
                    }
                
                    
if(header.MessageBodyLength >0 )
                    {
                        
if((header.MessageBodyLength + headerLen) <= this.recieveBuffSize)
                        {
                            NetHelper.RecieveData(key.NetStream ,key.Buffer ,
0 ,header.MessageBodyLength) ;
                            requestMsg.Body 
= key.Buffer ;                            
                        }
                        
else
                        {                        
                            rentBuff 
= this.bufferPool.RentBuffer(header.MessageBodyLength) ;                        

                            NetHelper.RecieveData(key.NetStream ,rentBuff ,
0 ,header.MessageBodyLength) ;
                            requestMsg.Body 
= rentBuff ;                            
                        }
                    }
                    
#endregion                    
                
                    
bool closeConnection = false ;
                    NetMessage resMsg 
= this.tcpStreamDispatcher.DealRequestData(requestMsg ,ref closeConnection) ;

                    
if(rentBuff != null)
                    {
                        
this.bufferPool.GivebackBuffer(rentBuff) ;
                    }

                    
if(closeConnection)
                    {
                        
this.DisposeOneConnection(streamHashCode ,DisconnectedCause.OtherCause) ;
                        
return ;
                    }

                    
if((resMsg != null&&(! this.stop))
                    {                    
                        
byte[] bRes = resMsg.ToStream() ;
                        key.NetStream.Write(bRes ,
0 ,bRes.Length) ;

                        
if(this.ServiceCommitted != null)
                        {                                
                            
this.ServiceCommitted(streamHashCode ,resMsg) ;
                        }
                    }
                }
                
catch(Exception ee)
                {
                    
if(ee is System.IO.IOException) //正在读写流的时候,连接断开
                    {
                        
this.DisposeOneConnection(streamHashCode ,DisconnectedCause.NetworkError) ;
                        
break ;
                    }
                    
else
                    {
                        
this.esbLogger.Log(ee.Message ,"ESFramework.Network.Tcp.AgileTcp" ,ErrorLevel.Standard) ;
                    }

                    ee 
= ee ;                    
                }

            }

            key.IsDataManaging 
= false ;
        }

    AgileTcp组件的主要原理差不多就这些了,这种实现有个缺点,不知大家发现没有。那就是当客户端主动断开连接或掉线时,AgileTcp组件可能感受不到(除非对应的连接上正在发送或接收数据,此时会抛出异常),因为当连接断开时,key.NetStream.DataAvailable不会抛出异常,而是仍然返回false。这是个问题,幸好有补救的办法,一是要求客户端下线的时候给服务器发送Logoff消息,二是使用定时掉线检查器(IUserOnLineChecker)。当服务器检查或发现某用户下线时,即可调用ITcpClientsController.DisposeOneConnection方法来释放对应的连接和Context。(你应该还记得ITcp接口是从ITcpClientsController继承的)。关于这个问题,你有更好的解决办法吗?
    感谢关注!

上一篇文章:ESFramework介绍之(21)-- Tcp组件接口ITcp介绍

转到  :ESFramework 可复用的通信框架(序)



目录
相关文章
|
C#
[老老实实学WCF] 第十篇 消息通信模式(下) 双工
原文: [老老实实学WCF] 第十篇 消息通信模式(下) 双工 老老实实学WCF 第十篇 消息通信模式(下) 双工   在前一篇的学习中,我们了解了单向和请求/应答这两种消息通信模式。
856 0
|
网络协议
ESFramework介绍之(35)―― IMessageTransceiver
(本文适用于ESFramework V0.3+)        在ESFramework介绍之(7)-- 服务器代理IServerAgent 一文中,我们详细的介绍了IServerAgent,我们已经知道,客户端与服务器之间的所有通信都可经过IServerAgent,包括要转发的P2P消息。
907 0
|
网络协议 Java
ESFramework介绍之(28)―― Udp组件
ESFramework对Tcp和Udp协议都提供了完整的支持,在ESFramework介绍之(21)-- Tcp组件接口ITcp介绍 和 ESFramework介绍之(23)―― AgileTcp 两篇文章中介绍了Tcp组件,相对于Tcp来说,Udp要简单许多,所以我在这里打算用一篇文章来介绍它。
945 0
|
容器
ESFramework介绍之(29)―― 插件公共设施 AddinUtil
(本文适用于 ESFramework V0.2+)    不知你是否还记得,前面我们讲过,ESFramework规定了插件有如下特点: (1)一个插件是一个独立的物理单元。它可以独立的提供一项完整的服务(功能),而不需要依赖于其它插件。
774 0
|
网络协议
ESFramework介绍之(15)-- IRAS
每个城市都对应着自己的AS,每个AS都有一组FS为之服务,而所有的AS都由一个IRAS联系/管理起来(回顾)。前面我们已经提到,所有的FS都可以是动态添加/移除的,并且FS的地址也是自由可变的。
861 0
|
C++ 索引
ESFramework网络通信框架介绍之(2)――网络通信消息NetMessage
ESFramework网络通信框架与元数据 较之C++而言,.NET是一个更加“动态”的平台,其动态能力建立在反射机制之上,而反射的基础是“元数据”。    上文已经提到过,如果一个框架要为我们的应用做更多的事情,那么这个框架必须建立更多的标准,必须对框架自己要处理的消息有更多的了解,所以,每个消息都要是自描述的,也就是说每个消息要包含它自己的“元数据”。
881 0
|
监控 网络协议 数据安全/隐私保护
ESFramework介绍之(30)―― 消息侦察者 INetMessageSpy
(本文适用于ESFramework V0.2+)    现在我们回想一下,当网络组件(Tcp/Udp组件)接收到一个消息后,这个消息会流经哪些组件,然后再通过网络组件发送出去了。如果你研究过ESFramework V0.1,你会发现,消息“行走”的路线模型可以用下图表示出来:    请求消息(路径由黑线表示)经过网络组件后,会被Hook链中的各个Hook按照特定的顺序处理,然后到达消息处理器,消息处理器处理请求消息,并给出回复消息(路径由红线表示),回复消息同样再经过Hook链,然后通过网络组件发送出去。
848 0
|
网络协议 C++
ESFramework网络通信框架介绍之(1)――网络通信消息协议接口IContract
一.ESFramework网络通信框架与字节流        通过网络通信的系统之间(如客户端与服务端的通信)要想正常交互,它们必须有“共同的语言”,这种语言就是消息协议。遵守消息协议的消息才能被我们的系统所理解。
889 0
|
API 数据库 网络协议
ESFramework介绍之(24)―― 日志记录IEsbLogger
框架,从另外一个角度说,就是一个半成品的应用程序,既然如此,框架在运行的过程中也会遇到诸多的异常、错误情况,我们需要将这些情况记录下来,以便在发生问题时为我们的诊断提供必要的帮助。    最最开始,那还是在ESFramework的前身即EnterpriseServerBase的时候,由于当时只是将EnterpriseServerBase作为一个类库,而并没有提升到一个框架的高度,所以是没有必要为之配备一个日志记录器(Logger),就像你从来不曾看到在使用.NET类库时还必须传个日志记录器给它,以使.NET类库中的API碰到额外情况时能够记录下错误信息。
930 0
|
网络协议 C#
基于ESFramework的P2P实现 —— ESFramework扩展之EsfP2P
好久没有写关于ESFramework的文章了,曾很早就承诺过要写一篇介绍基于ESFramework实现NAPT P2P的文章,今天终于能抽出时间做这件事。    网络地址转换NAT(或者NAPT)的基本理论知识,网上有很多相关资料,不是很清楚的朋友可以先了解下什么是NAT、以及为什么要使用NAT。
919 0