SpringCloud Gateway 网关的请求体body的读取和修改
getway需要多次对body 进行操作,需要对body 进行缓存
缓存body 动态多次获取
新建顶层filter,对body 进行缓存
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* @author: zhoumo
* @descriptions:
*/
@Component
@Slf4j
public class RequestParamGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
/**
* save request path and serviceId into gateway context
*/
ServerHttpRequest request = exchange.getRequest();
HttpHeaders headers = request.getHeaders();
// 处理参数
MediaType contentType = headers.getContentType();
long contentLength = headers.getContentLength();
if (contentLength > 0) {
return readBody(exchange, chain);
}
return chain.filter(exchange);
}
/**
* default HttpMessageReader
*/
private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
/**
* ReadJsonBody
*
* @param exchange
* @param chain
* @return
*/
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) {
/**
* join the body
*/
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
DataBufferUtils.retain(buffer);
return Mono.just(buffer);
});
/**
* repackage ServerHttpRequest
*/
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
/**
* mutate exchage with new ServerHttpRequest
*/
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
/**
* read body string with default messageReaders
*/
return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(String.class)
.doOnNext(objectValue -> {
log.debug("[GatewayContext]Read JsonBody:{}", objectValue);
}).then(chain.filter(mutatedExchange));
});
}
@Override
public int getOrder() {
return HIGHEST_PRECEDENCE;
}
}
在子节点层获取body
AtomicReference<String> requestBody = new AtomicReference<>(""); RecorderServerHttpRequestDecorator requestDecorator = new RecorderServerHttpRequestDecorator(request); Flux<DataBuffer> body = requestDecorator.getBody(); body.subscribe(buffer -> { CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()); requestBody.set(charBuffer.toString()); }); String body= requestBody.get();
重写获取body方法
public class RecorderServerHttpRequestDecorator extends ServerHttpRequestDecorator { private final List<DataBuffer> dataBuffers = new ArrayList<>(); public RecorderServerHttpRequestDecorator(ServerHttpRequest delegate) { super(delegate); super.getBody().map(dataBuffer -> { dataBuffers.add(dataBuffer); return dataBuffer; }).subscribe(); } @Override public Flux<DataBuffer> getBody() { return copy(); } private Flux<DataBuffer> copy() { return Flux.fromIterable(dataBuffers) .map(buf -> buf.factory().wrap(buf.asByteBuffer())); } }
对body 进行修改重新封装
String str=""+encodedDecryptedParam; DataBuffer bodyDataBuffer = stringBuffer(str); Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer); MediaType contentType = request.getHeaders().getContentType(); ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator( exchange.getRequest()) { @Override public HttpHeaders getHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); int length = str.getBytes().length; httpHeaders.putAll(super.getHeaders()); httpHeaders.remove(HttpHeaders.CONTENT_TYPE); httpHeaders.remove(HttpHeaders.CONTENT_LENGTH); httpHeaders.setContentLength(length); httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType.toString()); // 设置CONTENT_TYPE return httpHeaders; } @Override public Flux<DataBuffer> getBody() { return bodyFlux; } }; return chain.filter(exchange.mutate().request(mutatedRequest).build());