深入Jetty源码之EndPoint

简介:

概述

在Jetty中,使用Connector来抽象Jetty服务器对某个端口的监听。在Connector启动时,它会启动acceptors个Acceptor线程用于监听在Connector中配置的端口。对于客户端的每次连接,Connector都会创建相应的EndPoint来表示该连接,一般在创建EndPoint的同时会同时创建Connection,这里EndPoint用于和Socket打交道,而Connection用于在从Socket中读取到数据后的处理逻辑以及生成响应数据的处理逻辑。

不同的Connector会创建不同的EndPoint和Connection实例。如SocketConnector创建ConnectorEndPoint和HttpConnection,SslSocketConnector创建SslConnectorEndPoint和HttpConnection,SelectChannelConnector创建SelectChannelEndPoint和SelectChannelHttpConnection,SslSelectChannelConnector创建SslSelectChannelEndPoint和SelectChannelHttpConnection,BlockingChannelConnector创建BlockingChannelEndPoint和HttpConnection等。

EndPoint接口定义

Jetty中EndPoint接口定义如下: 
public  interface EndPoint {
     //  EndPoint是对一次客户端到服务器连接的抽象,每一个新的连接都会创建一个新的EndPoint,并且在这个EndPoint中包含这次连接的Socket。由于EndPoint包含底层的连接Socket,因而它主要用于处理从Socket中读取数据和向Socket中写入数据,即对应EndPoint接口中的fill和flush方法。

    
//  从Socket中读取数据,并写入Buffer中直到数据读取完成或putIndex到Buffer的capacity。返回总共读取的字节数。在实现中,StreamEndPoint使用Buffer直接从Socket的InputStream中读取数据,而ChannelEndPoint则向Channel读取数据到Buffer。
     int fill(Buffer buffer)  throws IOException;

     //  将Buffer中的数据(从getIndex到putIndex的数据)写入到Socket中,同时清除缓存(调用Buffer的clear方法)。在实现中,StreamEndPoint使用Buffer直接向Socket的OutputStream写入数据,而ChanelEndPoint则将Buffer中的数据写入Channel中。
     int flush(Buffer buffer)  throws IOException;
    
     //  类似上面的flush,它会将传入的header、buffer、trailer按顺序写入Socket中(OutputStream或者Channel)。返回总共写入的字节数。
     int flush(Buffer header, Buffer buffer, Buffer trailer)  throws IOException;

     //  当在处理HTTP/1.0请求时或当前Request中的KeepAlive的值为false时,在处理完成当前请求后,需要调用shutdownOutput()方法,关闭当前连接;或在处理当前请求时出现比较严重的错误、Socket超时时。在调用完shutdownOutput()方法后,isOutputShutdown()方法返回true。
     void shutdownOutput()  throws IOException;
     boolean isOutputShutdown();

     //  当Server无法从当前连接(Socket)中读取数据时(即read返回-1)时,调用shutdownInput()方法以关闭当前连接,此时isInputShutdown()返回true。
     void shutdownInput()  throws IOException;
     boolean isInputShutdown();

     //  当Socket超时或在读写Socket过程中出现任何IO错误时,Server会直接调用close()方法以关闭当前连接。
     void close()  throws IOException;
     //  当前Connection是否已经打开,对ChannelEndPoint来说表示Channel.isOpen()返回true,对SocketEndPoint来说,表示Socket没有被关闭。
     public  boolean isOpen();

     //  对StreamEndPoint来说,它的读写是阻塞式的,但是对ChannelEndPoint来说,如果它内部的channel是SelectableChannel,那么这个Channel的读写可以配置成非阻塞的(通过SelectableChannel.isBlocking()方法判断)。因而对SelectChannelEndPoint需要使用blockReadable()方法来阻塞直到超时。返回true表示阻塞读取失败,此时HttpParser会关闭这个EndPoint,并抛出异常。blockWritable()方法类似blockReadable()用于SelectChannelEndPoint以等待有数据写入到Channel中,如果返回false,表示在指定的时间内没有数据可写入Channel中(即超时),此时会关闭该EndPoint,并抛出异常。
     public  boolean isBlocking();
     public  boolean blockReadable( long millisecs)  throws IOException;
     public  boolean blockWritable( long millisecs)  throws IOException;

     //  对SslSelectChannelEndPoint,它是Buffered,因而它的isBuffered()方法返回true,而isBufferingInput()和isBufferingOutput()根据内部的_inNIOBuffer和_outNIOBuffer字段的hasContent()方法判断是否返回true或false。对其他类型的EndPoint来说,这三个方法都返回false。而flush()方法则将_outNIOBuffer中缓存的数据写入Channel中。
     public  boolean isBufferred();
     public  boolean isBufferingInput();
     public  boolean isBufferingOutput();
     public  void flush()  throws IOException;

     //  EndPoint还定义了一些和EndPoint相关链的信息和状态:

     //  返回该EndPoint内部使用的传输工具,如ChannelEndPoint内部使用Channel,而SocketEndPoint内部使用Socket。该方法用于对内部传输工具的配置。
     public Object getTransport();
     //  用于配置Socket的SO_TIMEOUT的时间,即等待客户连接的超时时间。
     public  int getMaxIdleTime();
     public  void setMaxIdleTime( int timeMs)  throws IOException;

     //  返回当前EndPoint所在服务器的IP地址、主机名、端口号以及客户端的IP地址、主机名、端口号。
     public String getLocalAddr();
     public String getLocalHost();
     public  int getLocalPort();
     public String getRemoteAddr();
     public String getRemoteHost();
     public  int getRemotePort();
}

EndPoint类图


EndPoint接口定义概述

EndPoint最主要的方法从底层传输链路中读取数据并填入Buffer中的fill方法,以及将Buffer中的数据写入底层传输链路的flush方法;读数据对应Input,写数据对应Output,可以单独的关闭Input或Output,并提供方法判断Input或Output是否已经被关闭;可以用close方法关闭EndPoint,也可以通过isOpen方法判断是否这个EndPoint是否已经被关闭;可以以阻塞的方式读写EndPoint,并判断当前EndPoint是否处于阻塞状态(主要用于SelectChannelEndPoint中);对SslSelectChannelEndPoint来说,它在读写时都可能内部缓存数据,因而EndPoint中定义了一些方法用于判断当前EndPoint是否有输入/输出换成,以及使用flush将缓存中的数据写入到底层链路中;对底层Socket,EndPoint还可以配置其最长的空闲时间;最后EndPoint还提供一些方法用于获取本地和远程的地址、主机名、端口号,以及获取底层传输类,如Socket、Channel等。

StreamEndPoint实现

StreamEndPoint采用古老的Stream方法从Socket中读写数据,它包含InputStream和OutputStream,分别表示读写数据流;它永远是阻塞式读写,因而isBlocking、blockReadable、blockWritable永远返回true;它也不会在内部缓存读写数据,因而isBufferingInput、isBufferingOutput、isBufferred永远返回false,而flush方法直接调用OutputStream的flush方法;对fill实现,直接使用传入的Buffer从InputStream中读取数据;对flush实现,直接将Buffer中的数据写入到OutputStream中;close方法同时关闭InputStream和OutputStream,并将成员变量置为null;对StreamEndPoint本身,没有本地或远程的地址、主机名、端口号信息。

SocketEndPoint 是StreamEndPoint的子类,它从Socket中获取InputStream和OutputStream,以及本地和远程的地址、主机名、端口号;而isInputShutdown、isOutputShutdown、shutdownInput、shutdownOutput等方法直接调用Socket中相应的方法;getTransport直接返回Socket实例;setMaxIdleTime方法同时设置Socket的SO_TIMEOUT值;当空闲超时,只关闭Input。 

ConnectorEndPoint 继承自SocketEndPoint,它是SocketConnector的内部类,每一个客户端的连接请求创建一个ConnectorEndPoint实例,在创建ConnectorEndPoint的同时,会在内部创建一个HttpConnection实例;它还实现了ConnectedEndPoint,因而可以从外部设置Connection实例;在读数据时,如果遇到EOF,表示连接已经断开,因而关闭当前EndPoint;在关闭EndPoint时,cancel当前Connection中Request实例的AsyncContinuation。ConnectorEndPoint还实现了Runnable接口,在其run方法的实现中,它首先更新处理的Connection的引用计数,然后保存当前Connection实例,在SocketConnector已经启动,并且ConnectorEndPoint未被关闭的状态下循环调用Connection的handle方法,在每个循环开始前检查当前Connector是否处于Low Resources状态(如线程池的可用线程已经不多),此时更新EndPoint的MaxIdleTime为当前Connector的LowResourcesMaxIdleTime的值,以减少一些连接的空闲等待时间;对任何Exception,关闭当前EndPoint;最后更新Connector中的一些统计信息,将当前Connection从Connector的当前正在处理的connections集合中移除,如果此时Socket还未关闭,读取Socket中的数据直到数据读完或超过MaxIdleTime,此时如果Socket还未关闭,则关闭当前Socket。而在Connector创建ConnectorEndPoint时,会调用其dispatch方法,将其自身仍给相应的线程池处理,以在某个时间在另一个线程中调用其run方法。

SslConnectorEndPoint 继承自ConnectorEndPoint,它在关闭Input和Output时会同时关闭整个EndPoint,而在执行真正的处理逻辑前有一个handle shake的过程。

ChannelEndPoint实现

ChannelEndPoint采用NIO实现,从Channel中读写数据。在创建ChannelEndPoint时传入ByteChannel,如果传入的ByteChannel是SocketChannel,则同时纪录Socket实例,以及获取本地、远程的地址信息,并设置MaxIdleTime值为SO_TIMEOUT值。如果该ByteChannel是SelectableChannel类型(ServerSocketChannel、SocketChannel、DiagramChannel、SinkChannel、SourceChannel),并且其isBlocking()方法返回false,表示该Channel是非阻塞式的读写,否则这个Channel是阻塞式的读写,但是默认情况下,blockReadable、blockWritable直接返回true,表示阻塞式的读写。对非SslSelectChannelEndPoint的EndPoint不会在内部缓存数据,因而isBufferred、isBufferingOutput、isBufferingInput直接返回false,而flush方法为空实现;对SocketChannel,在设置MaxIdleTime时,同时将该值设置到底层Socket的SO_TIMEOUT的值中;getTransport直接返回底层channel实例;shutdownInput、shutdownOutput、isInputShutdown、isOutputShutdown使用Socket实现;fill实现只支持NIOBuffer,它使用Channel将数据写入内部的ByteBuffer中;flush实现使用Channel将ByteBuffer中的数据写入到Channel中,或使用GatheringByteChannel将多个ByteBuffer同时写入到Channel中。

BlockingChannelEndPoint 类是BlockingChannelConnector的内部类,它继承自ChannelEndPoint,并实现了ConnectedEndPoint和Runnable接口。在创建BlcokingChannelEndPoint时,同样也会创建HttpConnection实例;每次调用fill、flush方法时,都会更新_idleTimestamp的值为当前时间戳(该值也会在每一次Connection开始重新被处理时更新),在BlockingChannelConnector启动时会生成一个Task,它没400毫秒遍历一次所有正在处理的EndPoint,如果发现有EndPoint已经超时(checkIdleTimestamp()方法,即空闲时间超过MaxIdleTime),则调用其idleExpired()方法,将该EndPoint关闭;BlockingChannelConnector在接到一个连接后,先会设置SocketChannel的blockingChannel为true,然后使用这个SocketChannel创建一个BlockingChannelEndPoint,并调用其dispatch()方法,将它丢到一个线程池中,在BlockingChannelEndPoint的run方法实现中,首先更新一些统计数据,纪录当前正在处理的EndPoint;只要当前EndPoint还处于打开状态,先更新_idleTimestamp为当前时间戳,然后如果当前ThreadPool处于LowOnThread状态,将timeout时间更新为LowResourcesMaxIdleTime,而后调用Connection的handle方法;对任何Exception,直接关闭EndPoint;在最后退出时,如果EndPoint还未关闭,读取EndPoint的数据,直到超时,并强制关闭EndPoint。

SelectChannelEndPoint类在SelectChannelConnector中被使用,它继承自ChannelEndPoint,并实现了ConnectedEndPoint和AsyncEndPoint接口,SelectChannelConnector采用NIO中多路复用的机制,因而实现会比较复杂一些。在创建Connector时,首先创建ConnectorSelectorManager实例(_manager),在SelectChannelConnector启动时,设置_manager的SelectSets(acceptors)、MaxIdleTime、LowResourcesConnections、LowResourcesIdleTime,然后启动_manager,并且启动acceptors个线程,只要SelectChannelConnector处于Running状态,就不断的调用_manager.doSelect()方法。ConnectorSelectorManager在启动时会创建_selectSets个SelectSet;而doSelect方法会调用根据传入的索引号对应的SelectSet的doSelect()方法。当客户端的连接到来后,SelectChannelConnector首先会配置SocketChannel的configureBlocking为false,然后将该SocketChannel注册到_manager中,在注册过程中,根据当前的SelectSet索引值找到相应的SelectSet(之后索引自增),然后调用SelectSet的addChange(传入SocketChannel)和wakeup方法。因而这里最重要的就是SelectSet的实现,它是SelectorManager中的一个内部类。
SelectSet类的实现中,它内部有一个Selector,一个ConcurrentLinkedQueue的changes队列,以及SelectChannelEndPoint到SelectSet的集合(它用于调用SelectChannelEndPoint中的checkIdleTimestamp()方法以检查并关闭处于Idle Timeout的SelectChannelEndPoint)。SelectSet使用addChange()方法添加需要改变状态的对象,这些对象有EndPoint、ChannelAndAttachment、SocketChannel、Runnable。在doSelect()方法中,首先检查changes队列中是否有对象,如果有SelectChannelEndPoint对象,则调用其doUpdateKey()方法;如果是SocketChannel对象,则注册OP_READ操作到Selector中,创建新的SelectChannelEndPoint,attach新创建的SelectChannelEndPoint到SelectionKey中,调用SelectChannelEndPoint的schedule()方法。对ChannelAndAttachment对象,如果其Channel是SocketChannel,并且处于Connected状态,则类似对SocketChannel对象的处理,否则,注册OP_CONNECT操作到Selector;如果是Runnable对象,则dispatch该Runnable对象。然后调用Selector的selectNow()方法,如果没有任何可用的事件,则计算出等待时间,然后带等待时间的调用Selector的select()方法;遍历所有Selected Keys,对Invalid的SelectionKey,直接调用其attach的SelectChannelEndPoint的doUpdateKey()方法,否则对类型是SelectChannelEndPoint的attachment调用其schedule()方法,对connectable的SelectionKey创建新的SelectChannelEndPoint并调用schedule()方法,否则创建新的SelectChannelEndPoint并对readable的SelectionKey调用其schedule()方法。
SelectChannelEndPoint采用NIO的非阻塞读写方式,而NIO基于Channel的非阻塞操作是基于注册的操作集(OP_READ, OP_WRITE, O_CONNECT, OP_ACCEPT)以从Selector中选出已经可用的SelectionKey(包含对应的Channel、interestOps、readable、writable、attachment等),之后可以使用对应的Channel以及根据SelectionKey中对应的已经可用的操作执行相应的操作(如读写),因而SelectChannelEndPoint的其中一个任务是要实时的更新当前它感兴趣的操作集,并重新像Selector中注册。 SelectChannelEndPoint使用updateKey()方法跟新感兴趣操作集合,并且它只关注OP_READ和OP_WRITE操作,在实现时,OP_READ只需要在Socket的输入没有关闭,且还没有dispatch或当前处于readBlocked状态下才需要关注;OP_WRITE只需要在Socket的输出没有关闭,且writable为false(当需要向Channel中写数据,但是还没有写完的情况下)或当前处于writeBlocked状态下才需要关注;如果和当前已注册的操作集相同,则不需要重新注册,否者将自身通过SelectSet的addChange()方法添加到SelectSet中,在SelectSet的doSelect()方法中会最终调用SelectChannelEndPoint中的doUpdateKey()方法,该方法的实现:1. 当Channel处于Open状态,存在感兴趣的操作,SelectionKey为null或invalid,如果Channel已经注册了,重新调用updateKey()方法(感觉这里一般不会被调用到,如果被调用到了,则可能出现死循环),否则将Channel重新向Selector中重新注册interestOps的操作集(如果出错,则canncel SelectionKey,并且从SelectSet中销毁当前EndPoint)。2. 当Channel处于Open状态,存在感兴趣的操作,SelectionKey存在且valid,则直接使用interestOps更新SelectionKey的感兴趣集(调用SelectionKey的interestOps()方法)。3. 当Channel处于Open状态,不存在感兴趣的操作,清空SelectionKey的interestOps,或清理SelectionKey引用。4. 如果Channel处于关闭状态,则canncel SelectionKey,并从SelectSet中销毁当前EndPoint。
对阻塞读写(readBlocked、writeBlocked),在blockReadable()、blockWritable()方法中,会设置readBlocked、writeBlocked为true,调用updateKey()方法,然后计算等待时间并进入等待(调用wait方法),如果因为超时而退出等待,则返回false,否则返回true(在返回时设置readBlocked、writeBlocked为false);当调用SelectChannelEndPoint的schedule()方法时,它会更新readBlocked、writeBlocked、interestOps的值(同时使用该值更新SelectionKey中的状态),并调用notifyAll()方法唤醒blockReadable()、blockWritable()方法:1. 如果SelectionKey为null或invalid,readBlocked、writeBlocked设置为false,调用notifyAll(),并返回;2. 如果readBlocked或writeBlocked为true,使用SelectionKey的readable、writable更新readBlocked和writeBlocked的值,调用notifyAll(),如果已经dispatched,清除所有interestOps,并返回;3. 如果还没有dispatched,直接清除所有interestOps,并返回;4. 如果注册了OP_WRITE,并且已经可写,则清除OP_WRITE操作,设置writable为true;5. 如果还没有dispatched,则调用dispatch()方法。在dispatch()方法中,它设置dispatched为true,并将handler扔给ThreadPool(在handler调用Connection的handle()方法,由于SelectChannelEndPoint的生命周期是在SelectManager维护,并且dispatch()方法可能被多次调用,因而没有在handler的handle()方法中判断EndPoint的close状态,并循环的调用Connection的handle()方法,而是在每次handle()方法结束后退出当前线程,在下次schedule()时会使用重新将handler扔给ThreadPool以支持AsyncContinuation的实现,并且AsyncEndPoint接口的定义也是用于AsyncContinuation的实现,这个将在以后的博客中详述)。在flush()方法中,如果没有任何数据能写入Channel时,设置writable为false(从而在updateKey()方法中能将OP_WRITE注册到SelectionKey的interestOps中),并在没有dispatch的情况下调用updateKey。最后清理Selector中的selectedKeys,expire所有timeout中注册的Task(使用scheduleTimeout()方法注册),依次调用检查EndPoints中是否已经TimeOut。
在SelectChannelEndPoint的构建中,它使用SocketChannel、SelectSet、SelectionKey构建,内部从SelectSet中获取SelectorManager,并使用SelectorManager 创建Connection实例,初始化_dispatched、_redispatched为false,_open、_writable为true,_readBlocked、_writeBlocked为false,_interestOps为0,最后更新_idleTimestamp为当前时间。当一个客户端连接到来后,SelectChannelConnector会向SelectorManager(SelectSet)中注册一个SocketChannel,当后台线程调用SelectSet中的doSelect()方法时,它使用该SocketChannel,向该SelectSet中的Selector注册OP_READ得到一个SelectionKey,并使用这个SocketChannel、当前SelectSet、以及这个SelectionKey创建一个SelectChannelEndPoint,而后调用SelectChannelEndPoint的schedule()方法。

SslSelectChannelEndPoint类采用Buffer的形式先将数据读写到内部缓存中,然后使用SSLEngine来wrap或unwrap(encode/decode)数据。这里不再详述。

相关文章
|
缓存 分布式计算 API
Spark Netty与Jetty (源码阅读十一)
  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。   创建了一个线程工厂,生成的线程都给定一个前缀名。      像一般的netty框架一样,创建Netty的EventLoopGroup:      在常用...
1068 0