1. Fix: a reordered instance list causes load-balanced retries to hit the same instance
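Previously, we isolated the load balancer's position per request by traceId so that a retry would not be sent to the same instance. That approach did not account for the instance list being updated between load-balancing calls.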
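For example:
- The request calls the load balancer for the first time. The instance list is [instance 1, instance 2] and position is 1; 1 mod 2 = 1, so the request is sent to instance 2.
- The request fails and triggers a retry. The instance list cache has expired, and after the refresh the list is [instance 2, instance 1]. position is now 2; 2 mod 2 = 0, so the request is sent to instance 2 again.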
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        Span currentSpan = tracer.currentSpan();
        // If there is no traceId, create a new one, but it is worth checking why it is missing.
        // It may be a case such as MQ consumption where no traceId is generated automatically;
        // in that case it is better to generate one explicitly.
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long traceId = currentSpan.context().traceId();
        int seed = positionCache.get(traceId).getAndIncrement();
        // Problem: serviceInstances may be ordered differently from the previous call,
        // e.g. last time it was [instance 1, instance 2]
        // and this time it is [instance 2, instance 1].
        return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size()));
    }
So we add a sort here to keep the instances in a stable order, which further ensures that a retry does not pick the same instance.
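    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        Span currentSpan = tracer.currentSpan();
        // If there is no traceId, create a new one, but it is worth checking why it is missing.
        // It may be a case such as MQ consumption where no traceId is generated automatically;
        // in that case it is better to generate one explicitly.
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long traceId = currentSpan.context().traceId();
        int seed = positionCache.get(traceId).getAndIncrement();
        // Sort by instance id so that the same position maps to the same instance
        // even when the list arrives in a different order.
        List<ServiceInstance> sortedInstances = serviceInstances.stream()
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList());
        return new DefaultResponse(sortedInstances.get(seed % sortedInstances.size()));
    }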
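The two-step scenario above can be reproduced without Spring. The following is a minimal sketch, not the actual load balancer: the class and method names are made up for illustration, instances are represented by plain id strings, and `Math.floorMod` is an addition of this sketch (it keeps the index non-negative if the counter ever wraps around).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class StableSelectionDemo {

    // Picks by position from the list exactly as it was received.
    static String pickUnsorted(List<String> instanceIds, int position) {
        return instanceIds.get(Math.floorMod(position, instanceIds.size()));
    }

    // Sorts a copy first, so the same position maps to the same instance
    // no matter in which order the list was returned.
    static String pickSorted(List<String> instanceIds, int position) {
        List<String> sorted = new ArrayList<>(instanceIds);
        sorted.sort(Comparator.naturalOrder());
        return sorted.get(Math.floorMod(position, sorted.size()));
    }

    public static void main(String[] args) {
        List<String> firstCall = Arrays.asList("instance-1", "instance-2");
        // Same instances, but the refreshed cache returns them in a different order.
        List<String> retryCall = Arrays.asList("instance-2", "instance-1");

        // Without sorting, the first call and the retry hit the same instance.
        System.out.println(pickUnsorted(firstCall, 1)); // instance-2
        System.out.println(pickUnsorted(retryCall, 2)); // instance-2

        // With sorting, the retry moves on to the other instance.
        System.out.println(pickSorted(firstCall, 1));   // instance-2
        System.out.println(pickSorted(retryCall, 2));   // instance-1
    }
}
```

Sorting only makes the order stable. If instances are added or removed between the first call and the retry, the same position can still map to a different index.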
2. WebFlux compatibility: implementing the same functionality with WebClient
Maven dependencies:
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.7.RELEASE</version>
    </parent>

    <properties>
        <disruptor.version>3.4.2</disruptor.version>
        <resilience4j-spring-cloud2.version>1.1.0</resilience4j-spring-cloud2.version>
    </properties>

    <dependencies>
        <!-- Use Caffeine as the single in-process cache framework -->
        <!-- This way the local instance cache used by Spring Cloud LoadBalancer is also backed by Caffeine -->
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
        </dependency>
        <!-- Logging must use log4j2 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!-- Lombok to reduce boilerplate -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Register with Eureka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- Spring Cloud RPC -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!-- Call path tracing -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <!-- Expose the actuator endpoints -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-spring-cloud2</artifactId>
            <version>${resilience4j-spring-cloud2.version}</version>
        </dependency>
        <!-- Required by log4j2 async logging; every project must use log4j2 with the async logging configuration -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>${disruptor.version}</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
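The rest of the configuration is the same. The key point is how to use WebClient to call other microservices while retrying in the same cases as before: GET requests are always retried, and any request is retried when it fails with a network IO exception (such as a connect timeout) or with a circuit breaker exception (because the request has not been sent yet).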
WebClient accepts various Filters. By implementing these Filters we get both the instance-level circuit breaker and the retry.
Implementing retry:
    private static class RetryFilter implements ExchangeFilterFunction {
        private final String serviceName;

        private RetryFilter(String serviceName) {
            this.serviceName = serviceName;
        }

        @Override
        public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
            return exchangeFunction.exchange(clientRequest).retryWhen(Retry.onlyIf(retryContext -> {
                // GET requests are always retried
                return clientRequest.method().equals(HttpMethod.GET)
                        // a connect timeout is a kind of IOException
                        || retryContext.exception() instanceof IOException
                        // exception thrown by the instance-level circuit breaker
                        || retryContext.exception() instanceof CallNotPermittedException;
            }).retryMax(1).exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(1000)));
        }
    }
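The retry condition can be read as a pure decision function. The sketch below isolates that decision so it can be checked without Reactor or resilience4j. `RetryDecisionDemo`, `shouldRetry`, and `BreakerOpenException` are hypothetical names; `BreakerOpenException` merely stands in for resilience4j's `CallNotPermittedException`.

```java
import java.io.IOException;
import java.net.ConnectException;

class RetryDecisionDemo {

    // Hypothetical stand-in for the circuit breaker's "call not permitted" exception.
    static class BreakerOpenException extends RuntimeException {
        BreakerOpenException(String message) {
            super(message);
        }
    }

    // Mirrors the three conditions in the filter: GET, IO failure, or open breaker.
    static boolean shouldRetry(String httpMethod, Throwable error) {
        return "GET".equalsIgnoreCase(httpMethod)
                || error instanceof IOException
                || error instanceof BreakerOpenException;
    }

    public static void main(String[] args) {
        // A GET is retried whatever the failure was.
        System.out.println(shouldRetry("GET", new RuntimeException("read timed out")));
        // A POST is retried when the connection could not be established
        // (ConnectException is an IOException)...
        System.out.println(shouldRetry("POST", new ConnectException("connect timed out")));
        // ...or when the breaker rejected the call before anything was sent.
        System.out.println(shouldRetry("POST", new BreakerOpenException("breaker open")));
        // A POST that failed after being sent is not retried.
        System.out.println(shouldRetry("POST", new RuntimeException("read timed out")));
    }
}
```

The split matters for non-idempotent requests: a connect failure or an open breaker means the server never saw the request, whereas a failure after the request was sent may have already caused side effects on the server.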
Instance-level circuit breaker:
    private static class InstanceCircuitBreakerFilter implements ExchangeFilterFunction {
        private final String serviceName;
        private final CircuitBreakerRegistry circuitBreakerRegistry;

        private InstanceCircuitBreakerFilter(String serviceName, CircuitBreakerRegistry circuitBreakerRegistry) {
            this.serviceName = serviceName;
            this.circuitBreakerRegistry = circuitBreakerRegistry;
        }

        @Override
        public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
            CircuitBreaker circuitBreaker;
            // At this point the URL has already passed through the load balancer, so it is the instance URL
            String instanceId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort();
            try {
                // Create or look up the CircuitBreaker by instance id, using the configuration named after the service
                circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, serviceName);
            } catch (ConfigurationNotFoundException e) {
                // No configuration for this service: fall back to the default configuration
                circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId);
            }
            return exchangeFunction.exchange(clientRequest).transform(CircuitBreakerOperator.of(circuitBreaker));
        }
    }
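To illustrate why the breaker is keyed by host and port rather than by service name, here is a minimal sketch with a toy breaker. It is not resilience4j: `SimpleBreaker`, its consecutive-failure threshold, and `InstanceBreakerDemo` are all assumptions made for the example.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class InstanceBreakerDemo {

    // Toy breaker: rejects calls once the consecutive failure count reaches the threshold.
    static class SimpleBreaker {
        private final int failureThreshold;
        private final AtomicInteger consecutiveFailures = new AtomicInteger();

        SimpleBreaker(int failureThreshold) {
            this.failureThreshold = failureThreshold;
        }

        boolean allowsCall() {
            return consecutiveFailures.get() < failureThreshold;
        }

        void recordFailure() {
            consecutiveFailures.incrementAndGet();
        }

        void recordSuccess() {
            consecutiveFailures.set(0);
        }
    }

    private final Map<String, SimpleBreaker> breakers = new ConcurrentHashMap<>();

    // Same key shape as in the filter: host:port of the already resolved URL.
    static String instanceKey(URI resolvedUrl) {
        return resolvedUrl.getHost() + ":" + resolvedUrl.getPort();
    }

    // Creates the breaker on first use and returns the existing one afterwards.
    SimpleBreaker breakerFor(URI resolvedUrl) {
        return breakers.computeIfAbsent(instanceKey(resolvedUrl), key -> new SimpleBreaker(2));
    }

    public static void main(String[] args) {
        InstanceBreakerDemo registry = new InstanceBreakerDemo();
        URI broken = URI.create("http://10.0.0.1:8080/test");
        URI healthy = URI.create("http://10.0.0.2:8080/test");

        registry.breakerFor(broken).recordFailure();
        registry.breakerFor(broken).recordFailure();

        // Only the failing instance is cut off; the other one keeps receiving traffic.
        System.out.println(registry.breakerFor(broken).allowsCall());  // false
        System.out.println(registry.breakerFor(healthy).allowsCall()); // true
    }
}
```

With a single breaker per service, two failures on one bad instance would have blocked calls to every instance of that service.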
Assembling the WebClient that calls a particular microservice (here, service-provider):
    public static final String SERVICE_PROVIDER = "service-provider";

    @Autowired
    private ReactorLoadBalancerExchangeFilterFunction lbFunction;

    @Bean(SERVICE_PROVIDER)
    public WebClient getWebClient(CircuitBreakerRegistry circuitBreakerRegistry) {
        ConnectionProvider provider = ConnectionProvider.builder(SERVICE_PROVIDER)
                .maxConnections(50)
                .pendingAcquireTimeout(Duration.ofSeconds(5))
                .build();
        HttpClient httpClient = HttpClient.create(provider)
                .tcpConfiguration(client ->
                        // connect timeout, in milliseconds
                        client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
                                .doOnConnected(conn -> conn
                                        // read timeout, in seconds
                                        .addHandlerLast(new ReadTimeoutHandler(1))
                                        // write timeout, in seconds
                                        .addHandlerLast(new WriteTimeoutHandler(1))
                                )
                );
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                // Retry comes before load balancing
                .filter(new RetryFilter(SERVICE_PROVIDER))
                // Load balancer, rewrites the URL
                .filter(lbFunction)
                // The instance-level circuit breaker must come after the load balancer has resolved the real address
                .filter(new InstanceCircuitBreakerFilter(SERVICE_PROVIDER, circuitBreakerRegistry))
                .baseUrl("http://" + SERVICE_PROVIDER)
                .build();
    }
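The order of the filters is what makes a retry go to a different instance: the retry wraps the load balancer, so every attempt runs instance selection again. The sketch below models this with plain functions instead of WebClient filters. All names in it are hypothetical, and the "exchange" is a fake that fails for one address.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FilterOrderDemo {

    // Outer stage: on failure, call the next stage one more time.
    static Function<String, String> retryOnce(Function<String, String> next) {
        return url -> {
            try {
                return next.apply(url);
            } catch (RuntimeException firstFailure) {
                return next.apply(url);
            }
        };
    }

    // Middle stage: replace the service name with the next instance, round robin.
    static Function<String, String> roundRobin(List<String> instances, int[] position,
                                               Function<String, String> next) {
        return url -> {
            String instance = instances.get(Math.floorMod(position[0]++, instances.size()));
            return next.apply(url.replace("service-provider", instance));
        };
    }

    // Builds retry -> load balancer -> exchange and returns the URLs that were attempted.
    static List<String> run() {
        List<String> attempted = new ArrayList<>();
        // Fake exchange: the first instance always fails, the second one answers.
        Function<String, String> exchange = url -> {
            attempted.add(url);
            if (url.contains("10.0.0.1")) {
                throw new IllegalStateException("connect failed");
            }
            return "ok";
        };
        int[] position = {0};
        Function<String, String> chain = retryOnce(
                roundRobin(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"), position, exchange));
        chain.apply("http://service-provider/test");
        return attempted;
    }

    public static void main(String[] args) {
        // The second attempt is resolved again and lands on the other instance.
        System.out.println(run()); // [http://10.0.0.1:8080/test, http://10.0.0.2:8080/test]
    }
}
```

If the retry were placed after the load balancer instead, the second attempt would reuse the URL that had already been resolved and hit the failing instance again.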
With this in place, we can make the same kind of microservice calls as we did with feign before.
    @Log4j2
    @RestController
    public class TestController {
        @Resource(name = WebClientConfig.SERVICE_PROVIDER)
        private WebClient webClient;

        // GET: retried on failure, including a read timeout
        @RequestMapping("/testGetTimeOut")
        public Mono<String> testGetTimeOut() {
            return webClient.get().uri("/test-read-time-out")
                    .retrieve()
                    // explicit type argument so this also compiles on Java 8
                    .bodyToMono(new ParameterizedTypeReference<String>() {
                    });
        }

        // POST: retried only on a network IO exception or a circuit breaker exception
        @RequestMapping("/testPostTimeOut")
        public Mono<String> testPostTimeOut() {
            return webClient.post().uri("/test-read-time-out")
                    .retrieve()
                    .bodyToMono(new ParameterizedTypeReference<String>() {
                    });
        }
    }