2. Custom Extended Log Levels with Configurable Log Destinations
Once the code above is in place, the next step is to configure the custom log levels in log4j2.xml so that logs at the custom levels are written to dedicated files:
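For reference, the custom levels themselves are declared in code via Log4j2's Level.forName, as in the previous section. A minimal sketch follows; the class name LogLevelConstant and the API_LEVEL constant match references used later in this article, while OPERATION_LEVEL and the integer priorities are assumptions:

import org.apache.logging.log4j.Level;

/**
 * Minimal sketch of the custom level declarations assumed by the configuration below.
 * In Log4j2, a smaller intLevel means higher priority: ERROR=200, WARN=300, INFO=400.
 * The values 310/320 (between WARN and INFO) are illustrative assumptions.
 */
public final class LogLevelConstant {

    public static final Level OPERATION_LEVEL = Level.forName("OPERATION", 310);

    public static final Level API_LEVEL = Level.forName("API", 320);

    private LogLevelConstant() {
    }
}

With these constants, business code can log at the custom levels through a plain Log4j2 logger, e.g. log.log(LogLevelConstant.OPERATION_LEVEL, "user {} deleted order {}", userId, orderId).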
<!-- Captures all OPERATION-level messages; whenever a file exceeds the configured size,
     that chunk of logs is automatically archived and compressed under a year-month folder -->
<RollingRandomAccessFile name="RollingFileOperation" fileName="${FILE_PATH}/operation.log"
                         filePattern="${FILE_PATH}/OPERATION-%d{yyyy-MM-dd}_%i.log.gz">
    <!-- Accept only OPERATION-level messages (onMatch); reject everything else (onMismatch) -->
    <LevelRangeFilter minLevel="OPERATION" maxLevel="OPERATION" onMatch="ACCEPT" onMismatch="DENY"/>
    <PatternLayout pattern="${LOG_PATTERN}"/>
    <Policies>
        <!-- interval controls how often to roll over; the default is 1 hour -->
        <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/>
        <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/>
    </Policies>
    <!-- Without DefaultRolloverStrategy, at most 7 files are kept in the same folder before overwriting -->
    <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/>
</RollingRandomAccessFile>

<!-- Captures all API-level messages, with the same size-based archiving as above -->
<RollingRandomAccessFile name="RollingFileApi" fileName="${FILE_PATH}/api.log"
                         filePattern="${FILE_PATH}/API-%d{yyyy-MM-dd}_%i.log.gz">
    <!-- Accept only API-level messages (onMatch); reject everything else (onMismatch) -->
    <LevelRangeFilter minLevel="API" maxLevel="API" onMatch="ACCEPT" onMismatch="DENY"/>
    <PatternLayout pattern="${LOG_PATTERN}"/>
    <Policies>
        <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/>
        <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/>
    </Policies>
    <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/>
</RollingRandomAccessFile>

<loggers>
    <AsyncLogger name="AsyncLogger" level="debug" additivity="false">
        <AppenderRef ref="Console"/>
        <AppenderRef ref="RollingFileDebug"/>
        <AppenderRef ref="RollingFileInfo"/>
        <AppenderRef ref="RollingFileWarn"/>
        <AppenderRef ref="RollingFileError"/>
        <AppenderRef ref="RollingFileOperation"/>
        <AppenderRef ref="RollingFileApi"/>
    </AsyncLogger>
    <root level="trace">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileDebug"/>
        <appender-ref ref="RollingFileInfo"/>
        <appender-ref ref="RollingFileWarn"/>
        <appender-ref ref="RollingFileError"/>
        <AppenderRef ref="RollingFileOperation"/>
        <AppenderRef ref="RollingFileApi"/>
    </root>
</loggers>
3. Saving Logs to Kafka
The configuration so far basically covers the baseline needs of our logging system. Here we go one step further: by adjusting the Log4j2 configuration file, we can dynamically configure whether logs are recorded to specific files or to a message broker.
Sending log messages from Log4j2 to Kafka requires the Kafka client jar, so first add the kafka-clients dependency:
<!-- Dependency required for Log4j2 to write to Kafka -->
<kafka.clients.version>3.1.0</kafka.clients.version>

<!-- log4j2 kafka appender -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.clients.version}</version>
</dependency>
Then modify log4j2.xml to send the operation logs to Kafka. Note that the Log4j2 documentation explicitly states that the <Logger name="org.apache.kafka" level="INFO" /> configuration is required here; without it, the Kafka appender's own logging triggers recursive logging.
<Kafka name="KafkaOperationLog" topic="operation_log" ignoreExceptions="false"> <LevelRangeFilter minLevel="OPERATION" maxLevel="OPERATION" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Property name="bootstrap.servers">172.16.20.220:9092,172.16.20.221:9092,172.16.20.222:9092</Property> <Property name="max.block.ms">2000</Property> </Kafka> <Kafka name="KafkaApiLog" topic="api_log" ignoreExceptions="false"> <LevelRangeFilter minLevel="API" maxLevel="API" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Property name="bootstrap.servers">172.16.20.220:9092,172.16.20.221:9092,172.16.20.222:9092</Property> <Property name="max.block.ms">2000</Property> </Kafka> <!-- Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等 --> <!-- 然后定义loggers,只有定义了logger并引入的appender,appender才会生效 --> <loggers> <!--过滤掉spring和mybatis的一些无用的DEBUG信息--> <logger name="org.mybatis" level="info" additivity="false"> <AppenderRef ref="Console"/> </logger> <!--若是additivity设为false,则子Logger 只会在自己的appender里输出,而不会在父Logger 的appender里输出 --> <Logger name="org.springframework" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> <!-- 避免递归记录日志 --> <Logger name="org.apache.kafka" level="INFO" /> <AsyncLogger name="AsyncLogger" level="debug" additivity="false"> <AppenderRef ref="Console"/> <AppenderRef ref="RollingFileDebug"/> <AppenderRef ref="RollingFileInfo"/> <AppenderRef ref="RollingFileWarn"/> <AppenderRef ref="RollingFileError"/> <AppenderRef ref="RollingFileOperation"/> <AppenderRef ref="RollingFileApi"/> <AppenderRef ref="KafkaOperationLog"/> <AppenderRef ref="KafkaApiLog"/> </AsyncLogger> <root level="trace"> <appender-ref ref="Console"/> <appender-ref ref="RollingFileDebug"/> <appender-ref ref="RollingFileInfo"/> <appender-ref ref="RollingFileWarn"/> <appender-ref ref="RollingFileError"/> <AppenderRef ref="RollingFileOperation"/> <AppenderRef ref="RollingFileApi"/> <AppenderRef ref="KafkaOperationLog"/> <AppenderRef ref="KafkaApiLog"/> </root> </loggers>
Putting it all together, the complete modified log4j2.xml is shown below; through configuration you can also choose not to write the operation logs to files at all:
<?xml version="1.0" encoding="UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <configuration monitorInterval="5" packages="org.apache.skywalking.apm.toolkit.log.log4j.v2.x"> <!--变量配置--> <Properties> <!-- 格式化输出:%date表示日期,traceId表示微服务Skywalking追踪id,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %m:日志消息,%n是换行符--> <!-- %c 输出类详情 %M 输出方法名 %pid 输出pid %line 日志在哪一行被打印 --> <!-- %logger{80} 表示 Logger 名字最长80个字符 --> <!-- value="${LOCAL_IP_HOSTNAME} %date [%p] %C [%thread] pid:%pid line:%line %throwable %c{10} %m%n"/>--> <property name="CONSOLE_LOG_PATTERN" value="%d %highlight{%-5level [%traceId] pid:%pid-%line}{ERROR=Bright RED, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White} %style{[%t]}{bright,magenta} %style{%c{1.}.%M(%L)}{cyan}: %msg%n"/> <property name="LOG_PATTERN" value="%d %highlight{%-5level [%traceId] pid:%pid-%line}{ERROR=Bright RED, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White} %style{[%t]}{bright,magenta} %style{%c{1.}.%M(%L)}{cyan}: %msg%n"/> <!-- 读取application.yaml文件中设置的日志路径 logging.file.path--> <Property name="FILE_PATH">${spring:logging.file.path}</Property> <!-- <property name="FILE_PATH">D:\\log4j2_cloud</property> --> <property name="applicationName">${spring:spring.application.name}</property> <property name="FILE_STORE_MAX" value="50MB"/> <property name="FILE_WRITE_INTERVAL" value="1"/> <property name="LOG_MAX_HISTORY" value="60"/> </Properties> <appenders> <!-- 控制台输出 --> <console name="Console" target="SYSTEM_OUT"> <!-- 输出日志的格式 --> <PatternLayout pattern="${CONSOLE_LOG_PATTERN}"/> <!-- 控制台只输出level及其以上级别的信息(onMatch),其他的直接拒绝(onMismatch) --> <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/> </console> <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 --> <RollingRandomAccessFile name="RollingFileInfo" fileName="${FILE_PATH}/info.log" filePattern="${FILE_PATH}/INFO-%d{yyyy-MM-dd}_%i.log.gz"> <!-- 控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/> <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/> </RollingRandomAccessFile> <!-- 这个会打印出所有的debug及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingRandomAccessFile name="RollingFileDebug" fileName="${FILE_PATH}/debug.log" filePattern="${FILE_PATH}/DEBUG-%d{yyyy-MM-dd}_%i.log.gz"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/> <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/> </RollingRandomAccessFile> <!-- 这个会打印出所有的warn及以上级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingRandomAccessFile name="RollingFileWarn" fileName="${FILE_PATH}/warn.log" filePattern="${FILE_PATH}/WARN-%d{yyyy-MM-dd}_%i.log.gz"> <!-- 控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> 
<!-- interval属性用来指定多久滚动一次,默认是1 hour --> <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/> <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖 --> <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/> </RollingRandomAccessFile> <!-- 这个会打印出所有的error及以上级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 --> <RollingRandomAccessFile name="RollingFileError" fileName="${FILE_PATH}/error.log" filePattern="${FILE_PATH}/ERROR-%d{yyyy-MM-dd}_%i.log.gz"> <!--只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/> <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/> </RollingRandomAccessFile> <!-- 这个会打印出所有的operation级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 --> <RollingRandomAccessFile name="RollingFileOperation" fileName="${FILE_PATH}/operation.log" filePattern="${FILE_PATH}/OPERATION-%d{yyyy-MM-dd}_%i.log.gz"> <!--只输出action level级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <LevelRangeFilter minLevel="OPERATION" maxLevel="OPERATION" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/> <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/> </RollingRandomAccessFile> <!-- 这个会打印出所有的api级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 --> <RollingRandomAccessFile name="RollingFileApi" fileName="${FILE_PATH}/api.log" filePattern="${FILE_PATH}/API-%d{yyyy-MM-dd}_%i.log.gz"> <!--只输出visit level级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <LevelRangeFilter minLevel="API" maxLevel="API" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy interval="${FILE_WRITE_INTERVAL}"/> <SizeBasedTriggeringPolicy size="${FILE_STORE_MAX}"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="${LOG_MAX_HISTORY}"/> </RollingRandomAccessFile> <Kafka name="KafkaOperationLog" topic="operation_log" ignoreExceptions="false"> <LevelRangeFilter minLevel="OPERATION" maxLevel="OPERATION" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Property name="bootstrap.servers">172.16.20.220:9092,172.16.20.221:9092,172.16.20.222:9092</Property> <Property name="max.block.ms">2000</Property> </Kafka> <Kafka name="KafkaApiLog" topic="api_log" ignoreExceptions="false"> <LevelRangeFilter minLevel="API" maxLevel="API" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Property name="bootstrap.servers">172.16.20.220:9092,172.16.20.221:9092,172.16.20.222:9092</Property> <Property name="max.block.ms">2000</Property> </Kafka> </appenders> <!-- Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等 --> <!-- 然后定义loggers,只有定义了logger并引入的appender,appender才会生效 --> <loggers> <!--过滤掉spring和mybatis的一些无用的DEBUG信息--> <logger name="org.mybatis" level="info" additivity="false"> <AppenderRef ref="Console"/> </logger> <!--若是additivity设为false,则子Logger 只会在自己的appender里输出,而不会在父Logger 
的appender里输出 --> <Logger name="org.springframework" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> <!-- 避免递归记录日志 --> <Logger name="org.apache.kafka" level="INFO" /> <AsyncLogger name="AsyncLogger" level="debug" additivity="false"> <AppenderRef ref="Console"/> <AppenderRef ref="RollingFileDebug"/> <AppenderRef ref="RollingFileInfo"/> <AppenderRef ref="RollingFileWarn"/> <AppenderRef ref="RollingFileError"/> <AppenderRef ref="RollingFileOperation"/> <AppenderRef ref="RollingFileApi"/> <AppenderRef ref="KafkaOperationLog"/> <AppenderRef ref="KafkaApiLog"/> </AsyncLogger> <root level="trace"> <appender-ref ref="Console"/> <appender-ref ref="RollingFileDebug"/> <appender-ref ref="RollingFileInfo"/> <appender-ref ref="RollingFileWarn"/> <appender-ref ref="RollingFileError"/> <AppenderRef ref="RollingFileOperation"/> <AppenderRef ref="RollingFileApi"/> <AppenderRef ref="KafkaOperationLog"/> <AppenderRef ref="KafkaApiLog"/> </root> </loggers> </configuration>
With the configuration above in place, we can test whether logs are recorded both to the asynchronous files and to Kafka. Start a consumer on the Kafka server to watch in real time whether the logs are being pushed to Kafka:
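For example, the stock console consumer that ships with Kafka can tail the operation_log topic (swap in api_log to watch the API logs; the bootstrap address should match one of the brokers configured above):

bin/kafka-console-consumer.sh --bootstrap-server 172.16.20.220:9092 --topic operation_log --from-beginning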
(Screenshot: operation log messages)
(Screenshot: API log messages)
4. Configurable Request Logging in the Gateway
Besides operation logs, business development usually also calls for API logs: the system needs to run statistics and analysis over API requests. Since the gateway forwards every request to the individual microservices, it is the natural place to collect API logs.
We inevitably face the questions of which services should have their API logs collected and which kinds of API logs to collect, so the design must make API log collection flexibly configurable. For simplicity we put these settings in the Nacos configuration center; if more detailed customization is needed, a system configuration UI can be built and the settings stored in the Redis cache.
Because a request's RequestBody and ResponseBody can each be read only once, the data has to be handled carefully inside a filter. Gateway does provide AdaptCachedBodyGlobalFilter for caching the request body, but besides some custom requirements on the request side we may also need the ResponseBody, so it is better to implement our own filters.
The open-source plugin spring-cloud-gateway-plugin provides a fairly complete set of filters for collecting request logs in Gateway, so we borrow its implementation directly. Since the plugin includes features beyond logging that we do not need, and it depends on a specific Spring Cloud version, we take only its logging functionality and adapt it to our needs.
1. Add the following configuration items to our configuration file:
- Log plugin on/off switch
- Switch for recording request parameters
- Switch for recording response parameters
- List of microservice IDs whose API logs should be recorded
- List of URLs whose API logs should be recorded
spring:
  cloud:
    gateway:
      plugin:
        config:
          # Whether to enable the Gateway log plugin
          enable: true
          # requestLog==true && responseLog==false: only request parameters are logged;
          # responseLog==true: both request and response parameters are logged;
          # requestLog==false: nothing is logged
          requestLog: true
          # In production, prefer logging only request parameters: response payloads are large and mostly meaningless
          responseLog: true
          # all: log everything; configure: intersection of serviceIdList and pathList;
          # serviceId: only services in serviceIdList; pathList: only paths in pathList
          logType: all
          serviceIdList:
            - "gitegg-oauth"
            - "gitegg-service-system"
          pathList:
            - "/gitegg-oauth/oauth/token"
            - "/gitegg-oauth/oauth/user/info"
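These keys bind to a properties class and a log-type enum that the filters below rely on; the plugin's actual sources are not reproduced in this article. The following is a minimal sketch inferred from the YAML keys and from how the code below uses them (field and constant names follow those usages; everything else is an assumption):

import java.util.List;
import lombok.Data;

@Data
public class GatewayPluginProperties {

    public static final String GATEWAY_PLUGIN_PROPERTIES_PREFIX = "spring.cloud.gateway.plugin.config";

    /** Master switch for the log plugin */
    private Boolean enable = Boolean.FALSE;
    /** Whether to record request parameters */
    private Boolean requestLog = Boolean.FALSE;
    /** Whether to record response parameters */
    private Boolean responseLog = Boolean.FALSE;
    /** all / configure / serviceId / pathList */
    private String logType;
    /** Service IDs whose API logs are recorded */
    private List<String> serviceIdList;
    /** Ant-style paths whose API logs are recorded */
    private List<String> pathList;
}

The logType values map to an enum such as:

public enum GatewayLogTypeEnum {
    ALL("all"),
    CONFIGURE("configure"),
    SERVICE("serviceId"),
    PATH("pathList");

    private final String type;

    GatewayLogTypeEnum(String type) {
        this.type = type;
    }

    public String getType() {
        return type;
    }
}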
2. The GatewayPluginConfig configuration class chooses which filters to initialize based on the configuration items; adapted from spring-cloud-gateway-plugin's GatewayPluginConfig.java.
/**
 * Quoted from @see https://github.com/chenggangpro/spring-cloud-gateway-plugin
 *
 * Gateway Plugin Config
 * @author chenggang
 * @date 2019/01/29
 */
@Slf4j
@Configuration
public class GatewayPluginConfig {

    @Bean
    @ConditionalOnMissingBean(GatewayPluginProperties.class)
    @ConfigurationProperties(GatewayPluginProperties.GATEWAY_PLUGIN_PROPERTIES_PREFIX)
    public GatewayPluginProperties gatewayPluginProperties() {
        return new GatewayPluginProperties();
    }

    @Bean
    @ConditionalOnBean(GatewayPluginProperties.class)
    @ConditionalOnMissingBean(GatewayRequestContextFilter.class)
    @ConditionalOnProperty(prefix = GatewayPluginProperties.GATEWAY_PLUGIN_PROPERTIES_PREFIX,
            value = { "enable", "requestLog" }, havingValue = "true")
    public GatewayRequestContextFilter gatewayContextFilter(@Autowired GatewayPluginProperties gatewayPluginProperties,
            @Autowired(required = false) ContextExtraDataGenerator contextExtraDataGenerator) {
        GatewayRequestContextFilter gatewayContextFilter = new GatewayRequestContextFilter(gatewayPluginProperties, contextExtraDataGenerator);
        log.debug("Load GatewayContextFilter Config Bean");
        return gatewayContextFilter;
    }

    @Bean
    @ConditionalOnMissingBean(GatewayResponseContextFilter.class)
    @ConditionalOnProperty(prefix = GatewayPluginProperties.GATEWAY_PLUGIN_PROPERTIES_PREFIX,
            value = { "enable", "responseLog" }, havingValue = "true")
    public GatewayResponseContextFilter responseLogFilter() {
        GatewayResponseContextFilter responseLogFilter = new GatewayResponseContextFilter();
        log.debug("Load Response Log Filter Config Bean");
        return responseLogFilter;
    }

    @Bean
    @ConditionalOnBean(GatewayPluginProperties.class)
    @ConditionalOnMissingBean(RemoveGatewayContextFilter.class)
    @ConditionalOnProperty(prefix = GatewayPluginProperties.GATEWAY_PLUGIN_PROPERTIES_PREFIX,
            value = { "enable" }, havingValue = "true")
    public RemoveGatewayContextFilter removeGatewayContextFilter() {
        RemoveGatewayContextFilter gatewayContextFilter = new RemoveGatewayContextFilter();
        log.debug("Load RemoveGatewayContextFilter Config Bean");
        return gatewayContextFilter;
    }

    @Bean
    @ConditionalOnMissingBean(RequestLogFilter.class)
    @ConditionalOnProperty(prefix = GatewayPluginProperties.GATEWAY_PLUGIN_PROPERTIES_PREFIX,
            value = { "enable" }, havingValue = "true")
    public RequestLogFilter requestLogFilter(@Autowired GatewayPluginProperties gatewayPluginProperties) {
        RequestLogFilter requestLogFilter = new RequestLogFilter(gatewayPluginProperties);
        log.debug("Load Request Log Filter Config Bean");
        return requestLogFilter;
    }
}
3. GatewayRequestContextFilter, the filter that handles request parameters; adapted from spring-cloud-gateway-plugin's GatewayContextFilter.java.
/**
 * Quoted from @see https://github.com/chenggangpro/spring-cloud-gateway-plugin
 *
 * Gateway Context Filter
 * @author chenggang
 * @date 2019/01/29
 */
@Slf4j
@AllArgsConstructor
public class GatewayRequestContextFilter implements GlobalFilter, Ordered {

    private GatewayPluginProperties gatewayPluginProperties;

    private ContextExtraDataGenerator contextExtraDataGenerator;

    private static final AntPathMatcher ANT_PATH_MATCHER = new AntPathMatcher();

    /**
     * default HttpMessageReader
     */
    private static final List<HttpMessageReader<?>> MESSAGE_READERS = HandlerStrategies.withDefaults().messageReaders();

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        GatewayContext gatewayContext = new GatewayContext();
        gatewayContext.setReadRequestData(shouldReadRequestData(exchange));
        gatewayContext.setReadResponseData(gatewayPluginProperties.getResponseLog());
        HttpHeaders headers = request.getHeaders();
        gatewayContext.setRequestHeaders(headers);
        if (Objects.nonNull(contextExtraDataGenerator)) {
            GatewayContextExtraData gatewayContextExtraData = contextExtraDataGenerator.generateContextExtraData(exchange);
            gatewayContext.setGatewayContextExtraData(gatewayContextExtraData);
        }
        if (!gatewayContext.getReadRequestData()) {
            exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT, gatewayContext);
            log.debug("[GatewayContext]Properties Set To Not Read Request Data");
            return chain.filter(exchange);
        }
        gatewayContext.getAllRequestData().addAll(request.getQueryParams());
        /*
         * save gateway context into exchange
         */
        exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT, gatewayContext);
        MediaType contentType = headers.getContentType();
        if (headers.getContentLength() > 0) {
            if (MediaType.APPLICATION_JSON.equals(contentType) || MediaType.APPLICATION_JSON_UTF8.equals(contentType)) {
                return readBody(exchange, chain, gatewayContext);
            }
            if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {
                return readFormData(exchange, chain, gatewayContext);
            }
        }
        log.debug("[GatewayContext]ContentType:{},Gateway context is set with {}", contentType, gatewayContext);
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return FilterOrderEnum.GATEWAY_CONTEXT_FILTER.getOrder();
    }

    /**
     * check whether the request data should be read
     * @return boolean
     */
    private boolean shouldReadRequestData(ServerWebExchange exchange) {
        if (gatewayPluginProperties.getRequestLog()
                && GatewayLogTypeEnum.ALL.getType().equals(gatewayPluginProperties.getLogType())) {
            log.debug("[GatewayContext]Properties Set Read All Request Data");
            return true;
        }
        boolean serviceFlag = false;
        boolean pathFlag = false;
        boolean lbFlag = false;
        List<String> readRequestDataServiceIdList = gatewayPluginProperties.getServiceIdList();
        List<String> readRequestDataPathList = gatewayPluginProperties.getPathList();
        if (!CollectionUtils.isEmpty(readRequestDataPathList)
                && (GatewayLogTypeEnum.PATH.getType().equals(gatewayPluginProperties.getLogType())
                || GatewayLogTypeEnum.CONFIGURE.getType().equals(gatewayPluginProperties.getLogType()))) {
            String requestPath = exchange.getRequest().getPath().pathWithinApplication().value();
            for (String path : readRequestDataPathList) {
                if (ANT_PATH_MATCHER.match(path, requestPath)) {
                    log.debug("[GatewayContext]Properties Set Read Specific Request Data With Request Path:{},Match Pattern:{}", requestPath, path);
                    pathFlag = true;
                    break;
                }
            }
        }
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        URI routeUri = route.getUri();
        // serviceId matching only applies to lb:// routes
        if (!"lb".equalsIgnoreCase(routeUri.getScheme())) {
            lbFlag = true;
        }
        String routeServiceId = routeUri.getHost().toLowerCase();
        if (!CollectionUtils.isEmpty(readRequestDataServiceIdList)
                && (GatewayLogTypeEnum.SERVICE.getType().equals(gatewayPluginProperties.getLogType())
                || GatewayLogTypeEnum.CONFIGURE.getType().equals(gatewayPluginProperties.getLogType()))) {
            if (readRequestDataServiceIdList.contains(routeServiceId)) {
                log.debug("[GatewayContext]Properties Set Read Specific Request Data With ServiceId:{}", routeServiceId);
                serviceFlag = true;
            }
        }
        if (GatewayLogTypeEnum.CONFIGURE.getType().equals(gatewayPluginProperties.getLogType())
                && serviceFlag && pathFlag && !lbFlag) {
            return true;
        } else if (GatewayLogTypeEnum.SERVICE.getType().equals(gatewayPluginProperties.getLogType())
                && serviceFlag && !lbFlag) {
            return true;
        } else if (GatewayLogTypeEnum.PATH.getType().equals(gatewayPluginProperties.getLogType())
                && pathFlag) {
            return true;
        }
        return false;
    }

    /**
     * ReadFormData
     */
    private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return exchange.getFormData()
                .doOnNext(multiValueMap -> {
                    gatewayContext.setFormData(multiValueMap);
                    gatewayContext.getAllRequestData().addAll(multiValueMap);
                    log.debug("[GatewayContext]Read FormData Success");
                })
                .then(Mono.defer(() -> {
                    Charset charset = headers.getContentType().getCharset();
                    charset = charset == null ? StandardCharsets.UTF_8 : charset;
                    String charsetName = charset.name();
                    MultiValueMap<String, String> formData = gatewayContext.getFormData();
                    /*
                     * formData is empty, just return
                     */
                    if (null == formData || formData.isEmpty()) {
                        return chain.filter(exchange);
                    }
                    StringBuilder formDataBodyBuilder = new StringBuilder();
                    String entryKey;
                    List<String> entryValue;
                    try {
                        /*
                         * repackage form data
                         */
                        for (Map.Entry<String, List<String>> entry : formData.entrySet()) {
                            entryKey = entry.getKey();
                            entryValue = entry.getValue();
                            if (entryValue.size() > 1) {
                                for (String value : entryValue) {
                                    formDataBodyBuilder.append(entryKey).append("=").append(URLEncoder.encode(value, charsetName)).append("&");
                                }
                            } else {
                                formDataBodyBuilder.append(entryKey).append("=").append(URLEncoder.encode(entryValue.get(0), charsetName)).append("&");
                            }
                        }
                    } catch (UnsupportedEncodingException e) {
                        // the charset comes from the request headers, so this cannot realistically happen
                    }
                    /*
                     * strip the trailing '&'
                     */
                    String formDataBodyString = "";
                    if (formDataBodyBuilder.length() > 0) {
                        formDataBodyString = formDataBodyBuilder.substring(0, formDataBodyBuilder.length() - 1);
                    }
                    /*
                     * get data bytes
                     */
                    byte[] bodyBytes = formDataBodyString.getBytes(charset);
                    int contentLength = bodyBytes.length;
                    HttpHeaders httpHeaders = new HttpHeaders();
                    httpHeaders.putAll(exchange.getRequest().getHeaders());
                    httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                    /*
                     * in case of content-length not matched
                     */
                    httpHeaders.setContentLength(contentLength);
                    /*
                     * use BodyInserter to insert the form data body
                     */
                    BodyInserter<String, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromObject(formDataBodyString);
                    CachedBodyOutputMessage cachedBodyOutputMessage = new CachedBodyOutputMessage(exchange, httpHeaders);
                    log.debug("[GatewayContext]Rewrite Form Data :{}", formDataBodyString);
                    return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())
                            .then(Mono.defer(() -> {
                                ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
                                    @Override
                                    public HttpHeaders getHeaders() {
                                        return httpHeaders;
                                    }

                                    @Override
                                    public Flux<DataBuffer> getBody() {
                                        return cachedBodyOutputMessage.getBody();
                                    }
                                };
                                return chain.filter(exchange.mutate().request(decorator).build());
                            }));
                }));
    }

    /**
     * ReadJsonBody
     */
    private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) {
        return DataBufferUtils.join(exchange.getRequest().getBody())
                .flatMap(dataBuffer -> {
                    /*
                     * read the body Flux<DataBuffer> and release the buffer
                     * //TODO when SpringCloudGateway Version Release To G.SR2, this can be updated with the new version's feature
                     * see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095
                     */
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                        DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                        DataBufferUtils.retain(buffer);
                        return Mono.just(buffer);
                    });
                    /*
                     * repackage ServerHttpRequest
                     */
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
                    return ServerRequest.create(mutatedExchange, MESSAGE_READERS)
                            .bodyToMono(String.class)
                            .doOnNext(objectValue -> {
                                gatewayContext.setRequestBody(objectValue);
                                log.debug("[GatewayContext]Read JsonBody Success");
                            }).then(chain.filter(mutatedExchange));
                });
    }
}
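The filters share state through a GatewayContext object stored as an exchange attribute. The plugin's own class is not reproduced in this article; below is a minimal sketch reconstructed from the accessors the filters call (field names follow those calls; treat it as an assumption, not the plugin's exact source):

import lombok.Data;
import org.springframework.http.HttpHeaders;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

@Data
public class GatewayContext {

    public static final String CACHE_GATEWAY_CONTEXT = "cacheGatewayContext";

    /** Whether the request body should be captured for this exchange */
    private Boolean readRequestData = Boolean.FALSE;
    /** Whether the response body should be captured for this exchange */
    private Boolean readResponseData = Boolean.FALSE;
    /** Cached JSON request body */
    private String requestBody;
    /** Cached response body */
    private Object responseBody;
    /** Request headers */
    private HttpHeaders requestHeaders;
    /** Cached form data */
    private MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
    /** Query parameters plus form data */
    private MultiValueMap<String, String> allRequestData = new LinkedMultiValueMap<>();
    /** Extra data produced by an optional ContextExtraDataGenerator */
    private GatewayContextExtraData gatewayContextExtraData;
}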
4. GatewayResponseContextFilter, the filter that handles response parameters; adapted from spring-cloud-gateway-plugin's ResponseLogFilter.java.
/**
 * Quoted from @see https://github.com/chenggangpro/spring-cloud-gateway-plugin
 *
 * Response Log Filter
 * @author: chenggang
 * @createTime: 2019-04-11
 * @version: v1.2.0
 */
@Slf4j
public class GatewayResponseContextFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
        if (!gatewayContext.getReadResponseData()) {
            log.debug("[ResponseLogFilter]Properties Set Not To Read Response Data");
            return chain.filter(exchange);
        }
        ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                return DataBufferUtils.join(Flux.from(body))
                        .flatMap(dataBuffer -> {
                            byte[] bytes = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(bytes);
                            DataBufferUtils.release(dataBuffer);
                            Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                                DataBufferUtils.retain(buffer);
                                return Mono.just(buffer);
                            });
                            BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromDataBuffers(cachedFlux);
                            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());
                            DefaultClientResponse clientResponse = new DefaultClientResponse(
                                    new ResponseAdapter(cachedFlux, exchange.getResponse().getHeaders()), ExchangeStrategies.withDefaults());
                            Optional<MediaType> optionalMediaType = clientResponse.headers().contentType();
                            if (!optionalMediaType.isPresent()) {
                                log.debug("[ResponseLogFilter]Response ContentType Is Not Exist");
                                return Mono.defer(() -> bodyInserter.insert(outputMessage, new BodyInserterContext())
                                        .then(Mono.defer(() -> {
                                            Flux<DataBuffer> messageBody = cachedFlux;
                                            HttpHeaders headers = getDelegate().getHeaders();
                                            if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                                messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                            }
                                            return getDelegate().writeWith(messageBody);
                                        })));
                            }
                            MediaType contentType = optionalMediaType.get();
                            if (!contentType.equals(MediaType.APPLICATION_JSON) && !contentType.equals(MediaType.APPLICATION_JSON_UTF8)) {
                                log.debug("[ResponseLogFilter]Response ContentType Is Not APPLICATION_JSON Or APPLICATION_JSON_UTF8");
                                return Mono.defer(() -> bodyInserter.insert(outputMessage, new BodyInserterContext())
                                        .then(Mono.defer(() -> {
                                            Flux<DataBuffer> messageBody = cachedFlux;
                                            HttpHeaders headers = getDelegate().getHeaders();
                                            if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                                messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                            }
                                            return getDelegate().writeWith(messageBody);
                                        })));
                            }
                            return clientResponse.bodyToMono(Object.class)
                                    .doOnNext(originalBody -> {
                                        GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
                                        gatewayContext.setResponseBody(originalBody);
                                        log.debug("[ResponseLogFilter]Read Response Data To Gateway Context Success");
                                    })
                                    .then(Mono.defer(() -> bodyInserter.insert(outputMessage, new BodyInserterContext())
                                            .then(Mono.defer(() -> {
                                                Flux<DataBuffer> messageBody = cachedFlux;
                                                HttpHeaders headers = getDelegate().getHeaders();
                                                if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
                                                    messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                                                }
                                                return getDelegate().writeWith(messageBody);
                                            }))));
                        });
            }

            @Override
            public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                return writeWith(Flux.from(body).flatMapSequential(p -> p));
            }
        };
        return chain.filter(exchange.mutate().response(responseDecorator).build());
    }

    @Override
    public int getOrder() {
        return FilterOrderEnum.RESPONSE_DATA_FILTER.getOrder();
    }

    public class ResponseAdapter implements ClientHttpResponse {

        private final Flux<DataBuffer> flux;

        private final HttpHeaders headers;

        public ResponseAdapter(Publisher<? extends DataBuffer> body, HttpHeaders headers) {
            this.headers = headers;
            if (body instanceof Flux) {
                flux = (Flux) body;
            } else {
                flux = ((Mono) body).flux();
            }
        }

        @Override
        public Flux<DataBuffer> getBody() {
            return flux;
        }

        @Override
        public HttpHeaders getHeaders() {
            return headers;
        }

        @Override
        public HttpStatus getStatusCode() {
            return null;
        }

        @Override
        public int getRawStatusCode() {
            return 0;
        }

        @Override
        public MultiValueMap<String, ResponseCookie> getCookies() {
            return null;
        }
    }
}
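Each filter takes its order from a shared FilterOrderEnum, which is also not reproduced in the article. A minimal sketch follows, under the assumption that the context filter runs first, then the response filter, then the log filter; the concrete numbers are illustrative, see the plugin source for the real values:

public enum FilterOrderEnum {
    // The context filter must run first so the cached bodies are in place for the other filters
    GATEWAY_CONTEXT_FILTER(-300),
    RESPONSE_DATA_FILTER(-250),
    REQUEST_LOG_FILTER(-200);

    private final int order;

    FilterOrderEnum(int order) {
        this.order = order;
    }

    public int getOrder() {
        return order;
    }
}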
5. RemoveGatewayContextFilter, the filter that clears the request context; adapted from spring-cloud-gateway-plugin's RemoveGatewayContextFilter.java.
/**
 * Quoted from @see https://github.com/chenggangpro/spring-cloud-gateway-plugin
 *
 * remove gatewayContext Attribute
 * @author chenggang
 * @date 2019/06/19
 */
@Slf4j
public class RemoveGatewayContextFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange)
                .doFinally(s -> exchange.getAttributes().remove(GatewayContext.CACHE_GATEWAY_CONTEXT));
    }

    @Override
    public int getOrder() {
        return HIGHEST_PRECEDENCE;
    }
}
6. RequestLogFilter, the filter that actually writes the log records; adapted from spring-cloud-gateway-plugin's RequestLogFilter.java.
/**
 * Quoted from @see https://github.com/chenggangpro/spring-cloud-gateway-plugin
 *
 * Filter To Log Request And Response(exclude response body)
 * @author chenggang
 * @date 2019/01/29
 */
@Log4j2
@AllArgsConstructor
public class RequestLogFilter implements GlobalFilter, Ordered {

    private static final String START_TIME = "startTime";
    private static final String HTTP_SCHEME = "http";
    private static final String HTTPS_SCHEME = "https";

    private GatewayPluginProperties gatewayPluginProperties;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        URI requestURI = request.getURI();
        String scheme = requestURI.getScheme();
        GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
        /*
         * not http or https scheme
         */
        if ((!HTTP_SCHEME.equalsIgnoreCase(scheme) && !HTTPS_SCHEME.equals(scheme))
                || !gatewayContext.getReadRequestData()) {
            return chain.filter(exchange);
        }
        long startTime = System.currentTimeMillis();
        exchange.getAttributes().put(START_TIME, startTime);
        // When the plugin is enabled, write the log record after the filter chain completes
        if (gatewayPluginProperties.getEnable()) {
            return chain.filter(exchange).then(Mono.fromRunnable(() -> logApiRequest(exchange)));
        } else {
            return chain.filter(exchange);
        }
    }

    @Override
    public int getOrder() {
        return FilterOrderEnum.REQUEST_LOG_FILTER.getOrder();
    }

    /**
     * log api request
     */
    private Mono<Void> logApiRequest(ServerWebExchange exchange) {
        ServerHttpRequest request = exchange.getRequest();
        URI requestURI = request.getURI();
        String scheme = requestURI.getScheme();
        Long startTime = exchange.getAttribute(START_TIME);
        Long endTime = System.currentTimeMillis();
        Long duration = (endTime - startTime);
        ServerHttpResponse response = exchange.getResponse();
        GatewayApiLog gatewayApiLog = new GatewayApiLog();
        gatewayApiLog.setClientHost(requestURI.getHost());
        gatewayApiLog.setClientIp(IpUtils.getIP(request));
        gatewayApiLog.setStartTime(startTime);
        gatewayApiLog.setEndTime(endTime);
        gatewayApiLog.setDuration(duration);
        gatewayApiLog.setMethod(request.getMethodValue());
        gatewayApiLog.setScheme(scheme);
        gatewayApiLog.setRequestUri(requestURI.getPath());
        gatewayApiLog.setResponseCode(String.valueOf(response.getRawStatusCode()));
        GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
        // Record the request parameters
        if (gatewayPluginProperties.getRequestLog()) {
            MultiValueMap<String, String> queryParams = request.getQueryParams();
            if (!queryParams.isEmpty()) {
                queryParams.forEach((key, value) -> log.debug("[RequestLogFilter](Request)Query Param :Key->({}),Value->({})", key, value));
                gatewayApiLog.setQueryParams(JsonUtils.mapToJson(queryParams));
            }
            HttpHeaders headers = request.getHeaders();
            MediaType contentType = headers.getContentType();
            long length = headers.getContentLength();
            log.debug("[RequestLogFilter](Request)ContentType:{},Content Length:{}", contentType, length);
            if (length > 0 && null != contentType && (contentType.includes(MediaType.APPLICATION_JSON)
                    || contentType.includes(MediaType.APPLICATION_JSON_UTF8))) {
                log.debug("[RequestLogFilter](Request)JsonBody:{}", gatewayContext.getRequestBody());
                gatewayApiLog.setRequestBody(gatewayContext.getRequestBody());
            }
            if (length > 0 && null != contentType && contentType.includes(MediaType.APPLICATION_FORM_URLENCODED)) {
                log.debug("[RequestLogFilter](Request)FormData:{}", gatewayContext.getFormData());
                gatewayApiLog.setRequestBody(JsonUtils.mapToJson(gatewayContext.getFormData()));
            }
        }
        // Record the response parameters
        if (gatewayPluginProperties.getResponseLog()) {
            log.debug("[RequestLogFilter](Response)HttpStatus:{}", response.getStatusCode());
            HttpHeaders headers = response.getHeaders();
            headers.forEach((key, value) -> log.debug("[RequestLogFilter]Headers:Key->{},Value->{}", key, value));
            MediaType contentType = headers.getContentType();
            long length = headers.getContentLength();
            log.info("[RequestLogFilter](Response)ContentType:{},Content Length:{}", contentType, length);
            log.debug("[RequestLogFilter](Response)Response Body:{}", gatewayContext.getResponseBody());
            try {
                gatewayApiLog.setResponseBody(JsonUtils.objToJson(gatewayContext.getResponseBody()));
            } catch (Exception e) {
                log.error("Error converting the API response body to JSON", e);
            }
            log.debug("[RequestLogFilter](Response)Original Path:{},Cost:{} ms", exchange.getRequest().getURI().getPath(), duration);
        }
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        URI routeUri = route.getUri();
        String routeServiceId = routeUri.getHost().toLowerCase();
        // Write the record at the custom API log level
        try {
            log.log(LogLevelConstant.API_LEVEL, "{\"serviceId\":{}, \"data\":{}}", routeServiceId, JsonUtils.objToJson(gatewayApiLog));
        } catch (Exception e) {
            log.error("Error writing the API log record", e);
        }
        return Mono.empty();
    }
}
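The GatewayApiLog carrier class is likewise not shown in the article; here is a minimal sketch with the fields inferred from the setters used above (names match those calls, everything else is an assumption):

import lombok.Data;

@Data
public class GatewayApiLog {

    /** Requested host and caller IP */
    private String clientHost;
    private String clientIp;
    /** Request start/end time (epoch millis) and duration in ms */
    private Long startTime;
    private Long endTime;
    private Long duration;
    /** HTTP method, scheme and request path */
    private String method;
    private String scheme;
    private String requestUri;
    /** HTTP status code of the response */
    private String responseCode;
    /** Query string, request body and response body, serialized as JSON */
    private String queryParams;
    private String requestBody;
    private String responseBody;
}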
7. Start the services and run a test: launch a Kafka console consumer (the same command shown earlier, pointed at the api_log topic) and check whether messages arrive on the api_log topic:
(Screenshot: listening for api_log messages)
8. Storing and Processing the Log Data
Once the log messages are saved to files or Kafka, the next question is how to process that data. In a microservice cluster of any real size, saving logs to a relational database such as MySQL is discouraged, if not outright forbidden; when it is truly necessary, you can consume the log messages with Spring Cloud Stream, as introduced in the previous article, and save them to a database of your choice. The next article covers building an ELK log analysis system to process, analyze, and extract value from this large volume of log data.
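As a pointer, a functional-style Spring Cloud Stream consumer bound to the api_log topic could look like the following minimal sketch; the binding configuration in the comment and the commented-out persistence call are assumptions, see the previous article for the full setup:

import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class ApiLogConsumer {

    /**
     * Functional consumer; bind it to the Kafka topic in application.yaml, e.g.:
     *   spring.cloud.function.definition: apiLog
     *   spring.cloud.stream.bindings.apiLog-in-0.destination: api_log
     */
    @Bean
    public Consumer<String> apiLog() {
        return message -> {
            // Each record is the JSON line produced by the KafkaApiLog appender
            log.debug("Received api_log record: {}", message);
            // Hypothetical persistence call; replace with your own storage layer:
            // apiLogService.save(message);
        };
    }
}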