深入浅出Tomcat网络通信的高并发处理机制

简介: 【10月更文挑战第3天】本文详细解析了Tomcat在处理高并发网络请求时的机制,重点关注了其三种不同的IO模型:NioEndPoint、Nio2EndPoint 和 AprEndPoint。NioEndPoint 采用多路复用模型,通过 Acceptor 接收连接、Poller 监听事件及 Executor 处理请求;Nio2EndPoint 则使用 AIO 异步模型,通过回调函数处理连接和数据就绪事件;AprEndPoint 通过 JNI 调用本地库实现高性能,但已在 Tomcat 10 中弃用

深入浅出Tomcat网络通信的高并发处理机制

随着互联网应用的快速发展,Web服务器面临的访问压力日益增大,如何高效处理高并发的网络请求成为关键

Tomcat作为Java世界中最受欢迎的Web容器之一,可以灵活选择不同的IO模型来处理网络通信,确保面对高并发的网络请求时能够快速处理

上篇文章21张图解析Tomcat运行原理与架构全貌,我们说到Tomcat中通过Connector来处理网络通信,其中Connector的职责主要由组件EndPoint、Processor、Adapter来完成

EndPoint负责网络通信、Processor负责解析、Adapter负责将请求转换为Servlet的请求并交给容器处理

image.png

本篇文章就来重点聊聊AbstractEndpoint的多种实现类是如何处理网络通信的

AbstractEndpoint有三种实现类:NioEndPoint、Nio2EndPoint、AprEndPoint

其中默认使用NioEndPoint(多路复用模型),Nio2EndPoint使用异步IO模型,而AprEndPoint为早期提供高性能(Tomcat 10时被弃用)

NioEndPoint

NioEndPoint将处理网络通信分离为三个步骤,分别使用三个组件进行执行:接收连接、检测IO事件、处理请求
image.png

我们先大致对这些组件的作用进行描述,后续再通过源码分析~

Acceptor用于接收连接(循环执行):使用LimitLatch限制最大连接数量,等待客户端完成TCP三次握手连接后,将连接交给Poller

Poller用于检测IO事件是否就绪(循环执行):将连接注册到Selector上,使用Selector监听IO事件,当事件发生(读就绪)时交给Executor进行处理

Executor池化管理线程,使用线程执行后续流程(解析请求、封装适配、交给容器处理...)

Acceptor

Acceptor使用LimitLatch限制连接数量,收到连接后将其逐步封装为PollerEvent并放到Poller的队列中

NioEndpoint.initServerSocket 在启动组件时初始化socket

//服务端Channel
ServerSocketChannel serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//服务端channel绑定端口,getAcceptCount为连接上限
serverSock.bind(addr, getAcceptCount());

ServerSocketChannel作为服务端Channel只监听一个端口(连接器中设置的端口)

无论Acceptor线程数量为多少,都共享该服务端channel

getAcceptCount() 为建立连接后最大积压的数量,acceptCount默认为100

TCP三次握手完成后会将客户端连接放到accept队列中等待服务端拿走,acceptCount就是accept队列最大存放的连接数量

Acceptor.run Acceptor接收连接

为了简化流程,只保留了较重要的流程:

  1. 使用LimitLatch限制连接数量,如果达到最大值则等待
  2. 等待获取客户端连接socket channel(TCP三次握手完成)
  3. 把socket channel交给Poller处理
public void run() {
   

    //...
    while (!stopCalled) {
   
        //1.使用limit latch限制连接数量,如果达到最大值则等待
        endpoint.countUpOrAwaitConnection();

        U socket = null;

        //2.等待获取客户端socket channel
        socket = endpoint.serverSocketAccept();

        //3.交给Poller处理
        endpoint.setSocketOptions(socket);             
    } 
}

对于实现限制连接数量,当达到最大值时则进行等待,实现的关键为计数、等待

熟悉并发包的同学会立马想到信号量Semaphore,但Tomcat没有直接使用信号量组件,而是基于AQS自己实现同步组件LimitLatch

LimitLatch直接使用原子类进行计数,利用AQS来实现等待

(LimitLatch基于AQS实现比较简单,这里就不进行分析,不熟悉AQS的同学可以查看这篇文章:10分钟从源码级别搞懂AQS

image.png

如果没有超过限制则会获取下一个完成连接(TCP三次握手)的客户端连接SocketChannel result = serverSock.accept();

并将Channel包装成SocketWrapper,再包装为PollerEvent,加入Poller的队列中

public void register(final NioSocketWrapper socketWrapper) {
   
    //监听读事件(读就绪时poller能够继续处理)
    socketWrapper.interestOps(SelectionKey.OP_READ);
    //包装为PollerEvent 指定关心注册事件OP_REGISTER(后续poller将该通道注册到select上)
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    //放入poller的队列
    addEvent(pollerEvent);
}

Poller的队列SynchronizedQueue也是Tomcat自己实现的

Acceptor与Poller之间通过队列通信,SynchronizedQueue使用synchronized保证并发操作下的原子性

Poller

Poller循环处理队列中的PollerEvent事件,当Selector上监听的连接发生IO事件时迭代处理,将事件封装为SocketProcessor交给线程池处理

Poller.run

Poller主要循环检测是否有IO事件发生,主要流程为:

  1. 轮询处理队列中的事件PollerEvent,比如将通道注册到Selector上
  2. Selector阻塞到事件发生或超时
  3. 迭代遍历处理事件,交给线程池处理
public void run() {
   
    while (true) {
   
        //...

        //1.轮询处理队列中的事件
        events()//2.select 阻塞直到事件发生
        selector.select(selectorTimeout);

        //3.迭代遍历处理事件
        Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while (iterator != null && iterator.hasNext()) {
   
            SelectionKey sk = iterator.next();
            iterator.remove();
            //从附件中拿到连接的包装
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            if (socketWrapper != null) {
   
                //交给线程池处理
                processKey(sk, socketWrapper);
            }
       }                

    }   
}

交给线程池处理前会将其封装为SocketProcessor

AbstractEndpoint.processSocket

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
   
    SocketProcessorBase<S> sc = null;
    if (processorCache != null) {
   
        sc = processorCache.pop();
    }
    //封装SocketProcessor
    if (sc == null) {
   
        sc = createSocketProcessor(socketWrapper, event);
    } else {
   
        sc.reset(socketWrapper, event);
    }

    Executor executor = getExecutor();
    if (dispatch && executor != null) {
   
        //交给线程池执行
        executor.execute(sc);
    } else {
   
        //当前线程执行(使用AIO时走这里)
        sc.run();
    }
}

该方法用于封装SocketProcessor和调用后续执行,放于父类中,所有实现类通用该方法

Executor

线程池中的线程执行SocketProcessor时会去交给ProtocolHandler处理 getHandler().process(socketWrapper, event)

SocketProcessor中存在连接等包装组件,通过它能够处理请求(读取数据)和响应(写回数据)

需要注意的是Tomcat中的线程池是自己实现的,而不是并发包下的ThreadPoolExecutor

(对于Tomcat的线程池,我们后续文章再进行分析)

NioEndPoint大致的运行流程如下图:

image.png

连接的包装类(NioSocketWrapper)为核心贯穿全文,封装流程如下:

Acceptor接收连接:NioChannel(拿到客户端连接) -> NioSocketWrapper -> PollerEvent(放入Poller的SynchronizedQueue)

Poller处理PollerEvent注册到Selector,并阻塞监听IO事件:

PollerEvent(从SynchronizedQueue中取出) -> NioSocketWrapper(注册到Selector) -> SocketProcessor(事件发生时交给Executor)

Nio2EndPoint

Nio2EndPoint使用异步IO模型(AIO)来处理网络通信

AIO的特点就是异步,使用回调函数,当数据就绪时使用异步线程调用回调函数

无需再像NIO中使用Selector阻塞,让应用线程来触发读取数据,阻塞到数据拷贝到应用缓冲区

Nio2实际上指的就是AIO,NIO2表明这是对原有NIO的一个升级版本

Nio2EndPoint处理网络通信时不再需要检测IO事件,把这件事交给内核去做,当事件发生(数据就绪)时使用异步线程调用回调函数即可

相比于NioEndPoint,Nio2EndPoint在处理网络通信时,不需要再用Poller检测IO事件

Nio2Acceptor

Nio2EndPoint中使用Nio2Acceptor接收连接,Nio2Acceptor继承Acceptor并实现回调接口CompletionHandler

class Nio2Acceptor 
extends Acceptor<AsynchronousSocketChannel> 
implements CompletionHandler<AsynchronousSocketChannel,Void>

回调接口第一个参数为IO操作的结果,第二个泛型为操作使用的附件,其中有两个方法分别代表着成功/失败后执行的回调

public interface CompletionHandler<V,A> {
   
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

Nio2Acceptor.run

由于使用AIO,Nio2Acceptor在执行任务时不再需要循环,只需要携带回调函数,当客户端连接完成时触发回调

在执行时主要做两件事:

  1. 使用LimitLatch限制连接数
  2. 接收连接
public void run() {
   
    if (!isPaused()) {
   
        try {
   
            //1.使用LimitLatch限制连接数
            countUpOrAwaitConnection();
        } catch (InterruptedException e) {
   
        }
        if (!isPaused()) {
   
            //2.接收连接
            serverSock.accept(null, this);
        } else {
   
            state = AcceptorState.PAUSED;
        }
    } else {
   
        state = AcceptorState.PAUSED;
    }
}

serverSock.accept(null, this); 在接收连接时,使用的服务端channel为AsynchronousServerSocketChannel,并把当前对象作为回调传入

这样在下次收到连接后的回调又可以调用该方法,以此来达到不需要循环调用

Nio2Acceptor.completed

回调成功的方法中主要做几件事:

  1. 是否限制连接数量
  2. 调用accept,方便接收下次连接
  3. 调用后续处理
public void completed(AsynchronousSocketChannel socket,Void attachment) {
   
    errorDelay = 0;
    if (isRunning() && !isPaused()) {
   
        //1.是否限制连接数量
        if (getMaxConnections() == -1) {
   
            //不限制连接数量,方便接收下一次连接
            serverSock.accept(null, this);
        } else if (getConnectionCount() < getMaxConnections()) {
   
            try {
   
                //当前连接数小于最大限制连接数,不阻塞,主要是去自增计数
                countUpOrAwaitConnection();
            } catch (InterruptedException e) {
   
                // Ignore
            }
            //方便接收下次连接
            serverSock.accept(null, this);
        } else {
   
            //当前连接数大于等于最大限制连接数,再调用limitlatch会阻塞,为了避免阻塞使用线程池去执行(排队)
            getExecutor().execute(this);
        }
        //setSocketOptions后续处理
        if (!setSocketOptions(socket)) {
   
            closeSocket(socket);
        }
    } else {
   
        if (isRunning()) {
   
            state = AcceptorState.PAUSED;
        }
        destroySocket(socket);
    }
}

在Acceptor中也会调用setSocketOptions方法,那时会将连接包装NioSocketWrapper,然后封装为PollerEvent放入poller队列

在AIO中由于不再存在poller,该方法会将连接包装为Nio2SocketWrapper,然后调用父类AbstractEndpoint.processSocket方法去执行

AbstractEndpoint.processSocket

在此方法中会先将包装类封装为SocketProcessor再去执行

    public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
   
            //...

            //封装为SocketProcessor
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
   
                sc = processorCache.pop();
            }
            if (sc == null) {
   
                sc = createSocketProcessor(socketWrapper, event);
            } else {
   
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
   
                executor.execute(sc);
            } else {
   
                //当前线程执行
                sc.run();
            }

            return true;
    }

需要注意的是,当前线程本来就是异步回调线程,参数dispatch会为false

也就是这里不会使用线程池去执行,而是由当前异步回调的线程去执行SocketProcessor

Nio2SocketWrapper

当前线程是连接完成执行异步回调的线程,去执行SocketProcessor也就是会使用Processor解析数据,但此时数据可能还未准备好

为了不让Processor阻塞等待,这里会失败,直到数据就绪时触发的异步回调来执行时才能够读到数据

Nio2SocketWrapper中包含一些读写事件的回调

比如读回调中:当数据就绪时,会去执行processSocket,也就是封装SocketProcessor进行后续调用(此时会第二次使用Processor进行读数据,这样确保数据已就绪)

this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
   
    @Override
    public void completed(Integer nBytes, ByteBuffer attachment) {
   
        //...
        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
    }
}

Http11Processor

在Processor处理HTTP协议的实现类Http11Processor中,执行service解析请求时,会先解析请求头parseRequestLine

public SocketState service(SocketWrapperBase<?> socketWrapper)  throws IOException {
   
    //...
    inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),protocol.getKeepAliveTimeout())
    //...    
}

当parseRequestLine返回false时,说明数据未就绪,不会执行后续操作,因此第一次读数据时由于数据未就绪不会再往后执行

NioEndPoint大致的运行流程如下图:

image.png

在Nio2EndPoint中,使用异步回调的方式,避免poller中的操作,能够提升效率,但是大量异步线程的引入又会带来线程上下文切换的开销

连接的包装类(Nio2SocketWrapper)为核心贯穿全文,封装流程如下:

Nio2Channel(回调拿到客户端连接) -> Nio2SocketWrapper -> SocketProcessor(事件发生时)

连接完成的回调:Nio2Channel -> Nio2SocketWrapper -> SocketProcessor -> Http11Processor(解析失败,数据未就绪)

读事件就绪的回调:Nio2SocketWrapper -> SocketProcessor -> Http11Processor

AprEndPoint

APR(Apache Portable Runtime)是Apache提供的可移植运行库,是为了早期的Tomcat的提供高性能的

早期NIO还不成熟,使用APR通过JNI调用本地C语言实现的库,能够使用操作系统的epoll来实现多路复用模型,旨在提高高性能

AprEndPoint 流程与NioEndPoint相同,只是其中调用的方法不同,NioEndPoint调用JDK NIO的API,而AprEndPoint调用APR库

AprEndPoint在通道上使用的缓冲区是基于直接内存的(DirectByteBuffer),而NioEndPoint与Nio2EndPoint都是使用堆内存的(HeapByteBuffer)

使用直接内存的好处是能够减少数据拷贝带来的开销,但无法使用JVM来进行管理内存

并且AprEndPoint还能使用零拷贝sendfile,将数据从磁盘读到网卡发送时减少各种拷贝开销

image.png

但在后来NIO、AIO逐渐成熟,AprEndPoint带来的好处逐渐被追平,在Tomcat 10时被遗弃

总结

NioEndPoint将处理网络通信分为接收连接、监听事件、处理请求三个步骤

其中Acceptor负责接收连接,使用LimitLatch限制连接数量(若超过上限则等待),获取客户端连接NioChannel,包装为NioSocketWrapper,再封装为PollerEvent放入Poller的队列中

Poller会轮询处理PollerEvent,通过PollerEvent拿到NioSocketWrapper将连接注册到Selector上,使用Selector监听事件,有事件触发时,从附件中获取连接的包装NioSocketWrapper,将其封装为SocketProcessor交给线程池处理

线程池的线程处理SocketProcessor时,则会使用Processor解析协议,后续再封装请求/响应调用容器处理

Nio2EndPoint 使用AIO,由内核监听事件(数据就绪)后使用异步线程执行回调

其中Nio2Acceptor继承Acceptor,接收连接不再循环处理,而是使用异步回调:当连接完成后再使用LimitLatch判断是否限制连接,调用非阻塞accept便于接收下次连接(回调),然后将客户端连接Nio2Channel封装为Nio2SocketWrapper再封装为SocketProcessor处理(后续调用processor无法解析,因为当前是连接完成的回调线程,数据还未就绪)

当数据就绪时,通过Nio2SocketWrapper的回调继续封装为SocketProcessor向后处理(后续调用processor可以解析,因为当前为读数据就绪的回调线程,第二次读)

早期的APR通过本地库、直接内存、零拷贝等多种方式进行性能优化

🌠最后(不要白嫖,一键三连求求拉~)

本篇文章被收入专栏 Tomcat全解析:架构设计与核心组件实现,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

相关文章
|
26天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
3天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
332 14
|
18天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
6天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
20天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
23天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2586 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
5天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
178 2
|
3天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
104 65
|
6天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
295 2
|
22天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1580 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码