tomcat对AQS的扩展:使用LimitLatch控制连接数

简介: tomcat对AQS的扩展:使用LimitLatch控制连接数

微信图片_20221212124840.jpgLimitLatch是一个共享性质的锁,这里的共享概念来自于AQS,指的是不同的线程可以同时获取该锁。本文开始之前,首先我要纠正之前的文章《面试官:谈一谈java中基于AQS的并发锁原理》的一个错误,LimitLatch并不是JDK实现的,而是tomcat实现的。


LimitLatch简介


jdk中对AQS的扩展有一个CountDownLatch,Latch是一个阀门的意思,CountDownLatch创建了一个阀门,之后阻塞,等待所有线程都执行结束并且countdown之后,才会继续执行。而本文要介绍的LimitLatch则更像是java中的Semaphore,用于控制资源的使用。

下面我们看一下LimitLatch中锁的实现:

private class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;
    public Sync() {
    }
    @Override
    protected int tryAcquireShared(int ignored) {
        long newCount = count.incrementAndGet();//定义了AtomicLong类型的count数量,每次获取锁之后会加1
        if (!released && newCount > limit) {//是否超过limit的限制
            // Limit exceeded
            count.decrementAndGet();//获取失败后减1
            return -1;//返回-1代表获取锁失败,这是就只能进入队列了
        } else {
            return 1;
        }
    }
    @Override
    protected boolean tryReleaseShared(int arg) {
        count.decrementAndGet();//释放锁的时候count数量减1
        return true;
    }
}

从上面的代码中我们可以看到,LimitLatch首先定义了一个limit,每次获取锁时都会累计获取成功的线程数量,如果大于limit,则获取成功等待入队,释放锁的时候线程数量会减1。


下面我将以tomcat的NIO2模式为例,看一下tomcat是如何使用LimitLatch来控制连接数的。


tomcat初始化


tomcat的Nio2EndPoint启动的时候,会创建LimitLatch,而LimitLatch中的limit,正是我们tomcat中配置的最大连接数。代码如下:

@Override
public void startInternal() throws Exception {
    if (!running) {
        allClosed = false;
        running = true;
        paused = false;
        //省略部分代码
        initializeConnectionLatch();//初始化LimitLatch
        startAcceptorThread();
    }
}
protected LimitLatch initializeConnectionLatch() {
    if (maxConnections==-1) return null;
    if (connectionLimitLatch==null) {
        connectionLimitLatch = new LimitLatch(getMaxConnections());//根据配置的最大连接数初始化LimitLatch
    }
    return connectionLimitLatch;
}
public LimitLatch(long limit) {
    this.limit = limit;
    this.count = new AtomicLong(0);
    this.sync = new Sync();
}

获取和释放连接


LimitLatch初始化后,就可以对连接的获取和释放进行管理了。下面我们看一下Nio2Endpoint中的内部类Nio2Acceptor

protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> {
    protected int errorDelay = 0;
    public Nio2Acceptor(AbstractEndpoint<?, AsynchronousSocketChannel> endpoint) {
        super(endpoint);
    }
    @Override
    public void run() {
        // The initial accept will be called in a separate utility thread
        if (!isPaused()) {
            try {
                countUpOrAwaitConnection();//已经达到了最大的连接数,则入队等待通知
            } catch (InterruptedException e) {
                // Ignore
            }
            //省略部分代码
        } else {
            state = AcceptorState.PAUSED;
        }
    }
    @Override
    public void completed(AsynchronousSocketChannel socket,
            Void attachment) {
        // Successful accept, reset the error delay
        errorDelay = 0;
        // Continue processing the socket on the current thread
        // Configure the socket
        if (isRunning() && !isPaused()) {
            if (getMaxConnections() == -1) {
                serverSock.accept(null, this);
            } else {
                // Accept again on a new thread since countUpOrAwaitConnection may block
                getExecutor().execute(this);
            }
            if (!setSocketOptions(socket)) {//处理socket失败,关闭
                closeSocket(socket);
            }
        } else {
            if (isRunning()) {
                state = AcceptorState.PAUSED;
            }
            destroySocket(socket);//调用closeSocket,关闭socket
        }
    }
    @Override
    public void failed(Throwable t, Void attachment) {
        if (isRunning()) {
            if (!isPaused()) {
                if (getMaxConnections() == -1) {
                    serverSock.accept(null, this);
                } else {
                    // Accept again on a new thread since countUpOrAwaitConnection may block
                    getExecutor().execute(this);
                }
            } else {
                state = AcceptorState.PAUSED;
            }
            // We didn't get a socket
            countDownConnection();
            //省略部分代码
        } else {
            // We didn't get a socket
            countDownConnection();
        }
    }
}

上面代码中,有几点说明:

1.如果连接数已经达到最大连接,则会调用countUpOrAwaitConnection方法,代码如下:

protected void countUpOrAwaitConnection() throws InterruptedException {
    if (maxConnections==-1) return;
    LimitLatch latch = connectionLimitLatch;
    if (latch!=null) latch.countUpOrAwait();//入队等待
}

可以看到,达到最大连接数之后,就会入队等待

2.连接初始化失败,会调用countDownConnection方法,而连接处理结束后会调用closeSocket(destroySocket也调用closeSocket),最终调用countDownConnection,代码如下:

protected long countDownConnection() {
    if (maxConnections==-1) return -1;
    LimitLatch latch = connectionLimitLatch;
    if (latch!=null) {
        long result = latch.countDown();//最终调用LimitLatch的countDown方法,见下面代码
        if (result<0) {
            getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
        }
        return result;
    } else return -1;
}
public long countDown() {
    sync.releaseShared(0);//调用AQS中的releaseShared释放锁
    long result = getCount();
    if (log.isDebugEnabled()) {
        log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
}
    return result;//返回count数量
}
public final boolean releaseShared(int arg) {//AQS中的代码
    if (tryReleaseShared(arg)) {//见文中开头的LimitLatch的中的锁代码
        doReleaseShared();
        return true;
    }
    return false;
}

可见,socket获取失败或者处理结束后,都会调用LimitLatch中的释放锁流程。


总结


LimitLatch的使用跟Semaphore有点类似,像是一个限流器,tomcat使用它进行了最大连接数的控制,看了这篇文章,是不是对tomcat的参数server.tomcat.max-threads参数的使用原理有了一定了解呢?

相关文章
|
7月前
|
前端开发 Java 应用服务中间件
Springboot对MVC、tomcat扩展配置
Springboot对MVC、tomcat扩展配置
|
2月前
|
Java 应用服务中间件
面对海量网络请求,Tomcat线程池如何进行扩展?
【10月更文挑战第4天】本文详细探讨了Tomcat线程池相较于标准Java实用工具包(JUC)线程池的关键改进。首先,Tomcat线程池在启动时即预先创建全部核心线程,以应对启动初期的高并发请求。其次,通过重写阻塞队列的入队逻辑,Tomcat能够在任务数超过当前线程数但未达最大线程数时,及时创建非核心线程,而非等到队列满才行动。此外,Tomcat还引入了在拒绝策略触发后重新尝试入队的机制,以提高吞吐量。这些优化使得Tomcat线程池更适应IO密集型任务,有效提升了性能。
面对海量网络请求,Tomcat线程池如何进行扩展?
|
网络协议 Java 应用服务中间件
详解Tomcat的连接数与线程池,调优必备
详解Tomcat的连接数与线程池,调优必备
|
网络协议 Java 应用服务中间件
Tomcat 连接数与线程池详解
在使用tomcat时,经常会遇到连接数、线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector)。
512 0
Tomcat 连接数与线程池详解
|
安全 Dubbo Java
原生线程池这么强大,Tomcat 为何还需扩展线程池?
Tomcat/Jetty 是目前比较流行的 Web 容器,两者接受请求之后都会转交给线程池处理,这样可以有效提高处理的能力与并发度。JDK 提高完整线程池实现,但是 Tomcat/Jetty 都没有直接使用。Jetty 采用自研方案,内部实现 QueuedThreadPool 线程池组件,而 Tomcat 采用扩展方案,踩在 JDK 线程池的肩膀上,扩展 JDK 原生线程池。 JDK 原生线程池可以说功能比较完善,使用也比较简单,那为何 Tomcat/Jetty 却不选择这个方案,反而自己去动手实现那?
原生线程池这么强大,Tomcat 为何还需扩展线程池?
|
Java 应用服务中间件 Linux
tomcat 的 JVM 设置和连接数设置
Windows环境下修改“%TOMCAT_HOME%\bin\catalina.bat”文件,在文件开头增加如下设置: set JAVA_OPTS=-Xms256m -Xmx512m Linux环境下修改“%TOMCAT_HOME%\bin\catalina.sh”文件,在文件开头增加如下设置: JAVA_OPTS=’-Xms256m -Xmx512m’ 其中,-Xms设置初始化内存大小,-Xmx设置可以使用的最大内存。
1306 0
|
Java 应用服务中间件 Windows
|
网络协议 Java 应用服务中间件
|
Java 应用服务中间件 Windows
Tomcat之jvm及连接数设置
一、Tomcat的JVM提示内存溢出 查看%TOMCAT_HOME%\logs文件夹下,日志文件是否有内存溢出错误 二、修改Tomcat的JVM 1、错误提示:java.lang.OutOfMemoryError: Java heap space Tomcat默认可以使用的内存为128MB,在较大型的应用项目中,这点内存是不够的,有可能导致系统无法运行。
1195 0