Tomcat连接之KeepAlive逻辑分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: Tomcat连接之KeepAlive逻辑分析

背景介绍

我们的系统运行在阿里云上,负载均衡使用的SLB,应用运行环境使用的EDAS,Servlet容器使用的是taobao-tomcat-7.0.59。我们期望在停止应用之前,能够将已经与tomcat建立的连接安全的关闭掉,然后结合SLB的健康检查机制,实现停止应用前优雅的摘流能力,这样在应用停止的时候就不会有任何流量损失了。

但是,在将SLB配置为TCP监听,客户端通过HTTP KeepAlive的方式访问Tomcat的场景下,即使SLB已经检测到后端服务不健康,SLB依然会将已建立连接上的请求转发到不健康的后端服务上,导致无法优雅的摘流。

下面从Tomcat的角度,分析一下Tomcat如何管理HTTP KeepAlive连接的。

分析过程

两个参数

对于一个HTTP连接的管理,有以下两个方面需要考虑:

  • 如果一个HTTP连接建立之后,很少有报文进行交互,长时间不销毁是对资源的浪费
  • 如果一个HTTP连接建立之后,有大量的报文进行交互,可能会造成负载不均或其他问题

在Tomcat中控制以上两个行为的参数分别是keepAliveTimeout和maxKeepAliveRequests。

keepAliveTimeout

The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute. Use a value of -1 to indicate no (i.e. infinite) timeout.

此连接器在关闭连接之前等待另一个HTTP请求的毫秒数。默认值是使用为connectionTimeout属性设置的值。使用-1值表示没有(即无限)超时。

maxKeepAliveRequests

The maximum number of HTTP requests which can be pipelined until the connection is closed by the server. Setting this attribute to 1 will disable HTTP/1.0 keep-alive, as well as HTTP/1.1 keep-alive and pipelining. Setting this to -1 will allow an unlimited amount of pipelined or keep-alive HTTP requests. If not specified, this attribute is set to 100.

在服务器关闭连接之前可以流水线处理的HTTP请求的最大数量。将此属性设置为1将禁用HTTP/1.0的keep-alive,以及HTTP/1.1的keep-alive和pipelining。将其设置为-1将允许无限数量的管道或保持活动的HTTP请求。如果未指定,则将此属性设置为100。

通过arthas mbean命令查看Tomcat配置:

通过以上分析可知,keepAliveTimeout配置为15000毫秒,maxKeepAliveRequests配置为100,也就是说当HTTP连接空闲15000毫秒或者HTTP连接收到请求数量大于等于100的时候,Tomcat会关闭该HTTP连接。

下面分析Tomcat中HTTP KeepAlive的相关逻辑。

KeepAlive逻辑分析

以下从【建立HTTP连接】—>【解析请求】—>【准备响应】—>【关闭连接】进行分析。

建立HTTP连接

Socket acceptor thread,org.apache.tomcat.util.net.NioEndpoint$Acceptor,相关逻辑如下:

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        while (running) {
            ......
            SocketChannel socket = serverSock.accept();
            ......
            if (running && !paused) {
                if (!setSocketOptions(socket)) {
                    ......
                }
            } else {
               ......
            }
        }
    }
}
protected boolean setSocketOptions(SocketChannel socket) {
  ......
  NioChannel channel = nioChannels.poll();
  ......
  //将NioChannel交给Socket poller thread处理
  getPoller0().register(channel);
}

Socket poller thread主要逻辑如下:

public class Poller implements Runnable {
  public void register(final NioChannel socket) {
    socket.setPoller(this);
    // KeyAttachment继承了SocketWrapper
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
    ... ...
    // 将MaxKeepAliveRequests关联到socket上
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ... ...
    PollerEvent r = eventCache.poll();
    ... ...
    addEvent(r);
  }
  @Override
  public void run() {
    while (true) {
      ... ...
      Iterator<SelectionKey> iterator =
      keyCount > 0 ? selector.selectedKeys().iterator() : null;
      while (iterator != null && iterator.hasNext()) {
        SelectionKey sk = iterator.next();
        KeyAttachment attachment = (KeyAttachment)sk.attachment();
        // Attachment may be null if another thread has called
        // cancelledKey()
        if (attachment == null) {
          iterator.remove();
        } else {
          attachment.access();
          iterator.remove();
          // 处理Socket上的事件
          processKey(sk, attachment);
        }
      }
      ... ...
    }
  }
  protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    ... ...
    if (sk.isReadable()) {
      if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
        closeSocket = true;
      }
    }
    if (!closeSocket && sk.isWritable()) {
      if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
        closeSocket = true;
      }
    }
    ... ...
  }
  public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
    ... ...
    KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
    ... ...
    SocketProcessor sc = processorCache.poll();
        if ( sc == null ) sc = new SocketProcessor(socket,status);
        else sc.reset(socket,status);
    // 从Socket poller thread切换到Worker Threads pool
        if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
        else sc.run();
    ... ...
  }
}

流程由Socket poller thread切换到了Worker threads pool SocketProcessor,Worker threads pool是用来处理业务逻辑的线程池,接下来分析Tomcat如何解析请求的。

解析请求

SocketProcessor会流转到org.apache.coyote.http11.AbstractHttp11Processor.process中,解析请求相关逻辑如下:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
    ... ...
    keepAlive = true;
    // 如果当前处理业务的线程占总线程的比例超过disableKeepAlivePercentage(默认:75),
    // 则将socket可接收的请求数设置为0
    if (disableKeepAlive()) {
        socketWrapper.setKeepAliveLeft(0);
    }
    ... ...
    prepareRequest();
    ... ...
    if (maxKeepAliveRequests == 1) {
        keepAlive = false;
    } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) {
        // 关键逻辑,socket已接收的请求数量是否超过配置值
        keepAlive = false;
    }
  ... ...
  // 调用Servlet service方法
  adapter.service(request, response);
  ... ...
}
@Override
protected boolean disableKeepAlive() {
    int threadRatio = -1;   
    // These may return zero or negative values
    // Only calculate a thread ratio when both are >0 to ensure we get a
    // sensible result
    int maxThreads, threadsBusy;
    if ((maxThreads = endpoint.getMaxThreads()) > 0
        && (threadsBusy = endpoint.getCurrentThreadsBusy()) > 0) {
        threadRatio = (threadsBusy * 100) / maxThreads;
    }
    // Disable keep-alive if we are running low on threads      
    if (threadRatio > getDisableKeepAlivePercentage()) {     
        return true;
    }
    return false;
}
protected void prepareRequest() {
    ... ...
    // 通过HTTP协议版本进行判断
    MessageBytes protocolMB = request.protocol();
    if (protocolMB.equals(Constants.HTTP_11)) {
        // HTTP 1.1为keepalive
        ......
    } else if (protocolMB.equals(Constants.HTTP_10)) {
        // HTTP 1.0为非keepalive
        keepAlive = false;
        ......
    } else if (protocolMB.equals("")) {
        // HTTP 0.9为非keepalive
        ......
        keepAlive = false;
    } else {
        // Unsupported protocol
        http11 = false;
        // Send 505; Unsupported HTTP version
        response.setStatus(505);
        ......
    }
  // 通过HTTP Header Connection进行判断
    // 当Connection : close,keepAlive为false
    // 当Connection : keep-alive,keepAlive为true
    MessageBytes connectionValueMB = headers.getValue(Constants.CONNECTION);
    if (connectionValueMB != null) {
        ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
        if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
            keepAlive = false;
        } else if (findBytes(connectionValueBC,
                             Constants.KEEPALIVE_BYTES) != -1) {
            keepAlive = true;
        }
    }
  ... ...
}

准备响应

不同的响应也会影响到keepAlive的设置,下面是相关逻辑:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
  ... ...
  // 调用Servlet service方法,其中会调用到prepareResponse()
  adapter.service(request, response);
  ... ...
  if (breakKeepAliveLoop(socketWrapper)) {
       break;
    }
  ... ...
}
private void prepareResponse() {
  boolean entityBody = true;
    contentDelimitation = false;
    int statusCode = response.getStatus();
    if (statusCode < 200 || statusCode == 204 || statusCode == 205 || statusCode == 304){
        // No entity body
        getOutputBuffer().addActiveFilter(outputFilters[Constants.VOID_FILTER]);
        entityBody = false;
        contentDelimitation = true;
    }
    MessageBytes methodMB = request.method();
    if (methodMB.equals("HEAD")) {
        // No entity body
        getOutputBuffer().addActiveFilter(outputFilters[Constants.VOID_FILTER]);
        contentDelimitation = true;
    }
    ... ...
    if ((entityBody) && (!contentDelimitation)) {
        // Mark as close the connection after the request, and add the
        // connection: close header
        keepAlive = false;
    }
    // This may disabled keep-alive to check before working out the
    // Connection header.
    checkExpectationAndResponseStatus();
    // If we know that the request is bad this early, add the
    // Connection: close header.
    keepAlive = keepAlive && !statusDropsConnection(statusCode);
    if (!keepAlive) {
        // Avoid adding the close header twice
        if (!connectionClosePresent) {
            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
        }
    } else if (!http11 && !getErrorState().isError()) {
        headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
    }
    ... ...
}
/**
 * Determine if we must drop the connection because of the HTTP status
 * code.  Use the same list of codes as Apache/httpd.
 */
protected boolean statusDropsConnection(int status) {
    return status == 400 /* SC_BAD_REQUEST */ ||
           status == 408 /* SC_REQUEST_TIMEOUT */ ||
           status == 411 /* SC_LENGTH_REQUIRED */ ||
           status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
           status == 414 /* SC_REQUEST_URI_TOO_LONG */ ||
           status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
           status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
           status == 501 /* SC_NOT_IMPLEMENTED */;
}
@Override
protected boolean breakKeepAliveLoop(SocketWrapper<Socket> socketWrapper) {
    openSocket = keepAlive;
    // If we don't have a pipe-lined request allow this thread to be
    // used by another connection
    if (inputBuffer.lastValid == 0) {
        return true;
    }
    return false;
}

关闭连接

给客户端发送完响应就是判断是否关闭Socket的逻辑了:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
  ... ...
  if (breakKeepAliveLoop(socketWrapper)) {
       break;
    }
  ... ...
    if (getErrorState().isError() || endpoint.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync() || comet) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else if (getUpgradeInbound() != null) {
        return SocketState.UPGRADING_TOMCAT;
    } else {
        if (sendfileInProgress) {
            return SocketState.SENDFILE;
        } else {
            // 从上面分析可知,openSocket==keepAlive,
            // 所以当openSocket=false的时候,keepAlive=false,此时SocketState为CLOSED
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

以上process方法执行完后,最终会返回到SocketProcessor.run中,逻辑如下:

public void run() {
    boolean launch = false;
    synchronized (socket) {
        try {
            SocketState state = SocketState.OPEN;
            ... ...
            state = handler.process(socket,status);
            // 当keepAlive=false的时候,关闭Socket
            if (state == SocketState.CLOSED) {
                // Close socket
                ... ...
                try {
                    socket.getSocket().close();
                } catch (IOException e) {
                    // Ignore
                }
            } else if (state == SocketState.OPEN ||
                    state == SocketState.UPGRADING ||
                    state == SocketState.UPGRADING_TOMCAT  ||
                    state == SocketState.UPGRADED){
                ... ...
                launch = true;
            } else if (state == SocketState.LONG) {
                socket.access();
                waitingRequests.add(socket);
            }
        }finally {
            if (launch) {
                try {
                    getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                } catch (RejectedExecutionException x) {
                    ... ...
                }
            }
    }
}

总结

从以上分析可知,keepAlive既与请求有关,也与响应有关。

解析请求的时候,会根据系统繁忙程度、使用的HTTP协议版本,HTTP Header Conncetion及Socket接收的请求数量有关;

准备响应的时候,会根据业务返回的状态码,系统异常等情况进行判断,当Socket状态为SocketState.CLOSED的时候则会关闭连接。

通过以上分析,想必大家已经知道怎么优雅的关闭HTTP连接了。

参考资料

GitHub - apache/tomcat: Apache Tomcat

相关实践学习
部署高可用架构
本场景主要介绍如何使用云服务器ECS、负载均衡SLB、云数据库RDS和数据传输服务产品来部署多可用区高可用架构。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
2月前
|
XML 应用服务中间件 Apache
Tomcat AJP连接器配置secretRequired=“true“,但是属性secret确实空或者空字符串,这样的组合是无效的。
Tomcat AJP连接器配置secretRequired=“true“,但是属性secret确实空或者空字符串,这样的组合是无效的。
|
8月前
|
Arthas 弹性计算 安全
优雅上下线之如何安全的关闭Tomcat持久连接
优雅上下线之如何安全的关闭Tomcat持久连接
319 3
|
7月前
|
Web App开发 应用服务中间件
解决在访问tomcat时出现连接失败,Firefox 无法建立到 localhost:8080 服务器的连接的问题~
解决在访问tomcat时出现连接失败,Firefox 无法建立到 localhost:8080 服务器的连接的问题~
126 0
|
7月前
|
网络协议 应用服务中间件 Apache
100分布式电商项目 - Tomcat性能优化(禁用AJP连接器)
100分布式电商项目 - Tomcat性能优化(禁用AJP连接器)
32 0
|
7月前
|
应用服务中间件
IDEA 配置部署JavaWeb项目在阿里云服务器的tomcat上,成功连接服务器,但Artifact 没有成功部署
IDEA 配置部署JavaWeb项目在阿里云服务器的tomcat上,成功连接服务器,但Artifact 没有成功部署
443 0
|
弹性计算 Java 应用服务中间件
关于购买阿里云学生服务器以及win7安装Tomcat连接服务器的过程总结
关于购买阿里云学生服务器以及win7安装Tomcat连接服务器的过程总结
138 0
|
弹性计算 Oracle 安全
阿里云学生服务器(Windows)的配置以及安装Tomcat连接服务器的教程
阿里云学生服务器(Windows)的配置以及安装Tomcat连接服务器的教程
434 0
阿里云学生服务器(Windows)的配置以及安装Tomcat连接服务器的教程
|
应用服务中间件
tomcat升级版本为8.5.68后.启动报错: java.lang.IllegalArgumentException: AJP连接器配置secretRequired=“true”
ttomcat升级版本为8.5.68后.启动报错: java.lang.IllegalArgumentException: AJP连接器配置secretRequired=“true” 属性secret确实为空 1.tomcat启动报错内容如下
790 0
tomcat升级版本为8.5.68后.启动报错: java.lang.IllegalArgumentException: AJP连接器配置secretRequired=“true”
|
弹性计算 Oracle 安全
服务器配置:阿里云服务器(Windows)的配置以及安装Tomcat连接服务器的教程
服务器配置:阿里云服务器(Windows)的配置以及安装Tomcat连接服务器的教程
1733 0
服务器配置:阿里云服务器(Windows)的配置以及安装Tomcat连接服务器的教程
|
设计模式 网络协议 Java
Tomcat的连接器是如何设计的?
Tomcat的连接器是如何设计的?
98 0
Tomcat的连接器是如何设计的?