Tomcat连接之KeepAlive逻辑分析

本文涉及的产品
应用实时监控服务-用户体验监控,每月100OCU免费额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
可观测可视化 Grafana 版,10个用户账号 1个月
简介: Tomcat连接之KeepAlive逻辑分析

背景介绍

我们的系统运行在阿里云上,负载均衡使用的SLB,应用运行环境使用的EDAS,Servlet容器使用的是taobao-tomcat-7.0.59。我们期望在停止应用之前,能够将已经与tomcat建立的连接安全的关闭掉,然后结合SLB的健康检查机制,实现停止应用前优雅的摘流能力,这样在应用停止的时候就不会有任何流量损失了。

但是,在将SLB配置为TCP监听,客户端通过HTTP KeepAlive的方式访问Tomcat的场景下,即使SLB已经检测到后端服务不健康,SLB依然会将已建立连接上的请求转发到不健康的后端服务上,导致无法优雅的摘流。

下面从Tomcat的角度,分析一下Tomcat如何管理HTTP KeepAlive连接的。

分析过程

两个参数

对于一个HTTP连接的管理,有以下两个方面需要考虑:

  • 如果一个HTTP连接建立之后,很少有报文进行交互,长时间不销毁是对资源的浪费
  • 如果一个HTTP连接建立之后,有大量的报文进行交互,可能会造成负载不均或其他问题

在Tomcat中控制以上两个行为的参数分别是keepAliveTimeout和maxKeepAliveRequests。

keepAliveTimeout

The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute. Use a value of -1 to indicate no (i.e. infinite) timeout.

此连接器在关闭连接之前等待另一个HTTP请求的毫秒数。默认值是使用为connectionTimeout属性设置的值。使用-1值表示没有(即无限)超时。

maxKeepAliveRequests

The maximum number of HTTP requests which can be pipelined until the connection is closed by the server. Setting this attribute to 1 will disable HTTP/1.0 keep-alive, as well as HTTP/1.1 keep-alive and pipelining. Setting this to -1 will allow an unlimited amount of pipelined or keep-alive HTTP requests. If not specified, this attribute is set to 100.

在服务器关闭连接之前可以流水线处理的HTTP请求的最大数量。将此属性设置为1将禁用HTTP/1.0的keep-alive,以及HTTP/1.1的keep-alive和pipelining。将其设置为-1将允许无限数量的管道或保持活动的HTTP请求。如果未指定,则将此属性设置为100。

通过arthas mbean命令查看Tomcat配置:

通过以上分析可知,keepAliveTimeout配置为15000毫秒,maxKeepAliveRequests配置为100,也就是说当HTTP连接空闲15000毫秒或者HTTP连接收到请求数量大于等于100的时候,Tomcat会关闭该HTTP连接。

下面分析Tomcat中HTTP KeepAlive的相关逻辑。

KeepAlive逻辑分析

以下从【建立HTTP连接】—>【解析请求】—>【准备响应】—>【关闭连接】进行分析。

建立HTTP连接

Socket acceptor thread,org.apache.tomcat.util.net.NioEndpoint$Acceptor,相关逻辑如下:

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        while (running) {
            ......
            SocketChannel socket = serverSock.accept();
            ......
            if (running && !paused) {
                if (!setSocketOptions(socket)) {
                    ......
                }
            } else {
               ......
            }
        }
    }
}
protected boolean setSocketOptions(SocketChannel socket) {
  ......
  NioChannel channel = nioChannels.poll();
  ......
  //将NioChannel交给Socket poller thread处理
  getPoller0().register(channel);
}

Socket poller thread主要逻辑如下:

public class Poller implements Runnable {
  public void register(final NioChannel socket) {
    socket.setPoller(this);
    // KeyAttachment继承了SocketWrapper
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
    ... ...
    // 将MaxKeepAliveRequests关联到socket上
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ... ...
    PollerEvent r = eventCache.poll();
    ... ...
    addEvent(r);
  }
  @Override
  public void run() {
    while (true) {
      ... ...
      Iterator<SelectionKey> iterator =
      keyCount > 0 ? selector.selectedKeys().iterator() : null;
      while (iterator != null && iterator.hasNext()) {
        SelectionKey sk = iterator.next();
        KeyAttachment attachment = (KeyAttachment)sk.attachment();
        // Attachment may be null if another thread has called
        // cancelledKey()
        if (attachment == null) {
          iterator.remove();
        } else {
          attachment.access();
          iterator.remove();
          // 处理Socket上的事件
          processKey(sk, attachment);
        }
      }
      ... ...
    }
  }
  protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    ... ...
    if (sk.isReadable()) {
      if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
        closeSocket = true;
      }
    }
    if (!closeSocket && sk.isWritable()) {
      if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
        closeSocket = true;
      }
    }
    ... ...
  }
  public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
    ... ...
    KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
    ... ...
    SocketProcessor sc = processorCache.poll();
        if ( sc == null ) sc = new SocketProcessor(socket,status);
        else sc.reset(socket,status);
    // 从Socket poller thread切换到Worker Threads pool
        if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
        else sc.run();
    ... ...
  }
}

流程由Socket poller thread切换到了Worker threads pool SocketProcessor,Worker threads pool是用来处理业务逻辑的线程池,接下来分析Tomcat如何解析请求的。

解析请求

SocketProcessor会流转到org.apache.coyote.http11.AbstractHttp11Processor.process中,解析请求相关逻辑如下:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
    ... ...
    keepAlive = true;
    // 如果当前处理业务的线程占总线程的比例超过disableKeepAlivePercentage(默认:75),
    // 则将socket可接收的请求数设置为0
    if (disableKeepAlive()) {
        socketWrapper.setKeepAliveLeft(0);
    }
    ... ...
    prepareRequest();
    ... ...
    if (maxKeepAliveRequests == 1) {
        keepAlive = false;
    } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) {
        // 关键逻辑,socket已接收的请求数量是否超过配置值
        keepAlive = false;
    }
  ... ...
  // 调用Servlet service方法
  adapter.service(request, response);
  ... ...
}
@Override
protected boolean disableKeepAlive() {
    int threadRatio = -1;   
    // These may return zero or negative values
    // Only calculate a thread ratio when both are >0 to ensure we get a
    // sensible result
    int maxThreads, threadsBusy;
    if ((maxThreads = endpoint.getMaxThreads()) > 0
        && (threadsBusy = endpoint.getCurrentThreadsBusy()) > 0) {
        threadRatio = (threadsBusy * 100) / maxThreads;
    }
    // Disable keep-alive if we are running low on threads      
    if (threadRatio > getDisableKeepAlivePercentage()) {     
        return true;
    }
    return false;
}
protected void prepareRequest() {
    ... ...
    // 通过HTTP协议版本进行判断
    MessageBytes protocolMB = request.protocol();
    if (protocolMB.equals(Constants.HTTP_11)) {
        // HTTP 1.1为keepalive
        ......
    } else if (protocolMB.equals(Constants.HTTP_10)) {
        // HTTP 1.0为非keepalive
        keepAlive = false;
        ......
    } else if (protocolMB.equals("")) {
        // HTTP 0.9为非keepalive
        ......
        keepAlive = false;
    } else {
        // Unsupported protocol
        http11 = false;
        // Send 505; Unsupported HTTP version
        response.setStatus(505);
        ......
    }
  // 通过HTTP Header Connection进行判断
    // 当Connection : close,keepAlive为false
    // 当Connection : keep-alive,keepAlive为true
    MessageBytes connectionValueMB = headers.getValue(Constants.CONNECTION);
    if (connectionValueMB != null) {
        ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
        if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
            keepAlive = false;
        } else if (findBytes(connectionValueBC,
                             Constants.KEEPALIVE_BYTES) != -1) {
            keepAlive = true;
        }
    }
  ... ...
}

准备响应

不同的响应也会影响到keepAlive的设置,下面是相关逻辑:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
  ... ...
  // 调用Servlet service方法,其中会调用到prepareResponse()
  adapter.service(request, response);
  ... ...
  if (breakKeepAliveLoop(socketWrapper)) {
       break;
    }
  ... ...
}
private void prepareResponse() {
  boolean entityBody = true;
    contentDelimitation = false;
    int statusCode = response.getStatus();
    if (statusCode < 200 || statusCode == 204 || statusCode == 205 || statusCode == 304){
        // No entity body
        getOutputBuffer().addActiveFilter(outputFilters[Constants.VOID_FILTER]);
        entityBody = false;
        contentDelimitation = true;
    }
    MessageBytes methodMB = request.method();
    if (methodMB.equals("HEAD")) {
        // No entity body
        getOutputBuffer().addActiveFilter(outputFilters[Constants.VOID_FILTER]);
        contentDelimitation = true;
    }
    ... ...
    if ((entityBody) && (!contentDelimitation)) {
        // Mark as close the connection after the request, and add the
        // connection: close header
        keepAlive = false;
    }
    // This may disabled keep-alive to check before working out the
    // Connection header.
    checkExpectationAndResponseStatus();
    // If we know that the request is bad this early, add the
    // Connection: close header.
    keepAlive = keepAlive && !statusDropsConnection(statusCode);
    if (!keepAlive) {
        // Avoid adding the close header twice
        if (!connectionClosePresent) {
            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
        }
    } else if (!http11 && !getErrorState().isError()) {
        headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
    }
    ... ...
}
/**
 * Determine if we must drop the connection because of the HTTP status
 * code.  Use the same list of codes as Apache/httpd.
 */
protected boolean statusDropsConnection(int status) {
    return status == 400 /* SC_BAD_REQUEST */ ||
           status == 408 /* SC_REQUEST_TIMEOUT */ ||
           status == 411 /* SC_LENGTH_REQUIRED */ ||
           status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
           status == 414 /* SC_REQUEST_URI_TOO_LONG */ ||
           status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
           status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
           status == 501 /* SC_NOT_IMPLEMENTED */;
}
@Override
protected boolean breakKeepAliveLoop(SocketWrapper<Socket> socketWrapper) {
    openSocket = keepAlive;
    // If we don't have a pipe-lined request allow this thread to be
    // used by another connection
    if (inputBuffer.lastValid == 0) {
        return true;
    }
    return false;
}

关闭连接

给客户端发送完响应就是判断是否关闭Socket的逻辑了:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
  ... ...
  if (breakKeepAliveLoop(socketWrapper)) {
       break;
    }
  ... ...
    if (getErrorState().isError() || endpoint.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync() || comet) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else if (getUpgradeInbound() != null) {
        return SocketState.UPGRADING_TOMCAT;
    } else {
        if (sendfileInProgress) {
            return SocketState.SENDFILE;
        } else {
            // 从上面分析可知,openSocket==keepAlive,
            // 所以当openSocket=false的时候,keepAlive=false,此时SocketState为CLOSED
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

以上process方法执行完后,最终会返回到SocketProcessor.run中,逻辑如下:

public void run() {
    boolean launch = false;
    synchronized (socket) {
        try {
            SocketState state = SocketState.OPEN;
            ... ...
            state = handler.process(socket,status);
            // 当keepAlive=false的时候,关闭Socket
            if (state == SocketState.CLOSED) {
                // Close socket
                ... ...
                try {
                    socket.getSocket().close();
                } catch (IOException e) {
                    // Ignore
                }
            } else if (state == SocketState.OPEN ||
                    state == SocketState.UPGRADING ||
                    state == SocketState.UPGRADING_TOMCAT  ||
                    state == SocketState.UPGRADED){
                ... ...
                launch = true;
            } else if (state == SocketState.LONG) {
                socket.access();
                waitingRequests.add(socket);
            }
        }finally {
            if (launch) {
                try {
                    getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                } catch (RejectedExecutionException x) {
                    ... ...
                }
            }
    }
}

总结

从以上分析可知,keepAlive既与请求有关,也与响应有关。

解析请求的时候,会根据系统繁忙程度、使用的HTTP协议版本,HTTP Header Conncetion及Socket接收的请求数量有关;

准备响应的时候,会根据业务返回的状态码,系统异常等情况进行判断,当Socket状态为SocketState.CLOSED的时候则会关闭连接。

通过以上分析,想必大家已经知道怎么优雅的关闭HTTP连接了。

参考资料

GitHub - apache/tomcat: Apache Tomcat

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
7月前
|
XML 应用服务中间件 Apache
Tomcat AJP连接器配置secretRequired=“true“,但是属性secret确实空或者空字符串,这样的组合是无效的。
Tomcat AJP连接器配置secretRequired=“true“,但是属性secret确实空或者空字符串,这样的组合是无效的。
|
Arthas 弹性计算 安全
优雅上下线之如何安全的关闭Tomcat持久连接
优雅上下线之如何安全的关闭Tomcat持久连接
409 3
|
7月前
|
设计模式 安全 Java
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
112 0
|
Web App开发 应用服务中间件
解决在访问tomcat时出现连接失败,Firefox 无法建立到 localhost:8080 服务器的连接的问题~
解决在访问tomcat时出现连接失败,Firefox 无法建立到 localhost:8080 服务器的连接的问题~
222 0
|
4月前
|
Arthas Java 应用服务中间件
一次Tomcat返回404的分析
一个Web应用部署在阿里云EDAS上,使用Tomcat 7.0.59.3,在测试环境遭遇所有接口返回404的问题,而生产环境正常。测试与生产环境主要差异在于Apollo配置不同。通过Arthas工具监控,确认Spring已正确加载Controller,并且请求未进入Spring或Filter处理流程。进一步分析发现,Tomcat内部处理流程中设置了404状态码,最终定位到`org.apache.coyote.http11.AbstractHttp11Processor.process`方法存在问题。通过对代码逻辑的分析,确定原因是请求URL路径不正确。修正URL路径后问题得到解决。
94 1
一次Tomcat返回404的分析
|
7月前
|
存储 负载均衡 NoSQL
【分布式技术架构】「Tomcat技术专题」 探索Tomcat集群架构原理和开发分析指南
【分布式技术架构】「Tomcat技术专题」 探索Tomcat集群架构原理和开发分析指南
149 1
|
7月前
|
JSON 前端开发 Java
管理系统总结(前端:Vue-cli, 后端Jdbc连接mysql数据库,项目部署tomcat里)
管理系统总结(前端:Vue-cli, 后端Jdbc连接mysql数据库,项目部署tomcat里)
|
应用服务中间件
IDEA 配置部署JavaWeb项目在阿里云服务器的tomcat上,成功连接服务器,但Artifact 没有成功部署
IDEA 配置部署JavaWeb项目在阿里云服务器的tomcat上,成功连接服务器,但Artifact 没有成功部署
488 0
|
网络协议 应用服务中间件 Apache
100分布式电商项目 - Tomcat性能优化(禁用AJP连接器)
100分布式电商项目 - Tomcat性能优化(禁用AJP连接器)
81 0
|
监控 算法 Java
java tomcat服务无缘无故挂掉分析和解决方案
最近有同事反应有时候xxx系统有时候会时不时出现服务异常提示,一上机器,发现xxx服务进程不在,重启服务后又恢复了,所以这边就需要去跟进问题。
3332 0