深入浅出Tomcat网络通信的高并发处理机制

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 【10月更文挑战第3天】本文详细解析了Tomcat在处理高并发网络请求时的机制,重点关注了其三种不同的IO模型:NioEndPoint、Nio2EndPoint 和 AprEndPoint。NioEndPoint 采用多路复用模型,通过 Acceptor 接收连接、Poller 监听事件及 Executor 处理请求;Nio2EndPoint 则使用 AIO 异步模型,通过回调函数处理连接和数据就绪事件;AprEndPoint 通过 JNI 调用本地库实现高性能,但已在 Tomcat 10 中弃用

深入浅出Tomcat网络通信的高并发处理机制

随着互联网应用的快速发展,Web服务器面临的访问压力日益增大,如何高效处理高并发的网络请求成为关键

Tomcat作为Java世界中最受欢迎的Web容器之一,可以灵活选择不同的IO模型来处理网络通信,确保面对高并发的网络请求时能够快速处理

上篇文章21张图解析Tomcat运行原理与架构全貌,我们说到Tomcat中通过Connector来处理网络通信,其中Connector的职责主要由组件EndPoint、Processor、Adapter来完成

EndPoint负责网络通信、Processor负责解析、Adapter负责将请求转换为Servlet的请求并交给容器处理

image.png

本篇文章就来重点聊聊AbstractEndpoint的多种实现类是如何处理网络通信的

AbstractEndpoint有三种实现类:NioEndPoint、Nio2EndPoint、AprEndPoint

其中默认使用NioEndPoint(多路复用模型),Nio2EndPoint使用异步IO模型,而AprEndPoint为早期提供高性能(Tomcat 10时被弃用)

NioEndPoint

NioEndPoint将处理网络通信分离为三个步骤,分别使用三个组件进行执行:接收连接、检测IO事件、处理请求
image.png

我们先大致对这些组件的作用进行描述,后续再通过源码分析~

Acceptor用于接收连接(循环执行):使用LimitLatch限制最大连接数量,等待客户端完成TCP三次握手连接后,将连接交给Poller

Poller用于检测IO事件是否就绪(循环执行):将连接注册到Selector上,使用Selector监听IO事件,当事件发生(读就绪)时交给Executor进行处理

Executor池化管理线程,使用线程执行后续流程(解析请求、封装适配、交给容器处理...)

Acceptor

Acceptor使用LimitLatch限制连接数量,收到连接后将其逐步封装为PollerEvent并放到Poller的队列中

NioEndpoint.initServerSocket 在启动组件时初始化socket

//服务端Channel
ServerSocketChannel serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//服务端channel绑定端口,getAcceptCount为连接上限
serverSock.bind(addr, getAcceptCount());

ServerSocketChannel作为服务端Channel只监听一个端口(连接器中设置的端口)

无论Acceptor线程数量为多少,都共享该服务端channel

getAcceptCount() 为建立连接后最大积压的数量,acceptCount默认为100

TCP三次握手完成后会将客户端连接放到accept队列中等待服务端拿走,acceptCount就是accept队列最大存放的连接数量

Acceptor.run Acceptor接收连接

为了简化流程,只保留了较重要的流程:

  1. 使用LimitLatch限制连接数量,如果达到最大值则等待
  2. 等待获取客户端连接socket channel(TCP三次握手完成)
  3. 把socket channel交给Poller处理
public void run() {
   

    //...
    while (!stopCalled) {
   
        //1.使用limit latch限制连接数量,如果达到最大值则等待
        endpoint.countUpOrAwaitConnection();

        U socket = null;

        //2.等待获取客户端socket channel
        socket = endpoint.serverSocketAccept();

        //3.交给Poller处理
        endpoint.setSocketOptions(socket);             
    } 
}

对于实现限制连接数量,当达到最大值时则进行等待,实现的关键为计数、等待

熟悉并发包的同学会立马想到信号量Semaphore,但Tomcat没有直接使用信号量组件,而是基于AQS自己实现同步组件LimitLatch

LimitLatch直接使用原子类进行计数,利用AQS来实现等待

(LimitLatch基于AQS实现比较简单,这里就不进行分析,不熟悉AQS的同学可以查看这篇文章:10分钟从源码级别搞懂AQS

image.png

如果没有超过限制则会获取下一个完成连接(TCP三次握手)的客户端连接SocketChannel result = serverSock.accept();

并将Channel包装成SocketWrapper,再包装为PollerEvent,加入Poller的队列中

public void register(final NioSocketWrapper socketWrapper) {
   
    //监听读事件(读就绪时poller能够继续处理)
    socketWrapper.interestOps(SelectionKey.OP_READ);
    //包装为PollerEvent 指定关心注册事件OP_REGISTER(后续poller将该通道注册到select上)
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    //放入poller的队列
    addEvent(pollerEvent);
}

Poller的队列SynchronizedQueue也是Tomcat自己实现的

Acceptor与Poller之间通过队列通信,SynchronizedQueue使用synchronized保证并发操作下的原子性

Poller

Poller循环处理队列中的PollerEvent事件,当Selector上监听的连接发生IO事件时迭代处理,将事件封装为SocketProcessor交给线程池处理

Poller.run

Poller主要循环检测是否有IO事件发生,主要流程为:

  1. 轮询处理队列中的事件PollerEvent,比如将通道注册到Selector上
  2. Selector阻塞到事件发生或超时
  3. 迭代遍历处理事件,交给线程池处理
public void run() {
   
    while (true) {
   
        //...

        //1.轮询处理队列中的事件
        events()//2.select 阻塞直到事件发生
        selector.select(selectorTimeout);

        //3.迭代遍历处理事件
        Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while (iterator != null && iterator.hasNext()) {
   
            SelectionKey sk = iterator.next();
            iterator.remove();
            //从附件中拿到连接的包装
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            if (socketWrapper != null) {
   
                //交给线程池处理
                processKey(sk, socketWrapper);
            }
       }                

    }   
}

交给线程池处理前会将其封装为SocketProcessor

AbstractEndpoint.processSocket

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
   
    SocketProcessorBase<S> sc = null;
    if (processorCache != null) {
   
        sc = processorCache.pop();
    }
    //封装SocketProcessor
    if (sc == null) {
   
        sc = createSocketProcessor(socketWrapper, event);
    } else {
   
        sc.reset(socketWrapper, event);
    }

    Executor executor = getExecutor();
    if (dispatch && executor != null) {
   
        //交给线程池执行
        executor.execute(sc);
    } else {
   
        //当前线程执行(使用AIO时走这里)
        sc.run();
    }
}

该方法用于封装SocketProcessor和调用后续执行,放于父类中,所有实现类通用该方法

Executor

线程池中的线程执行SocketProcessor时会去交给ProtocolHandler处理 getHandler().process(socketWrapper, event)

SocketProcessor中存在连接等包装组件,通过它能够处理请求(读取数据)和响应(写回数据)

需要注意的是Tomcat中的线程池是自己实现的,而不是并发包下的ThreadPoolExecutor

(对于Tomcat的线程池,我们后续文章再进行分析)

NioEndPoint大致的运行流程如下图:

image.png

连接的包装类(NioSocketWrapper)为核心贯穿全文,封装流程如下:

Acceptor接收连接:NioChannel(拿到客户端连接) -> NioSocketWrapper -> PollerEvent(放入Poller的SynchronizedQueue)

Poller处理PollerEvent注册到Selector,并阻塞监听IO事件:

PollerEvent(从SynchronizedQueue中取出) -> NioSocketWrapper(注册到Selector) -> SocketProcessor(事件发生时交给Executor)

Nio2EndPoint

Nio2EndPoint使用异步IO模型(AIO)来处理网络通信

AIO的特点就是异步,使用回调函数,当数据就绪时使用异步线程调用回调函数

无需再像NIO中使用Selector阻塞,让应用线程来触发读取数据,阻塞到数据拷贝到应用缓冲区

Nio2实际上指的就是AIO,NIO2表明这是对原有NIO的一个升级版本

Nio2EndPoint处理网络通信时不再需要检测IO事件,把这件事交给内核去做,当事件发生(数据就绪)时使用异步线程调用回调函数即可

相比于NioEndPoint,Nio2EndPoint在处理网络通信时,不需要再用Poller检测IO事件

Nio2Acceptor

Nio2EndPoint中使用Nio2Acceptor接收连接,Nio2Acceptor继承Acceptor并实现回调接口CompletionHandler

class Nio2Acceptor 
extends Acceptor<AsynchronousSocketChannel> 
implements CompletionHandler<AsynchronousSocketChannel,Void>

回调接口第一个参数为IO操作的结果,第二个泛型为操作使用的附件,其中有两个方法分别代表着成功/失败后执行的回调

public interface CompletionHandler<V,A> {
   
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

Nio2Acceptor.run

由于使用AIO,Nio2Acceptor在执行任务时不再需要循环,只需要携带回调函数,当客户端连接完成时触发回调

在执行时主要做两件事:

  1. 使用LimitLatch限制连接数
  2. 接收连接
public void run() {
   
    if (!isPaused()) {
   
        try {
   
            //1.使用LimitLatch限制连接数
            countUpOrAwaitConnection();
        } catch (InterruptedException e) {
   
        }
        if (!isPaused()) {
   
            //2.接收连接
            serverSock.accept(null, this);
        } else {
   
            state = AcceptorState.PAUSED;
        }
    } else {
   
        state = AcceptorState.PAUSED;
    }
}

serverSock.accept(null, this); 在接收连接时,使用的服务端channel为AsynchronousServerSocketChannel,并把当前对象作为回调传入

这样在下次收到连接后的回调又可以调用该方法,以此来达到不需要循环调用

Nio2Acceptor.completed

回调成功的方法中主要做几件事:

  1. 是否限制连接数量
  2. 调用accept,方便接收下次连接
  3. 调用后续处理
public void completed(AsynchronousSocketChannel socket,Void attachment) {
   
    errorDelay = 0;
    if (isRunning() && !isPaused()) {
   
        //1.是否限制连接数量
        if (getMaxConnections() == -1) {
   
            //不限制连接数量,方便接收下一次连接
            serverSock.accept(null, this);
        } else if (getConnectionCount() < getMaxConnections()) {
   
            try {
   
                //当前连接数小于最大限制连接数,不阻塞,主要是去自增计数
                countUpOrAwaitConnection();
            } catch (InterruptedException e) {
   
                // Ignore
            }
            //方便接收下次连接
            serverSock.accept(null, this);
        } else {
   
            //当前连接数大于等于最大限制连接数,再调用limitlatch会阻塞,为了避免阻塞使用线程池去执行(排队)
            getExecutor().execute(this);
        }
        //setSocketOptions后续处理
        if (!setSocketOptions(socket)) {
   
            closeSocket(socket);
        }
    } else {
   
        if (isRunning()) {
   
            state = AcceptorState.PAUSED;
        }
        destroySocket(socket);
    }
}

在Acceptor中也会调用setSocketOptions方法,那时会将连接包装NioSocketWrapper,然后封装为PollerEvent放入poller队列

在AIO中由于不再存在poller,该方法会将连接包装为Nio2SocketWrapper,然后调用父类AbstractEndpoint.processSocket方法去执行

AbstractEndpoint.processSocket

在此方法中会先将包装类封装为SocketProcessor再去执行

    public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
   
            //...

            //封装为SocketProcessor
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
   
                sc = processorCache.pop();
            }
            if (sc == null) {
   
                sc = createSocketProcessor(socketWrapper, event);
            } else {
   
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
   
                executor.execute(sc);
            } else {
   
                //当前线程执行
                sc.run();
            }

            return true;
    }

需要注意的是,当前线程本来就是异步回调线程,参数dispatch会为false

也就是这里不会使用线程池去执行,而是由当前异步回调的线程去执行SocketProcessor

Nio2SocketWrapper

当前线程是连接完成执行异步回调的线程,去执行SocketProcessor也就是会使用Processor解析数据,但此时数据可能还未准备好

为了不让Processor阻塞等待,这里会失败,直到数据就绪时触发的异步回调来执行时才能够读到数据

Nio2SocketWrapper中包含一些读写事件的回调

比如读回调中:当数据就绪时,会去执行processSocket,也就是封装SocketProcessor进行后续调用(此时会第二次使用Processor进行读数据,这样确保数据已就绪)

this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
   
    @Override
    public void completed(Integer nBytes, ByteBuffer attachment) {
   
        //...
        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
    }
}

Http11Processor

在Processor处理HTTP协议的实现类Http11Processor中,执行service解析请求时,会先解析请求头parseRequestLine

public SocketState service(SocketWrapperBase<?> socketWrapper)  throws IOException {
   
    //...
    inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),protocol.getKeepAliveTimeout())
    //...    
}

当parseRequestLine返回false时,说明数据未就绪,不会执行后续操作,因此第一次读数据时由于数据未就绪不会再往后执行

NioEndPoint大致的运行流程如下图:

image.png

在Nio2EndPoint中,使用异步回调的方式,避免poller中的操作,能够提升效率,但是大量异步线程的引入又会带来线程上下文切换的开销

连接的包装类(Nio2SocketWrapper)为核心贯穿全文,封装流程如下:

Nio2Channel(回调拿到客户端连接) -> Nio2SocketWrapper -> SocketProcessor(事件发生时)

连接完成的回调:Nio2Channel -> Nio2SocketWrapper -> SocketProcessor -> Http11Processor(解析失败,数据未就绪)

读事件就绪的回调:Nio2SocketWrapper -> SocketProcessor -> Http11Processor

AprEndPoint

APR(Apache Portable Runtime)是Apache提供的可移植运行库,是为了早期的Tomcat的提供高性能的

早期NIO还不成熟,使用APR通过JNI调用本地C语言实现的库,能够使用操作系统的epoll来实现多路复用模型,旨在提高高性能

AprEndPoint 流程与NioEndPoint相同,只是其中调用的方法不同,NioEndPoint调用JDK NIO的API,而AprEndPoint调用APR库

AprEndPoint在通道上使用的缓冲区是基于直接内存的(DirectByteBuffer),而NioEndPoint与Nio2EndPoint都是使用堆内存的(HeapByteBuffer)

使用直接内存的好处是能够减少数据拷贝带来的开销,但无法使用JVM来进行管理内存

并且AprEndPoint还能使用零拷贝sendfile,将数据从磁盘读到网卡发送时减少各种拷贝开销

image.png

但在后来NIO、AIO逐渐成熟,AprEndPoint带来的好处逐渐被追平,在Tomcat 10时被遗弃

总结

NioEndPoint将处理网络通信分为接收连接、监听事件、处理请求三个步骤

其中Acceptor负责接收连接,使用LimitLatch限制连接数量(若超过上限则等待),获取客户端连接NioChannel,包装为NioSocketWrapper,再封装为PollerEvent放入Poller的队列中

Poller会轮询处理PollerEvent,通过PollerEvent拿到NioSocketWrapper将连接注册到Selector上,使用Selector监听事件,有事件触发时,从附件中获取连接的包装NioSocketWrapper,将其封装为SocketProcessor交给线程池处理

线程池的线程处理SocketProcessor时,则会使用Processor解析协议,后续再封装请求/响应调用容器处理

Nio2EndPoint 使用AIO,由内核监听事件(数据就绪)后使用异步线程执行回调

其中Nio2Acceptor继承Acceptor,接收连接不再循环处理,而是使用异步回调:当连接完成后再使用LimitLatch判断是否限制连接,调用非阻塞accept便于接收下次连接(回调),然后将客户端连接Nio2Channel封装为Nio2SocketWrapper再封装为SocketProcessor处理(后续调用processor无法解析,因为当前是连接完成的回调线程,数据还未就绪)

当数据就绪时,通过Nio2SocketWrapper的回调继续封装为SocketProcessor向后处理(后续调用processor可以解析,因为当前为读数据就绪的回调线程,第二次读)

早期的APR通过本地库、直接内存、零拷贝等多种方式进行性能优化

🌠最后(不要白嫖,一键三连求求拉~)

本篇文章被收入专栏 Tomcat全解析:架构设计与核心组件实现,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

相关文章
|
3月前
|
缓存 应用服务中间件 nginx
Web服务器的缓存机制与内容分发网络(CDN)
【8月更文第28天】随着互联网应用的发展,用户对网站响应速度的要求越来越高。为了提升用户体验,Web服务器通常会采用多种技术手段来优化页面加载速度,其中最重要的两种技术就是缓存机制和内容分发网络(CDN)。本文将深入探讨这两种技术的工作原理及其实现方法,并通过具体的代码示例加以说明。
244 1
|
20天前
|
设计模式 缓存 Java
Java高并发处理机制
Java高并发处理机制
19 1
|
23天前
|
Java Linux
【网络】高并发场景处理:线程池和IO多路复用
【网络】高并发场景处理:线程池和IO多路复用
36 2
|
2月前
|
设计模式 人工智能 安全
【Tomcat源码分析】生命周期机制 Lifecycle
Tomcat内部通过各种组件协同工作,构建了一个复杂的Web服务器架构。其中,`Lifecycle`机制作为核心,管理组件从创建到销毁的整个生命周期。本文详细解析了Lifecycle的工作原理及其方法,如初始化、启动、停止和销毁等关键步骤,并展示了LifecycleBase类如何通过状态机和模板模式实现这一过程。通过深入理解Lifecycle,我们可以更好地掌握组件生命周期管理,提升系统设计能力。欢迎关注【码上遇见你】获取更多信息,或搜索【AI贝塔】体验免费的Chat GPT。希望本章内容对你有所帮助。
|
4月前
|
SQL 关系型数据库 MySQL
(八)MySQL锁机制:高并发场景下该如何保证数据读写的安全性?
锁!这个词汇在编程中出现的次数尤为频繁,几乎主流的编程语言都会具备完善的锁机制,在数据库中也并不例外,为什么呢?这里牵扯到一个关键词:高并发,由于现在的计算机领域几乎都是多核机器,因此再编写单线程的应用自然无法将机器性能发挥到最大,想要让程序的并发性越高,多线程技术自然就呼之欲出,多线程技术一方面能充分压榨CPU资源,另一方面也能提升程序的并发支持性。
331 3
|
3月前
|
Java 网络安全 云计算
深入理解Java异常处理机制云计算与网络安全:技术挑战与应对策略
【8月更文挑战第27天】在Java编程的世界里,异常处理是维护程序健壮性的重要一环。本文将带你深入了解Java的异常处理机制,从基本的try-catch-finally结构到自定义异常类的设计,再到高级特性如try-with-resources和异常链的应用。通过具体代码示例,我们将探索如何优雅地管理错误和异常,确保你的程序即使在面对不可预见的情况时也能保持运行的稳定性。
|
3月前
|
安全 网络安全 数据安全/隐私保护
|
4月前
|
机器学习/深度学习 编解码 计算机视觉
【YOLOv8改进- Backbone主干】BoTNet:基于Transformer,结合自注意力机制和卷积神经网络的骨干网络
【YOLOv8改进- Backbone主干】BoTNet:基于Transformer,结合自注意力机制和卷积神经网络的骨干网络
|
3月前
|
机器学习/深度学习 自然语言处理 算法
深度学习的奥秘:探索神经网络的核心机制
在这篇文章中,我们将深入浅出地探讨深度学习背后的科学原理和实际应用。通过简化的语言和生动的比喻,我们将揭示神经网络如何模仿人脑处理信息的方式,以及它们如何在各种领域内实现惊人的成就。无论你是技术新手还是资深专家,这篇文章都将为你提供新的视角和深刻的见解。
|
3月前
|
Kubernetes 网络协议 Linux
容器跨主机通信:Flannel网络实现机制分析(二)
容器跨主机通信:Flannel网络实现机制分析(二)
50 0