SpringCloud升级之路2020.0.x版-45. 实现公共日志记录

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: SpringCloud升级之路2020.0.x版-45. 实现公共日志记录

image.png


本系列代码地址: https://github.com/JoJoTec/spring-cloud-parent

我们这一节在前面实现的带有链路信息的 Publisher 的工厂的基础上,实现公共日志记录的 GlobalFilter。回顾下我们的需求:

我们需要在网关记录每个请求的:

  • HTTP 相关元素:
  • URL 相关信息
  • 请求信息,例如 HTTP HEADER,请求时间等等
  • 某些类型的请求体
  • 响应信息,例如响应码
  • 某些类型响应的响应体


  • 链路信息


记录请求与响应的 Body 需要注意的地方


前面的章节我们提到过,对于请求与响应的 body 处理,如果用其结果放入主链路的话,会造成 Spring Cloud Sleuth 的链路信息丢失。还有两个要注意的地方是:

  • TCP 粘包拆包导致一个请求体分割成好几份或者一个包包含几个请求
  • 读取后要释放原本的请求 body 读取出来的 DataBuffer

为何要释放原本的请求 body 读取出来的 DataBuffer?因为读取出来后占用的 DataBuffer 如果手动不释放那么底层的计数一直不归零会造成内存泄漏。可以参考框架代码看出,这里的 DataBuffer 是需要手动释放的,参考源码:

ByteBufferDecoder.java

@Override
public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  int byteCount = dataBuffer.readableByteCount();
  ByteBuffer copy = ByteBuffer.allocate(byteCount);
  copy.put(dataBuffer.asByteBuffer());
  copy.flip();
  DataBufferUtils.release(dataBuffer);
  if (logger.isDebugEnabled()) {
    logger.debug(Hints.getLogPrefix(hints) + "Read " + byteCount + " bytes");
  }
  return copy;
}

我们是想把可以输出到日志的 body 转换成字符串进行输出,为了代码简洁防止出错,我们使用一个工具类来完成将 DataBuffer 读取成字符串并释放的操作:

package com.github.jojotech.spring.cloud.apigateway.common;
import com.google.common.base.Charsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
public class BufferUtil {
  public static String dataBufferToString(DataBuffer dataBuffer) {
    byte[] content = new byte[dataBuffer.readableByteCount()];
    dataBuffer.read(content);
    DataBufferUtils.release(dataBuffer);
    return new String(content, Charsets.UTF_8);
  }
}


编写实现公共日志记录 GlobalFilter


前面铺垫了那么多,我们终于可以着手开始写这个日志 GlobalFilter 了:

package com.github.jojotech.spring.cloud.apigateway.filter;
import java.net.URI;
import java.util.Set;
import com.alibaba.fastjson.JSON;
import com.github.jojotech.spring.cloud.apigateway.common.BufferUtil;
import com.github.jojotech.spring.cloud.apigateway.common.TracedPublisherFactory;
import lombok.extern.log4j.Log4j2;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
@Log4j2
@Component
public class CommonLogFilter implements GlobalFilter, Ordered {
  //可以输出的 body 格式
  public static final Set<MediaType> legalLogMediaTypes = Set.of(
      MediaType.TEXT_XML,
      MediaType.TEXT_PLAIN,
      MediaType.APPLICATION_XML,
      MediaType.APPLICATION_JSON
  );
  @Autowired
  private TracedPublisherFactory tracedPublisherFactory;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    long startTime = System.currentTimeMillis();
    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    //获取用于拆包处理聚合读取请求和响应 body 的 buffer 的 factory
    DataBufferFactory dataBufferFactory = response.bufferFactory();
    //请求 http 头
    HttpHeaders requestHeaders = request.getHeaders();
    //请求 body 类型
    MediaType requestContentType = requestHeaders.getContentType();
    //请求 uri
    String uri = request.getURI().toString();
    //请求 http 方法
    HttpMethod method = request.getMethod();
    log.info("{} -> {}: header: {}", method, uri, JSON.toJSONString(requestHeaders));
    Flux<DataBuffer> dataBufferFlux = tracedPublisherFactory.getTracedFlux(request.getBody(), exchange)
        //使用 buffer 在这里将所有 body 读取完避免拆包影响
        .buffer()
        .map(dataBuffers -> {
          //将所有 buffer 粘合在一起
          DataBuffer dataBuffer = dataBufferFactory.join(dataBuffers);
          //只有在 debug 开启的时候,才会输出 body
          if (log.isDebugEnabled()) {
            //只有特定的 body 类型才会输出具体的
            if (legalLogMediaTypes.contains(requestContentType)) {
              try {
                //将 body 转化为 String 进行输出,同时注意,原始的 buffer 需要被释放,因为 body 流已经被读取出来,但是没有地方回收
                //参考
                String s = BufferUtil.dataBufferToString(dataBuffer);
                log.debug("body: {}", s);
                dataBuffer = dataBufferFactory.wrap(s.getBytes());
              }
              catch (Exception e) {
                log.error("error read request body: {}", e.getMessage(), e);
              }
            }
            else {
              log.debug("body: {}", request);
            }
          }
          return dataBuffer;
        });
    return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(request) {
      @Override
      public Flux<DataBuffer> getBody() {
        return dataBufferFlux;
      }
    }).response(new ServerHttpResponseDecorator(response) {
      @Override
      public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        HttpHeaders responseHeaders = super.getHeaders();
        //调用这里的是写响应回客户端的 HttpClientConnect 的回写,已经跳出了 Spring Cloud Sleuth 的链路 Span,所以没有链路追踪信息
        //但是我们在 CommonTraceFilter 我们将链路信息放入了响应 Header 中,所以这里我们就不用手动增加链路信息了
        log.info("response: {} -> {} {} header: {}, time: {}ms", method, uri, getStatusCode(), JSON.toJSONString(responseHeaders), System.currentTimeMillis() - startTime);
        final MediaType contentType = responseHeaders.getContentType();
        if (contentType != null && body instanceof Flux && legalLogMediaTypes.contains(contentType) && log.isDebugEnabled()) {
          //有TCP粘包拆包问题,这个body是多次写入的,一次调用拿不到完整的body,所以这里转换成fluxBody利用其中的buffer来接受完整的body
          Flux<? extends DataBuffer> fluxBody = tracedPublisherFactory.getTracedFlux(Flux.from(body), exchange);
          return super.writeWith(fluxBody.buffer().map(buffers -> {
            DataBuffer buffer = dataBufferFactory.join(buffers);
            try {
              String s = BufferUtil.dataBufferToString(buffer);
              log.debug("response: body: {}", s);
              return dataBufferFactory.wrap(s.getBytes());
            } catch (Exception e) {
              log.error("error read response body: {}", e.getMessage(), e);
            }
            return buffer;
          }));
        }
        // if body is not a flux. never got there.
        return super.writeWith(body);
      }
    }).build());
  }
  @Override
  public int getOrder() {
    //指定顺序,在 CommonTraceFilter(这个Filter是读取链路信息,最好在所有 Filter 之前) 之后
    return new CommonTraceFilter().getOrder() + 1;
  }
}

需要注意的点都在注释当中明确标出了,请大家参考。


查看日志


我们通过加入下面的日志配置,打开 body 的日志,这样日志就全了:

<AsyncLogger name="com.github.jojotech.spring.cloud.apigateway.filter.CommonLogFilter" level="debug" additivity="false" includeLocation="true">
    <appender-ref ref="console" />
</AsyncLogger>

发送一个 POST 带 body 的请求,从日志中就能看到:

2021-11-29 14:08:42,231  INFO [sports,8481ce2786b686fa,8481ce2786b686fa] [24916] [reactor-http-nio-2][com.github.jojotech.spring.cloud.apigateway.filter.CommonLogFilter:59]:POST -> http://127.0.0.1:8181/test-ss/anything?test=1: header: {"Content-Type":["text/plain"],"User-Agent":["PostmanRuntime/7.28.4"],"Accept":["*/*"],"Postman-Token":["666b17c9-0789-46e6-b515-9a4538803308"],"Host":["127.0.0.1:8181"],"Accept-Encoding":["gzip, deflate, br"],"Connection":["keep-alive"],"content-length":["8"]}
2021-11-29 14:08:42,233 DEBUG [sports,8481ce2786b686fa,8481ce2786b686fa] [24916] [reactor-http-nio-2][com.github.jojotech.spring.cloud.apigateway.filter.CommonLogFilter:74]:body: ifasdasd
2021-11-29 14:08:42,463  INFO [sports,,] [24916] [reactor-http-nio-2][com.github.jojotech.spring.cloud.apigateway.filter.CommonLogFilter$1:96]:response: POST -> http://127.0.0.1:8181/test-ss/anything?test=1 200 OK header: {"traceId":["8481ce2786b686fa"],"spanId":["8481ce2786b686fa"],"Date":["Mon, 29 Nov 2021 14:08:43 GMT"],"Content-Type":["application/json"],"Server":["gunicorn/19.9.0"],"Access-Control-Allow-Origin":["*"],"Access-Control-Allow-Credentials":["true"],"content-length":["886"]}, time: 232ms
2021-11-29 14:08:42,466 DEBUG [sports,8481ce2786b686fa,8481ce2786b686fa] [24916] [reactor-http-nio-2][com.github.jojotech.spring.cloud.apigateway.filter.CommonLogFilter$1:105]:response: body: {
  "args": {
    "test": "1"
  }, 
  "data": "ifasdasd", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate, br", 
    "Content-Length": "8", 
    "Content-Type": "text/plain", 
    "Forwarded": "proto=http;host=\"127.0.0.1:8181\";for=\"127.0.0.1:57526\"", 
    "Host": "httpbin.org", 
    "Postman-Token": "666b17c9-0789-46e6-b515-9a4538803308", 
    "User-Agent": "PostmanRuntime/7.28.4", 
    "X-Amzn-Trace-Id": "Root=1-61a4deeb-3d016ff729306d862edcca0b", 
    "X-B3-Parentspanid": "8481ce2786b686fa", 
    "X-B3-Sampled": "0", 
    "X-B3-Spanid": "5def545b28a7a842", 
    "X-B3-Traceid": "8481ce2786b686fa", 
    "X-Forwarded-Host": "127.0.0.1:8181", 
    "X-Forwarded-Prefix": "/test-ss"
  }, 
  "json": null, 
  "method": "POST", 
  "origin": "127.0.0.1, 61.244.202.46", 
  "url": "http://127.0.0.1:8181/anything?test=1"
}
2021-11-29 14:08:42,474  INFO [sports,,] [24916] [reactor-http-nio-2][reactor.util.Logge
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
8月前
|
缓存 负载均衡 Java
Spring Cloud Alibaba client升级问题之升级报错如何解决
Spring Cloud Alibaba提供了一套在Spring Cloud框架基础上构建的微服务解决方案,旨在简化分布式系统的开发和管理;本合集将探讨Spring Cloud Alibaba在实际应用中的部署和使用技巧,以及该框架常见问题的诊断方法和解决步骤。
|
Java Nacos 开发工具
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(2)
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba
155 0
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(2)
|
20天前
|
SQL 存储 缓存
日志服务 SQL 引擎全新升级
SQL 作为 SLS 基础功能,每天承载了用户大量日志数据的分析请求,既有小数据量的快速查询(如告警、即席查询等);也有上万亿数据规模的报表级分析。SLS 作为 Serverless 服务,除了要满足不同用户的各类需求,还要兼顾性能、隔离性、稳定性等要求。过去一年多的时间,SLS SQL 团队做了大量的工作,对 SQL 引擎进行了全新升级,SQL 的执行性能、隔离性等方面都有了大幅的提升。
|
Kubernetes Java 微服务
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
157 0
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
|
6月前
|
SQL 运维 监控
SLS 数据加工全面升级,集成 SPL 语法
在系统开发、运维过程中,日志是最重要的信息之一,其最大的优点是简单直接。SLS 数据加工功能旨在解决非结构化的日志数据处理,当前全面升级,集成 SPL 语言、更强的数据处理性能、更优的使用成本。
18222 147
|
5月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
5月前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
5月前
|
缓存 Oracle Java
JDK8到JDK22版本升级的新特性问题之在JDK17中,日志的刷新如何操作
JDK8到JDK22版本升级的新特性问题之在JDK17中,日志的刷新如何操作
|
8月前
|
存储 监控 安全
带你读《Apache Doris 案例集》——07查询平均提速700% ,奇安信基于 Apache Doris 升级日志安全分析系统(1)
带你读《Apache Doris 案例集》——07查询平均提速700% ,奇安信基于 Apache Doris 升级日志安全分析系统(1)
254 1
|
8月前
|
SQL 存储 安全
带你读《Apache Doris 案例集》——07查询平均提速700% ,奇安信基于 Apache Doris 升级日志安全分析系统(2)
带你读《Apache Doris 案例集》——07查询平均提速700% ,奇安信基于 Apache Doris 升级日志安全分析系统(2)
354 0