开篇
这篇文章的主要目的是分析下Tomcat在处理连接请求的整个过程,参考了前人的文章并在文末指出,通过时序图能够较清楚的走通整个流程。
Tomcat处理流程
说明:
Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。
-
- Acceptor线程组。用于接受新连接,并将新连接封装一下,选择一个 Poller 将新连接添加到 Poller 的事件队列中,Acceptor线程组是多个线程组成的线程组。
-
- Poller 线程组。用于监听 Socket 事件,当 Socket 可读或可写等等时,将 Socket 封装一下添加到 worker 线程池的任务队列中,Poller线程组是多个线程组成的线程组。
-
- worker 线程组。用于对请求进行处理,包括分析请求报文并创建 Request 对象,调用容器的 pipeline 进行处理,worker线程组是Executor创建的线程池。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
public void startInternal() throws Exception {
// 创建worker线程组
if ( getExecutor() == null ) {
createExecutor();
}
// Poller线程组由一堆线程组成
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
startAcceptorThreads();
}
}
}
public abstract class AbstractEndpoint<S> {
// Acceptor线程组由一堆线程组成
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
// worker的线程组由executor创建线程池组成
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
}
请求处理过程分解
Acceptor接收连接过程
说明:
Acceptor接受的新连接没有立即注册到selector当中,需要先封装成PollerEvent对象后保存至PollerEvent队列当中,Poller对象会消费PollerEvent队列,类似生产消费模型。
-
- Acceptor 在启动后会阻塞在 ServerSocketChannel.accept(); 方法处,当有新连接到达时,该方法返回一个 SocketChannel。
-
- setSocketOptions()方法将 Socket 封装到 NioChannel 中,并注册到 Poller。
-
- 我们一开始就启动了多个 Poller 线程,注册的时候采用轮询选择 Poller 。NioEndpoint 维护了一个 Poller 数组,当一个连接分配给 pollers[index] 时,下一个连接就会分配给 pollers[(index+1)%pollers.length]。
-
- addEvent() 方法会将 Socket 添加到该 Poller 的 PollerEvent 队列中。到此 Acceptor 的任务就完成了。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
private volatile ServerSocketChannel serverSock = null;
protected class Acceptor extends AbstractEndpoint.Acceptor {
public void run() {
while (running) {
state = AcceptorState.RUNNING;
try {
SocketChannel socket = null;
try {
// 监听socket负责接收新连接
socket = serverSock.accept();
} catch (IOException ioe) {
}
if (running && !paused) {
// 处理接受到的socket对象
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
}
} catch (Throwable t) {
}
}
state = AcceptorState.ENDED;
}
}
protected boolean setSocketOptions(SocketChannel socket) {
try {
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
channel = new NioChannel(socket, bufhandler);
// 注册到Poller当中
getPoller0().register(channel);
} catch (Throwable t) {
}
return true;
}
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
public class Poller implements Runnable {
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
r = new PollerEvent(socket,ka,OP_REGISTER);
// 添加PollerEvent队列当中
addEvent(r);
}
private void addEvent(PollerEvent event) {
// 投入到PollerEvent队列当中
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
}
}
Poller处理请求
说明:
Poller会消费PollerEvent队列(由Acceptor进行投递),并注册到Selector当中。当注册到Selector的socket数据可读的时候将socket封装成SocketProcessor对象,投递到Executor实现的线程池进行处理。
-
- selector.select(1000)。当 Poller 启动后因为 selector 中并没有已注册的 Channel,所以当执行到该方法时只能阻塞。所有的 Poller 共用一个 Selector,其实现类是 sun.nio.ch.SelectorImpl。
-
- events() 方法通过 addEvent() 方法添加到事件队列中的 Socket 注册到SelectorImpl。这里指的socket是accept过来的请求的socket。
-
- 当 Socket 可读时,Poller 才对其进行处理,createSocketProcessor() 方法将 Socket 封装到 SocketProcessor 中,SocketProcessor 实现了 Runnable 接口。worker 线程通过调用其 run() 方法来对 Socket 进行处理。
-
- execute(SocketProcessor) 方法将 SocketProcessor 提交到线程池,放入线程池的 workQueue 中。workQueue 是 BlockingQueue 的实例。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
public static class PollerEvent implements Runnable {
private NioChannel socket;
private int interestOps;
private NioSocketWrapper socketWrapper;
public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
reset(ch, w, intOps);
}
public void run() {
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
}
}
}
}
public class Poller implements Runnable {
public void run() {
while (true) {
// events()负责处理PollerEvent事件并注册到selector当中
hasEvents = events();
keyCount = selector.select(selectorTimeout);
// 处理新接受的socket的读写事件
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
processKey(sk, attachment);
}
}
}
// 处理读写事件
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
}
}
}
public abstract class AbstractEndpoint<S> {
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
sc = createSocketProcessor(socketWrapper, event);
Executor executor = getExecutor();
// 注册到Worker的线程池ThreadPoolExecutor。
if (dispatch && executor != null) {
executor.execute(sc);
}
} catch (RejectedExecutionException ree) {
}
return true;
}
}
Worker处理具体请求
说明:
-
- 当新任务添加到 workQueue(ThreadPoolExecutor)后,workQueue.take()方法会返回一个 Runnable,通常是 SocketProcessor,然后 worker 线程调用 SocketProcessor的run() -> doRun()方法对 Socket 进行处理。
-
- createProcessor() 会创建一个Http11Processor, 它用来解析 Socket,将 Socket 中的内容封装到Request中。注意这个Request是临时使用的一个类,它的全类名是org.apache.coyote.Request。
-
- CoyoteAdapter的postParseRequest()方法封装一下 Request,并处理一下映射关系(从 URL 映射到相应的 Host、Context、Wrapper)。
-
- CoyoteAdapter将 Rquest 提交给 Container(StandardEngine) 处理之前,并将 org.apache.coyote.Request封装到 org.apache.catalina.connector.Request,传递给 Container处理的 Request 是 org.apache.catalina.connector.Request。
-
- connector.getService().getMapper().map(),用来在Mapper中查询 URL 的映射关系。映射关系会保留到 org.apache.catalina.connector.Request 中,Container处理阶段 request.getHost()是使用的就是这个阶段查询到的映射主机,以此类推 request.getContext()、request.getWrapper()都是。
-
- connector.getService().getContainer().getPipeline().getFirst().invoke()会将请求传递到 Container(StandardEngine)处理,至此进入了Engine->Host->Context->Wrapper的处理流程,当然了 Container处理也是在 Worker线程中执行的(也就是说Tomcat处理请求是通过ThreadPoolExecutor的线程池实现的),但是这是一个相对独立的模块,所以单独分出来一节。
Container单个请求处理流程
说明:
-
- 每个容器(Engine、Host、Context、Wrapper)的 StandardPipeline 上都会有多个已注册的 Valve,我们只关注每个容器的 Basic Valve,其他 Valve 都是在 Basic Valve 前执行。
-
- request.getHost().getPipeline().getFirst().invoke() 先获取对应的 StandardHost,并执行其 pipeline。
-
- request.getContext().getPipeline().getFirst().invoke() 先获取对应的 StandardContext,并执行其 pipeline。
-
- request.getWrapper().getPipeline().getFirst().invoke() 先获取对应的 StandardWrapper,并执行其 pipeline。
-
- StandardWrapper的Basic Valve是StandardWrapperValve,通过allocate() 用来加载并初始化 Servlet,值的一提的是 Servlet 并不都是单例的,当 Servlet 实现了 SingleThreadModel 接口后,StandardWrapper 会维护一组 Servlet 实例,这是享元模式。当然了 SingleThreadModel在 Servlet 2.4 以后就弃用了。
-
- createFilterChain() 方法会从 StandardContext 中获取到所有的过滤器,然后将匹配 Request URL 的所有过滤器挑选出来添加到 filterChain 中。
doFilter() 执行过滤链,当所有的过滤器都执行完毕后调用 Servlet 的 service() 方法。
- createFilterChain() 方法会从 StandardContext 中获取到所有的过滤器,然后将匹配 Request URL 的所有过滤器挑选出来添加到 filterChain 中。
参考文章
招聘信息
【招贤纳士】
欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。
贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递zhi.wang@beibei.com。
贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。
贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。
公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)