SpringCloud Gateway 网关的请求体body的读取和修改

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
应用实时监控服务-应用监控,每月50GB免费额度
应用实时监控服务-用户体验监控,每月100OCU免费额度
简介: SpringCloud Gateway 框架中,为了处理请求体body,实现多次读取与修改,创建了一个名为`RequestParamGlobalFilter`的全局过滤器。这个过滤器使用`@Component`和`@Slf4j`注解,实现了`GlobalFilter`和`Ordered`接口,设置最高优先级以首先读取body。它通过缓存请求体并创建装饰过的`ServerHttpRequest`来实现body的动态获取。

SpringCloud Gateway 网关的请求体body的读取和修改

getway需要多次对body 进行操作,需要对body 进行缓存

缓存body 动态多次获取

新建顶层filter,对body 进行缓存

 

import lombok.extern.slf4j.Slf4j;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;

import org.springframework.cloud.gateway.filter.GlobalFilter;

import org.springframework.core.Ordered;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.core.io.buffer.DataBufferUtils;

import org.springframework.core.io.buffer.DefaultDataBuffer;

import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import org.springframework.http.HttpHeaders;

import org.springframework.http.MediaType;

import org.springframework.http.codec.HttpMessageReader;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.http.server.reactive.ServerHttpRequestDecorator;

import org.springframework.stereotype.Component;

import org.springframework.web.reactive.function.server.HandlerStrategies;

import org.springframework.web.reactive.function.server.ServerRequest;

import org.springframework.web.server.ServerWebExchange;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import java.util.List;

/**

* @author: zhoumo

* @descriptions:

*/

@Component

@Slf4j

public class RequestParamGlobalFilter implements GlobalFilter, Ordered {

   @Override

   public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

       /**

        * save request path and serviceId into gateway context

        */

       ServerHttpRequest request = exchange.getRequest();

       HttpHeaders headers = request.getHeaders();

       // 处理参数

       MediaType contentType = headers.getContentType();

       long contentLength = headers.getContentLength();

       if (contentLength > 0) {

                 return readBody(exchange, chain);

       }

       return chain.filter(exchange);

   }

   /**

    * default HttpMessageReader

    */

   private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();

   /**

    * ReadJsonBody

    *

    * @param exchange

    * @param chain

    * @return

    */

   private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) {

       /**

        * join the body

        */

       return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

           byte[] bytes = new byte[dataBuffer.readableByteCount()];

           dataBuffer.read(bytes);

           DataBufferUtils.release(dataBuffer);

           Flux<DataBuffer> cachedFlux = Flux.defer(() -> {

               DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);

               DataBufferUtils.retain(buffer);

               return Mono.just(buffer);

           });

           /**

            * repackage ServerHttpRequest

            */

           ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

               @Override

               public Flux<DataBuffer> getBody() {

                   return cachedFlux;

               }

           };

           /**

            * mutate exchage with new ServerHttpRequest

            */

           ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

           /**

            * read body string with default messageReaders

            */

           return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(String.class)

                   .doOnNext(objectValue -> {

                       log.debug("[GatewayContext]Read JsonBody:{}", objectValue);

                   }).then(chain.filter(mutatedExchange));

       });

   }

   @Override

   public int getOrder() {

       return HIGHEST_PRECEDENCE;

   }

}

在子节点层获取body




AtomicReference<String> requestBody = new AtomicReference<>("");                RecorderServerHttpRequestDecorator requestDecorator = new RecorderServerHttpRequestDecorator(request);                Flux<DataBuffer> body = requestDecorator.getBody();                body.subscribe(buffer -> {                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());                    requestBody.set(charBuffer.toString());                });                String body= requestBody.get();

重写获取body方法



  public class RecorderServerHttpRequestDecorator  extends ServerHttpRequestDecorator {        private final List<DataBuffer> dataBuffers = new ArrayList<>();        public RecorderServerHttpRequestDecorator(ServerHttpRequest delegate) {            super(delegate);            super.getBody().map(dataBuffer -> {                dataBuffers.add(dataBuffer);                return dataBuffer;            }).subscribe();        }        @Override        public Flux<DataBuffer> getBody() {            return copy();        }        private Flux<DataBuffer> copy() {            return Flux.fromIterable(dataBuffers)                    .map(buf -> buf.factory().wrap(buf.asByteBuffer()));        }    }


对body 进行修改重新封装



               String str=""+encodedDecryptedParam;                DataBuffer bodyDataBuffer = stringBuffer(str);                Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);                MediaType contentType = request.getHeaders().getContentType();                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(                        exchange.getRequest()) {                    @Override                    public HttpHeaders getHeaders() {                        HttpHeaders httpHeaders = new HttpHeaders();                        int length = str.getBytes().length;                        httpHeaders.putAll(super.getHeaders());                        httpHeaders.remove(HttpHeaders.CONTENT_TYPE);                        httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);                        httpHeaders.setContentLength(length);                        httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType.toString());                        // 设置CONTENT_TYPE                        return httpHeaders;                    }                    @Override                    public Flux<DataBuffer> getBody() {                        return bodyFlux;                    }                };                return chain.filter(exchange.mutate().request(mutatedRequest).build());



一定必须加上 public HttpHeaders getHeaders()对header 重新封装,否则接口层会卡死,request 无限大

目录
相关文章
|
12天前
|
负载均衡 Java 应用服务中间件
Gateway服务网关
Gateway服务网关
25 1
Gateway服务网关
|
1月前
|
XML Java 数据格式
如何使用 Spring Cloud 实现网关
如何使用 Spring Cloud 实现网关
31 3
|
2月前
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
2月前
|
Java 开发者 Spring
Spring Cloud Gateway 中,过滤器的分类有哪些?
Spring Cloud Gateway 中,过滤器的分类有哪些?
48 3
|
2月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
102 5
|
1月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
37 0
|
2月前
|
安全 Java 开发者
强大!Spring Cloud Gateway新特性及高级开发技巧
在微服务架构日益盛行的今天,网关作为微服务架构中的关键组件,承担着路由、安全、监控、限流等多重职责。Spring Cloud Gateway作为新一代的微服务网关,凭借其基于Spring Framework 5、Project Reactor和Spring Boot 2.0的强大技术栈,正逐步成为业界的主流选择。本文将深入探讨Spring Cloud Gateway的新特性及高级开发技巧,助力开发者更好地掌握这一强大的网关工具。
222 6
|
3月前
|
存储 Kubernetes API
【APIM】Azure API Management Self-Host Gateway是否可以把请求的日志发送到Application Insights呢?让它和使用Azure上托管的 Gateway一样呢?
【APIM】Azure API Management Self-Host Gateway是否可以把请求的日志发送到Application Insights呢?让它和使用Azure上托管的 Gateway一样呢?
|
3月前
|
安全 API
【Azure API 管理】APIM Self-Host Gateway 自建本地环境中的网关数量超过10个且它们的出口IP为同一个时出现的429错误
【Azure API 管理】APIM Self-Host Gateway 自建本地环境中的网关数量超过10个且它们的出口IP为同一个时出现的429错误
|
3月前
|
存储 容器
【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心
【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心