Spring Cloud Upgrade Path - Hoxton - 10. Request Body Lost When the Gateway Retries


Body lost when retrying a request that has a body


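In our earlier configuration, only GET requests were retried on a 500 response code. Requests such as POST were retried only on exceptions thrown before the body had even started to be sent (for example, connection exceptions). What if we want to retry a POST that carries a body? Or, if we use Postman to build a GET request with a body, does retrying still work correctly?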
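The retry policy described above boils down to a single predicate. The following is a stdlib-only sketch for illustration: the class and method names are hypothetical, the earlier configuration itself is not reproduced here, and ConnectException is an assumed stand-in for "an exception thrown before the body was sent".

```java
import java.net.ConnectException;
import java.util.Set;

// Hypothetical model of the earlier retry policy; not the gateway's actual configuration classes.
class RetryPolicySketch {
    // Methods that are retried when the upstream answers with a 500.
    static final Set<String> RETRY_ON_500 = Set.of("GET");

    /**
     * @param method  HTTP method of the request
     * @param status  upstream status code, or -1 if no response was received
     * @param failure exception raised while sending, or null
     */
    static boolean shouldRetry(String method, int status, Throwable failure) {
        if (failure instanceof ConnectException) {
            // The connection was never established, so no body was sent: safe to retry any method.
            return true;
        }
        return status == 500 && RETRY_ON_500.contains(method);
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry("GET", 500, null));                   // true
        System.out.println(shouldRetry("POST", 500, null));                  // false
        System.out.println(shouldRetry("POST", -1, new ConnectException())); // true
    }
}
```

The body-loss problem discussed in this article shows up as soon as a request with a body is retried after the body has already been sent once.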


We start the EurekaServer from Section 6 and change the /test-exception-thrown endpoint to take a RequestBody parameter:


@RequestMapping(value = "/test-exception-thrown", method = {RequestMethod.GET, RequestMethod.POST, RequestMethod.PUT, RequestMethod.DELETE})
public String testExceptionThrown(HttpServletRequest httpServletRequest, @RequestBody Map<String, String> body) {
    log.info("testExceptionThrown called {}, {}", httpServletRequest.getMethod(), body);
    if (shouldThrowException) {
        throw new IllegalStateException();
    }
    return zone;
}


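Start zone1-service-provider-instance1 and zone1-service-provider-instance2, where zone1-service-provider-instance1 is the instance whose endpoint throws an exception. Start the gateway and call the endpoint with Postman. A retry does happen: the request first goes to zone1-service-provider-instance1 and is then retried on zone1-service-provider-instance2. However, zone1-service-provider-instance2 returns a 400 error, which means it did not receive the RequestBody. Why?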
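If you would rather reproduce the call from code than from Postman, the JDK 11 java.net.http API can build a GET (or POST) with a JSON body. A sketch under stated assumptions: the gateway URL is a hypothetical placeholder, and the request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class ReproRequest {
    // Hypothetical gateway address; replace with your own gateway host, port and route.
    static final String GATEWAY_URL = "http://localhost:8080/test-exception-thrown";

    // Builds a request with a JSON body for any method, including GET (which Postman also allows).
    static HttpRequest withJsonBody(String method, String json) {
        return HttpRequest.newBuilder(URI.create(GATEWAY_URL))
                .header("Content-Type", "application/json")
                .method(method, HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = withJsonBody("GET", "{\"a\":\"b\"}");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```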


API gateway log:


2020-07-28 01:55:29.781  INFO [service-api-gateway,fc71e34f22e1bd17,fc71e34f22e1bd17]
            [7860] [boundedElastic-1][com.github.hashjang.hoxton.api.gateway.filter.InstanceCircuitBreakerFilter:54]: try to send request to: http://192.168.0.142:8001/test-exception-thrown: stats: {"numberOfNotPermittedCalls":0,"numberOfSlowCalls":0,"numberOfBufferedCalls":0,"slowCallRate":-1.0,"failureRate":-1.0,"numberOfSuccessfulCalls":0,"numberOfFailedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSlowFailedCalls":0}
2020-07-28 01:55:30.115  INFO [service-api-gateway,fc71e34f22e1bd17,fc71e34f22e1bd17]
            [7860] [boundedElastic-1][com.github.hashjang.hoxton.api.gateway.filter.InstanceCircuitBreakerFilter:54]: try to send request to: http://192.168.0.142:8001/test-exception-thrown: stats: {"numberOfNotPermittedCalls":0,"numberOfSlowCalls":0,"numberOfBufferedCalls":1,"slowCallRate":-1.0,"failureRate":-1.0,"numberOfSuccessfulCalls":1,"numberOfFailedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSlowFailedCalls":0}


zone1-service-provider-instance1 log:

2020-07-28 01:55:29.789 ERROR
            [service-provider,,] [24956]
            [XNIO-2 task-4][io.undertow.servlet.api.LoggingExceptionHandler:80]:UT005023: Exception handling request to /test-exception-thrown


zone1-service-provider-instance2 log:

2020-07-28 01:55:30.133  WARN
            [service-provider,fc71e34f22e1bd17,da6d3f91fcfc053f] [24956]
            [XNIO-2 task-5][org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver:199]:Resolved [org.springframework.http.converter.HttpMessageNotReadableException: Required request body is missing: public java.lang.String com.github.hashjang.hoxton.service.provider.controller.TestServiceController.testExceptionThrown(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String, java.lang.String>)]


To pin down the problem, we add a LogFilter that runs before all other filters and turns on logging for the body:

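import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class LogFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public Flux<DataBuffer> getBody() {
                // Log every signal on the body Flux, so we can see which filter subscribes to (consumes) the body
                return exchange.getRequest().getBody().log();
            }
        }).build());
    }

    @Override
    public int getOrder() {
        // Run before every other filter
        return Ordered.HIGHEST_PRECEDENCE;
    }
}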


Restart the gateway and send the request again:

2020-07-28 02:22:16.026  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [boundedElastic-1][com.github.hashjang.hoxton.api.gateway.filter.InstanceCircuitBreakerFilter:54]: try to send request to: http://192.168.0.142:8001/test-exception-thrown: stats: {"numberOfNotPermittedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSuccessfulCalls":0,"numberOfSlowFailedCalls":0,"numberOfFailedCalls":0,"failureRate":-1.0,"slowCallRate":-1.0,"numberOfBufferedCalls":0,"numberOfSlowCalls":0}
2020-07-28 02:22:16.034  INFO [service-api-gateway,,]
            [4408] [reactor-http-nio-4][reactor.util.Loggers$Slf4JLogger:274]: onContextUpdate(Context3{class brave.propagation.TraceContext=1ae80e0b643da3c7/1ae80e0b643da3c7, class org.springframework.cloud.sleuth.instrument.web.client.HttpClientBeanPostProcessor$CurrentClientSpan=NoopSpan(1ae80e0b643da3c7/38ecc8fd2b789c2e), reactor.onDiscard.local=reactor.core.publisher.Operators$$Lambda$1179/0x00000008019d1840@469db4af})
2020-07-28 02:22:16.034  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onSubscribe([Fuseable] ScopePassingSpanSubscriber)
2020-07-28 02:22:16.034  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: request(unbounded)
2020-07-28 02:22:16.035  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onNext(PooledSlicedByteBuf(ridx: 0, widx: 10, cap: 10/10, unwrapped: PooledUnsafeDirectByteBuf(ridx: 326, widx: 326, cap: 1024)))
2020-07-28 02:22:16.035  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onComplete()
2020-07-28 02:22:16.165  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [boundedElastic-1][com.github.hashjang.hoxton.api.gateway.filter.InstanceCircuitBreakerFilter:54]: try to send request to: http://192.168.0.142:8002/test-exception-thrown: stats: {"numberOfNotPermittedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSuccessfulCalls":0,"numberOfSlowFailedCalls":0,"numberOfFailedCalls":0,"failureRate":-1.0,"slowCallRate":-1.0,"numberOfBufferedCalls":0,"numberOfSlowCalls":0}
2020-07-28 02:22:16.169  INFO [service-api-gateway,,]
            [4408] [reactor-http-nio-3][reactor.util.Loggers$Slf4JLogger:274]: onContextUpdate(Context3{class brave.propagation.TraceContext=1ae80e0b643da3c7/1ae80e0b643da3c7, class org.springframework.cloud.sleuth.instrument.web.client.HttpClientBeanPostProcessor$CurrentClientSpan=NoopSpan(1ae80e0b643da3c7/d05978ee5d1cb64c), reactor.onDiscard.local=reactor.core.publisher.Operators$$Lambda$1179/0x00000008019d1840@3a55aa8})
2020-07-28 02:22:16.170  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onSubscribe([Fuseable] ScopePassingSpanSubscriber)
2020-07-28 02:22:16.170  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: request(unbounded)
2020-07-28 02:22:16.170  INFO [service-api-gateway,1ae80e0b643da3c7,1ae80e0b643da3c7]
            [4408] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onComplete()


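We can see that on retry the body is still the same Flux as before. That Flux was already consumed by the first call, so when the retry subscribes to it again it completes immediately, and this time there is no onNext(PooledSlicedByteBuf(ridx: 0, widx: 10, cap: 10/10, unwrapped: PooledUnsafeDirectByteBuf(ridx: 326, widx: 326, cap: 1024))).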
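The same thing happens with any one-shot stream. As a stdlib-only analogy (an InputStream stands in for Netty's FluxReceive here; this illustrates the behavior and is not the gateway's code path), the second "send" gets nothing:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class OneShotBodyDemo {
    // One send attempt: drain whatever is left in the body stream.
    static String sendAttempt(InputStream body) {
        try {
            return new String(body.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("{\"a\":\"b\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println("first attempt: " + sendAttempt(body));        // the full body
        System.out.println("retry attempt: '" + sendAttempt(body) + "'"); // empty, already consumed
    }
}
```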


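How do we fix this? There are two ways. One is to implement body caching yourself, as in the Issue + PR I submitted (https://github.com/spring-cloud/spring-cloud-gateway/pull/1863). That PR was actually a blunder on my part: I had not noticed that Spring Cloud Gateway already provides body caching (the second approach below). The self-implemented approach looks like this: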

Publisher<Void> publisher = chain.filter(exchange.mutate().request(
  new ServerHttpRequestDecorator(request) {
    @Override
    public Flux<DataBuffer> getBody() {
      int currentIteration = exchange
          .getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
      // currentIteration tells us whether this is a retry. If not, return the original request's body Flux;
      // if it is, rebuild a Flux from the cached String so the retry also carries the correct body.
      // Note: this assumes the body arrives in a single DataBuffer and that its content type is in
      // LEGAL_LOG_MEDIA_TYPES; otherwise BODY is never cached and the retry branch hits a null.
      if (currentIteration > -1) {
        return Flux.just(dataBufferFactory.wrap(
            ((String) exchange.getAttributes().get(BODY)).getBytes()));
      }
      return request.getBody().map(dataBuffer -> {
        if (LEGAL_LOG_MEDIA_TYPES.contains(contentType)) {
          try {
            String body = (String) exchange.getAttributes().get(BODY);
            if (body == null) {
              // First read: copy the bytes out, release the pooled buffer, cache the content
              byte[] content = new byte[dataBuffer.readableByteCount()];
              try {
                dataBuffer.read(content);
              } finally {
                DataBufferUtils.release(dataBuffer);
              }
              String s = new String(content, Charset.defaultCharset());
              exchange.getAttributes().put(BODY, s);
              dataBuffer = dataBufferFactory.wrap(s.getBytes());
            } else {
              dataBuffer = dataBufferFactory.wrap(body.getBytes());
            }
          } catch (Exception e) {
            log.error("error read body in retry", e);
          }
        }
        return dataBuffer;
      });
    }
  }
).build());
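The idea behind this approach is to read the one-shot body once, keep its content, and hand every attempt a fresh copy. A stdlib-only sketch of that idea, with hypothetical class and method names, where a new ByteArrayInputStream stands in for rebuilding the DataBuffer Flux:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class CachedBodyRetry {
    // Reads a stream to the end and returns its bytes.
    static byte[] readAll(InputStream in) {
        try {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The original one-shot body is read exactly once; every attempt (first try and each retry)
    // then gets its own fresh stream over the cached bytes. Returns what each attempt received.
    static List<String> bodiesSeen(InputStream original, int attempts) {
        byte[] cached = readAll(original);
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            InputStream replay = new ByteArrayInputStream(cached);
            seen.add(new String(readAll(replay), StandardCharsets.UTF_8));
        }
        return seen;
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("{\"a\":\"b\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(bodiesSeen(body, 2)); // [{"a":"b"}, {"a":"b"}]
    }
}
```

Caching raw bytes instead of a String, as this sketch does, also avoids the default-charset round trip in the code above.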


The other way is to use Spring Cloud Gateway's built-in caching mechanism, AdaptCachedBodyGlobalFilter. Its source:

public class AdaptCachedBodyGlobalFilter
    implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {

  /**
   * Routes whose RequestBody should be cached
   */
  private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();

  /**
   * Attribute key under which the RequestBody is cached
   */
  @Deprecated
  public static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;

  /**
   * On receiving an EnableBodyCachingEvent, add the event's route ID to the map of routes to cache
   */
  @Override
  public void onApplicationEvent(EnableBodyCachingEvent event) {
    this.routesToCache.putIfAbsent(event.getRouteId(), true);
  }

  // ... omitted
}
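The filter() method is omitted above. As far as we can tell from the Hoxton source, it caches the body (under CACHED_REQUEST_BODY_ATTR) only when the current route's ID is in routesToCache and no body has been cached yet. A stdlib-only model of that per-route bookkeeping, with hypothetical names and the actual caching left out:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of AdaptCachedBodyGlobalFilter's per-route opt-in; not the real class.
class RouteBodyCacheRegistry {
    private final ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();

    // Counterpart of onApplicationEvent(EnableBodyCachingEvent); registering a route twice is harmless.
    void enable(String routeId) {
        routesToCache.putIfAbsent(routeId, true);
    }

    // Per-request decision: cache only for routes that opted in, and only if nothing is cached yet.
    boolean shouldCache(String routeId, boolean bodyAlreadyCached) {
        return !bodyAlreadyCached && routesToCache.containsKey(routeId);
    }

    public static void main(String[] args) {
        RouteBodyCacheRegistry registry = new RouteBodyCacheRegistry();
        registry.enable("service-provider");
        System.out.println(registry.shouldCache("service-provider", false)); // true
        System.out.println(registry.shouldCache("other-route", false));      // false
    }
}
```

A route that never receives an EnableBodyCachingEvent keeps the one-shot body, which is why every route ID has to be registered for a global retry.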


Since our retry is global, we can enable body caching for every route, like this:

ApiGatewayConfig

@Configuration
@EnableConfigurationProperties(ApiGatewayRetryConfig.class)
@LoadBalancerClients(defaultConfiguration = CommonLoadBalancerConfig.class)
public class ApiGatewayConfig {
    @Autowired
    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
    @Autowired
    private GatewayProperties gatewayProperties;
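    @PostConstruct
    public void init() {
        // Enable body caching for every route, so that a retried request with a body still carries it:
        // the original body is a one-shot Flux backed by Netty's FluxReceive and cannot be read twice.
        // Note: getRoutes() only returns routes declared in configuration (spring.cloud.gateway.routes).
        gatewayProperties.getRoutes().forEach(routeDefinition -> {
            EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
        });
    }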
}


After this change, restart the gateway and call the endpoint again to trigger a retry:

2020-07-28 02:48:18.972  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onContextUpdate(Context2{class brave.propagation.TraceContext=72eba79a3afc324f/72eba79a3afc324f, reactor.onDiscard.local=reactor.core.publisher.Operators$$Lambda$1041/0x0000000801979440@119927bc})
2020-07-28 02:48:18.972  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onSubscribe([Fuseable] ScopePassingSpanSubscriber)
2020-07-28 02:48:18.973  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: request(unbounded)
2020-07-28 02:48:18.973  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onNext(PooledSlicedByteBuf(ridx: 0, widx: 10, cap: 10/10, unwrapped: PooledUnsafeDirectByteBuf(ridx: 326, widx: 326, cap: 512)))
2020-07-28 02:48:18.974  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [reactor-http-nio-2][reactor.util.Loggers$Slf4JLogger:274]: onComplete()
2020-07-28 02:48:18.986  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [boundedElastic-1][com.github.hashjang.hoxton.api.gateway.filter.InstanceCircuitBreakerFilter:54]: try to send request to: http://192.168.0.142:8001/test-exception-thrown: stats: {"numberOfNotPermittedCalls":0,"failureRate":-1.0,"slowCallRate":-1.0,"numberOfSlowFailedCalls":0,"numberOfFailedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSuccessfulCalls":0,"numberOfBufferedCalls":0,"numberOfSlowCalls":0}
2020-07-28 02:48:19.138  INFO [service-api-gateway,72eba79a3afc324f,72eba79a3afc324f]
            [6784] [boundedElastic-1][com.github.hashjang.hoxton.api.gateway.filter.InstanceCircuitBreakerFilter:54]: try to send request to: http://192.168.0.142:8002/test-exception-thrown: stats: {"numberOfNotPermittedCalls":0,"failureRate":-1.0,"slowCallRate":-1.0,"numberOfSlowFailedCalls":0,"numberOfFailedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSuccessfulCalls":1,"numberOfBufferedCalls":1,"numberOfSlowCalls":0}

This time the retried call keeps its body, and the retry succeeds.
