LimitLatch是一个共享性质的锁,这里的共享概念来自于AQS,指的是不同的线程可以同时获取该锁。本文开始之前,首先我要纠正之前的文章《面试官:谈一谈java中基于AQS的并发锁原理》的一个错误,LimitLatch并不是JDK实现的,而是tomcat实现的。
LimitLatch简介
jdk中对AQS的扩展有一个CountDownLatch,Latch是一个阀门的意思,CountDownLatch创建了一个阀门,之后阻塞,等待所有线程都执行结束并且countdown之后,才会继续执行。而本文要介绍的LimitLatch则更像是java中的Semaphore,用于控制资源的使用。
下面我们看一下LimitLatch中锁的实现:
private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override protected int tryAcquireShared(int ignored) { long newCount = count.incrementAndGet();//定义了AtomicLong类型的count数量,每次获取锁之后会加1 if (!released && newCount > limit) {//是否超过limit的限制 // Limit exceeded count.decrementAndGet();//获取失败后减1 return -1;//返回-1代表获取锁失败,这是就只能进入队列了 } else { return 1; } } @Override protected boolean tryReleaseShared(int arg) { count.decrementAndGet();//释放锁的时候count数量减1 return true; } }
从上面的代码中我们可以看到,LimitLatch首先定义了一个limit,每次获取锁时都会累计获取成功的线程数量,如果大于limit,则获取成功等待入队,释放锁的时候线程数量会减1。
下面我将以tomcat的NIO2模式为例,看一下tomcat是如何使用LimitLatch来控制连接数的。
tomcat初始化
tomcat的Nio2EndPoint启动的时候,会创建LimitLatch,而LimitLatch中的limit,正是我们tomcat中配置的最大连接数。代码如下:
@Override public void startInternal() throws Exception { if (!running) { allClosed = false; running = true; paused = false; //省略部分代码 initializeConnectionLatch();//初始化LimitLatch startAcceptorThread(); } } protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) return null; if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections());//根据配置的最大连接数初始化LimitLatch } return connectionLimitLatch; } public LimitLatch(long limit) { this.limit = limit; this.count = new AtomicLong(0); this.sync = new Sync(); }
获取和释放连接
LimitLatch初始化后,就可以对连接的获取和释放进行管理了。下面我们看一下Nio2Endpoint中的内部类Nio2Acceptor
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> { protected int errorDelay = 0; public Nio2Acceptor(AbstractEndpoint<?, AsynchronousSocketChannel> endpoint) { super(endpoint); } @Override public void run() { // The initial accept will be called in a separate utility thread if (!isPaused()) { try { countUpOrAwaitConnection();//已经达到了最大的连接数,则入队等待通知 } catch (InterruptedException e) { // Ignore } //省略部分代码 } else { state = AcceptorState.PAUSED; } } @Override public void completed(AsynchronousSocketChannel socket, Void attachment) { // Successful accept, reset the error delay errorDelay = 0; // Continue processing the socket on the current thread // Configure the socket if (isRunning() && !isPaused()) { if (getMaxConnections() == -1) { serverSock.accept(null, this); } else { // Accept again on a new thread since countUpOrAwaitConnection may block getExecutor().execute(this); } if (!setSocketOptions(socket)) {//处理socket失败,关闭 closeSocket(socket); } } else { if (isRunning()) { state = AcceptorState.PAUSED; } destroySocket(socket);//调用closeSocket,关闭socket } } @Override public void failed(Throwable t, Void attachment) { if (isRunning()) { if (!isPaused()) { if (getMaxConnections() == -1) { serverSock.accept(null, this); } else { // Accept again on a new thread since countUpOrAwaitConnection may block getExecutor().execute(this); } } else { state = AcceptorState.PAUSED; } // We didn't get a socket countDownConnection(); //省略部分代码 } else { // We didn't get a socket countDownConnection(); } } }
上面代码中,有几点说明:
1.如果连接数已经达到最大连接,则会调用countUpOrAwaitConnection方法,代码如下:
protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) return; LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.countUpOrAwait();//入队等待 }
可以看到,达到最大连接数之后,就会入队等待
2.连接初始化失败,会调用countDownConnection方法,而连接处理结束后会调用closeSocket(destroySocket也调用closeSocket),最终调用countDownConnection,代码如下:
protected long countDownConnection() { if (maxConnections==-1) return -1; LimitLatch latch = connectionLimitLatch; if (latch!=null) { long result = latch.countDown();//最终调用LimitLatch的countDown方法,见下面代码 if (result<0) { getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount")); } return result; } else return -1; } public long countDown() { sync.releaseShared(0);//调用AQS中的releaseShared释放锁 long result = getCount(); if (log.isDebugEnabled()) { log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result); } return result;//返回count数量 } public final boolean releaseShared(int arg) {//AQS中的代码 if (tryReleaseShared(arg)) {//见文中开头的LimitLatch的中的锁代码 doReleaseShared(); return true; } return false; }
可见,socket获取失败或者处理结束后,都会调用LimitLatch中的释放锁流程。
总结
LimitLatch的使用跟Semaphore有点类似,像是一个限流器,tomcat使用它进行了最大连接数的控制,看了这篇文章,是不是对tomcat的参数server.tomcat.max-threads参数的使用原理有了一定了解呢?