深入Jetty源码之Connector

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介:

Connector概述

Connector是Jetty中可以直接接受客户端连接的抽象,一个Connector监听Jetty服务器的一个端口,所有客户端的连接请求首先通过该端口,而后由操作系统分配一个新的端口(Socket)与客户端进行数据通信(先握手,然后建立连接,但是使用不同的端口)。不同的Connector实现可以使用不同的底层结构,如Socket Connector、NIO Connector等,也可以使用不同的协议,如Ssl Connector、AJP Connector,从而在不同的Connector中配置不同的EndPoint和Connection。EndPoint用于连接(Socket)的读写数据(参见 深入Jetty源码之EndPoint ),Connection用于关联Request、Response、EndPoint、Server,并将解析出来的Request、Response传递给在Server中注册的Handler来处理(参见 深入Jetty源码之Connection )。在Jetty中Connector的有:SocketConnector、SslSocketConnector、Ajp13SocketConnector、BlockingChannelConnector、SelectChannelConnector、SslSelectChannelConnector、LocalConnector、NestedConnector等,其类图如下。

Connector类图


Connector接口

首先Connector实现了LifeCycle接口,在启动Jetty服务器时,会调用其的start方法,用于初始化Connector内部状态,并打开Connector以接受客户端的请求(调用open方法);而在停止Jetty服务器时会调用其stop方法,以关闭Connector以及内部元件(如Connection等)以及做一些清理工作。因而open、close方法是Connector中用于处理生命周期的方法;对每个Connector都有name字段用于标记该Connector,默认值为:hostname:port;Connector中还有Server的引用,可以从中获取ThreadPool,并作为Handler的容器被使用(在创建HttpConnection时,Server实例作为构造函数参数传入,并在handleRequest()方法中将解析出来的Request、Response传递给Server注册的Handler);Connector还定义了一些用于配置当前Connector的方法,如Buffer Size、Max Idle Time、Low Resource Max Idle Time,以及一些统计信息,如当前Connector总共处理过的请求数、总共处理过的连接数、当前打开的连接数等信息。
Connector的接口定义如下:
public  interface Connector  extends LifeCycle { 
     //  Connector名字,默认值hostname:port
    String getName();
     //  打开当前Connector
     void open()  throws IOException;
     //  关闭当前Connector
     void close()  throws IOException;
     //  对Server的引用
     void setServer(Server server);
    Server getServer();

     //  在处理请求消息头时使用的Buffer大小
     int getRequestHeaderSize();
     void setRequestHeaderSize( int size);
     //  在处理响应消息头时使用的Buffer大小
     int getResponseHeaderSize();
     void setResponseHeaderSize( int size);
     //  在处理请求消息内容时使用的Buffer大小
     int getRequestBufferSize();
     void setRequestBufferSize( int requestBufferSize);
     //  在处理响应消息内容使用的Buffer大小
     int getResponseBufferSize();
     void setResponseBufferSize( int responseBufferSize);
     //  在处理请求消息时使用的Buffer工厂
    Buffers getRequestBuffers();
     //  在处理响应消息时使用的Buffer工厂
    Buffers getResponseBuffers();

     // User Data Constraint的配置可以是None、Integral、Confidential,对这三种值的解释:
     // None:A value of NONE means that the application does not require any transport guarantees.
     // Integral:A value of INTEGRAL means that the application requires the data sent between the client and server to be sent in such a way that it can't be changed in transit.
     // Confidential:A value of CONFIDENTIAL means that the application requires the data to be transmitted in a fashion that prevents other entities from observing the contents of the transmission on.
     // In most cases, the presence of the INTEGRAL or CONFIDENTIAL flag indicates that the use of SSL is required. 参考: 这里
     // 如果配置了user-data-constraint为Integral或confidential表示所有相应请求都会重定向到使用Integral/Confidential Schema/Port构建的新的URL中。
     int getIntegralPort();
    String getIntegralScheme();
     boolean isIntegral(Request request);
     int getConfidentialPort();
    String getConfidentialScheme();
     boolean isConfidential(Request request);

     / / 在将HttpConnection交给Server中的Handlers处理前,根据当前Connector,自定义一些EndPoint和Request的配置,如设置EndPoint的MaxIdleTime,Request的timestamp,
     / / 清除SelectChannelEndPoint中的idleTimestamp,检查forward头等。
     void customize(EndPoint endpoint, Request request)  throws IOException;

     / / 主要用于SelectChannelConnector中,重置SelectChannelEndPoint中的idleTimestamp,即重新计时idle的时间。
    / 它在每个Request处理结束,EndPoint还未关闭,并且当前连接属于keep-alive类型的时候被调用。
     void persist(EndPoint endpoint)  throws IOException;

     / / 底层的链接实例,如ServerSocket、ServerSocketChannel等。
    Object getConnection();
    
     / /是否对"X-Forwarded-For"头进行DNS名字解析
     boolean getResolveNames();
    
     / / 当前Connector绑定的主机名、端口号等。貌似在Connector的实现中没有一个默认的主机名。
     / 由于端口号可以设置为0,表示由操作系统随机的分配一个还没有被使用的端口,因而这里由LocalPort用于存储Connector实际上绑定的端口号;
     / 其中-1表示这个Connector还未开启,-2表示Connector已经关闭。
    String getHost();
     void setHost(String hostname);
     void setPort( int port);
     int getPort();
     int getLocalPort();
    
     / / Socket的最大空闲时间,以及在资源比较少(如线程池中的任务数比最大可用线程数要多)的情况下的最大空闲时间,当空闲时间超过这个时间后关闭当前连接(Socket)。
     int getMaxIdleTime();
     void setMaxIdleTime( int ms);
     int getLowResourceMaxIdleTime();
     void setLowResourceMaxIdleTime( int ms);

     / / 是否当前Connector处于LowResources状态,即线程池中的任务数比最大可用线程数要多
     public  boolean isLowResources();

     /*  ------------------------以下是一些获取和当前Connector相关的统计信息------------------------------------  */
     / / 打开或关闭统计功能
     public  void setStatsOn( boolean on);
     / / 统计功能的开闭状态
     public  boolean getStatsOn();
     / / 重置统计数据,以及统计开始时间
     public  void statsReset();
     / / 统计信息的开启时间戳
     public  long getStatsOnMs();

     / / 当前Connector处理的请求数
     public  int getRequests();
     / / 当前Connector接收到过的连接数
     public  int getConnections() ;
     / / 当前Connector所有当前还处于打开状态的连接数
     public  int getConnectionsOpen() ;
     / / 当前Connector历史上同时处于打开状态的最大连接数
     public  int getConnectionsOpenMax() ;
     / / 当前Connector所有连接的持续时间总和
     public  long getConnectionsDurationTotal();
     / / 当前Connector的最长连接持续时间
     public  long getConnectionsDurationMax();
     / / 当前Connector平均连接持续时间
     public  double getConnectionsDurationMean() ;
     / / 当前Connector所有连接持续时间的标准偏差
     public  double getConnectionsDurationStdDev() ;
     / / 当前Connector的所有连接的平均请求数
     public  double getConnectionsRequestsMean() ;
     / / 当前Connector的所有连接的请求数标准偏差
     public  double getConnectionsRequestsStdDev() ;
     / / 当前Connector的所有连接的最大请求数
     public  int getConnectionsRequestsMax();
}

AbstractConnector实现

Jetty中所有的Connector都继承自AbstractConnector,而它自身继承自HttpBuffers,HttpBuffers包含了Request、Response的Buffer工厂的创建以及相应Size的配置。AbstractConnector还包含了Name、Server、maxIdleTime以及一些统计信息的引用,用于实现Connector接口中的方法,只是一些Get、Set方法,不详述。除了Connector接口相关的配置,AbstractConnector还为定义了两个字段:_acceptQueueSize用于表示ServerSocket 或ServerSocketChannel中最大可等待的请求数,_acceptors用于表示用于调用ServerSocket或ServerSocketChannel中accept方法的线程数,建议这个数字小于或等于可用处理器的2倍。

在AbstractConnector中还定义了Acceptor内部类,它实现了Runnable接口,在其run方法实现中,它将自己的Thread实例赋值给AbstractConnector中的_acceptorThread数组中acceptor号对应的bucket,并更新线程名为:<name> Acceptor<index> <Connector.toString>。然后根据配置的_acceptorPriorityOffset设置当前线程的priority。只要当前Connector处于Running状态,并且底层链接实例不为null,不断的调用accept方法()。在退出的finally语句快中清理_acceptorThread数组中相应的Bucket值为null,并将线程原来的Name、Priority设置回来。

AbstractConnector实现了doStart()方法,它首先保证Server实例的存在;然后打开当前Connector(调用其open()方法),并调用父类的doStart方法(这里是HttpBuffers,用于初始化对应的Buffers);如果没有自定义的ThreadPool,则从Server中获取ThreadPool;最后根据acceptors的值创建_acceptorThread数组,将acceptors个Acceptor实例dispatch给ThreadPool。在doStop方法实现中,它首先调用close方法,然后对非Server中的ThreadPool调用其stop方法,再调用父类的doStop方法清理Buffers的引用,最后遍历_acceptorThread数据,调用每个Thread的interrupt方法。

在各个子类的accept方法实现中,他们在获取客户端过来的Socket连接后,都会对该Socket做一些配置,即调用AbstractConnector的configure方法,它首先设置Socket的TCP_NODELAY为true,即禁用Nagle算法(关于禁用的理由可以参考:http://jerrypeng.me/2013/08/mythical-40ms-delay-and-tcp-nodelay/#sec-4-2,简单的,如果该值为false,则TCP的数据包要么达到TCP Segment Size,要么收到一个Ack,才会发送出去,即有Delay);然后如果设置了_soLingerTime,则开启Socket中SO_LINGER选项,否则,关闭该选项(SO_LINGER选项用于控制关闭一个Socket的行为,如果开启了该选项,则在关闭Socket时会等待_soLingerTime时间,此时如果有数据还未发送完,则会发送这些数据;如果关闭了该选项,则Socket的关闭会立即返回,此时也有可能继续发送未发送完成的数据,具体参考:http://blog.csdn.net/factor2000/article/details/3929816)。

在ServerSocket和ServerSocketChannel中还有一个SO_REUSEADDR的配置,一般来说当一个端口被释放后会等待两分钟再被使用,此时如果重启服务器,可能会导致启动时的绑定错误,设置该值可以让端口释放后可以立即被使用(具体参考:http://www.cnblogs.com/mydomain/archive/2011/08/23/2150567.html)。在AbstractConnector中可以使用setReuseAddress方法来配置,默认该值设置为true。

AbstractConnector中还实现了customize方法,它在forwarded设置为true的情况下设置相应的attribute:javax.servlet.request.cipher_suite, javax.servlet.request.ssl_session_id,以及Request中对应Host、Server等头信息。这个逻辑具体含义目前还不是很了解。。。。

最后关于统计数据的更新方法,AbstractConnector定义了如下方法:
protected  void connectionOpened(Connection connection)
protected  void connectionUpgraded(Connection oldConnection, Connection newConnection)
protected  void connectionClosed(Connection connection)

SocketConnector实现

有了AbstractConnector的实现,SocketConnector的实现就变的非常简单了,它保存了一个EndPoint的Set,表示所有在这个Connector下正在使用的EndPoint,然后是ServerSocket,在open方法中创建,并在getConnection()方法中返回,还有一个localPort字段,当ServerSocket被创建时从ServerSocket实例中获取,并在getLocalPort()方法中返回。在close方法中关闭ServerSocket,并设置localPort为-2;在accept方法中,调用ServerSocket的accept方法,返回一个Socket,调用configure方法对新创建的Socket做一些基本的配置,然后使用该Socket创建ConnectorEndPoint,并调用其dispatch方法;在customize方法中,在调用AbstractConnector的customize方法的同时还设置ConnectorEndPoint的MaxIdleTime,即设置Socket的SO_TIMEOUT选项,用于配置该Socket的空闲可等待时间;在doStart中会先清理ConnectorEndPoint的集合,而在doStop中会关闭所有还处于打开状态的ConnectorEndPoint。

SelectChannelConnector实现

SelectChannelConnector内部使用ServerSocketChannel,在open方法中创建ServerSocketChannel,配置其为非blocking模式,并设置localPort值;在accept方法中调用ServerSocket的accept方法获得一个SocketChannel,配置该Channel为非blocking模式,调用AbstractChannel的configure方法做相应Socket配置,最后将该SocketChannel注册给ConnectorSelectManager;在doStart方法中,它会初始化ConnectorSelectManager的SelectSets值为acceptors值、MaxIdleTime、LowResourceConnections、LowResourcesMaxIdleTime等值,并启动该Manager,并dispatch acceptors个线程,不断的调用Manager的doSelect方法;在close方法中会先stop ConnectorSelectManager,然后关闭ServerSocketChannel,设置localPort为-2;在customize方法中会清除SelectChannelEndPoint的idleTimestamp,重置其MaxIdleTime以及重置Request中的timestamp的值;在persist方法中会重置SelectChannelEndPoint中idleTimestamp的值。

BlockingChannelConnector实现

BlockingChannelConnector实现类似SocketConnector,不同的是它使用ServerSocketChannel,并且其EndPoint为BlockingChannelEndPoint。所不同的是它需要在doStart方法中启动一个线程不断的检查所有还在connections集合中的BlockingChannelEndPoint是否已经超时,每400ms检查一次,如果超时则关闭该EndPoint。

其他的Connector的实现都比较类似,而SSL相关的Connector需要也只是加入了SSL相关的逻辑,这里不再赘述。

相关文章
|
网络协议 安全 Java
Jetty架构设计之Connector、Handler组件(上)
Jetty架构设计之Connector、Handler组件
271 0
Jetty架构设计之Connector、Handler组件(上)
|
Java 应用服务中间件 调度
Jetty架构设计之Connector、Handler组件(中)
Jetty架构设计之Connector、Handler组件
153 0
Jetty架构设计之Connector、Handler组件(中)
|
安全 Java 应用服务中间件
Jetty架构设计之Connector、Handler组件(下)
Jetty架构设计之Connector、Handler组件
272 0
Jetty架构设计之Connector、Handler组件(下)
|
缓存 分布式计算 API
Spark Netty与Jetty (源码阅读十一)
  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。   创建了一个线程工厂,生成的线程都给定一个前缀名。      像一般的netty框架一样,创建Netty的EventLoopGroup:      在常用...
1104 0