如何不启用Hystrix
由于我们的入口注解类从@SpringCloudApplication
替换成了SpringBootApplication
,这样不会启用Spring-Cloud-CircuitBreaker
。引入的Hystrix
依赖也就没有效果。请参考本系列第二节: Spring Cloud升级之路 - Hoxton - 2.入口类注解修改与OpenFeign的改造
使用Resilience4j实现实例级别的隔离与熔断
为什么需要实例级别的熔断呢?因为某个微服务可能某些实例暂时不可用,我们希望在重试的时候,暂时不再重试这些实例。默认的Spring-Cloud-CircuitBreaker
一般实现了微服务级别的熔断,某个微服务某些实例暂时不可用但是某些实例可用的时候,就很有可能会发生整个微服务的熔断。一般在滚动发布的时候,如果操作不当,微服务级别的熔断导致这个微服务不可用,但是实际上某些实例是可用的。所以,我们需要实例级别的熔断,而不是微服务级别的。
为什么需要实例级别的线程隔离呢?防止某个实例发生问题,响应慢,阻塞了整个业务线程。
Spring-Cloud-CircuitBreaker
里面的实现对于resilience4j
的功能使用有限,我们想利用其更多的功能(例如线程隔离等等)。而且,Spring-Cloud-CircuitBreaker
可以直接用来实现微服务级别的熔断,但是很难实现实例级别的熔断。主要原因是他的配置是根据微服务名称配置的,并且没有扩展,导致我们想实现的话,修改代码的地方太多了。所以我们舍弃了Spring-Cloud-CircuitBreaker
。
比较幸运的是,resilience4j
官方有实现自己的spring-cloud-starter
,里面实现了他的所有功能的核心bean
配置,很好用。我们采用这个starter
以及相关的配置方式来实现我们的实例级别的隔离与熔断。
在引入
<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-cloud2</artifactId> <version>${resilience4j-spring-cloud2.version}</version> </dependency>
之后,会自动加载BulkheadRegistry
,ThreadPoolBulkheadRegistry
,CircuitBreakerRegistry
,RateLimiterRegistry
,RetryRegistry
这几个Bean
,配置这些Bean
的分别是通过:
io.github.resilience4j.bulkhead.autoconfigure.BulkheadProperties
: 前缀resilience4j.bulkhead
io.github.resilience4j.bulkhead.autoconfigure.ThreadPoolBulkheadProperties
:前缀resilience4j.thread-pool-bulkhead
io.github.resilience4j.circuitbreaker.autoconfigure.CircuitBreakerProperties
: 前缀resilience4j.circuitbreaker
io.github.resilience4j.ratelimiter.autoconfigure.RateLimiterProperties
:前缀resilience4j.ratelimiter
io.github.resilience4j.retry.autoconfigure.RetryProperties
: 前缀resilience4j.retry
这里主要用到的元素是:CircuitBreaker
和ThreadPoolBulkhead
。CircuitBreaker
用来实现实例级别的熔断,ThreadPoolBulkhead
用来实现实例级别的线程隔离。
如何配置以及如何使用
CircuitBreaker
相关的配置:CircuitBreaker
CircuitBreaker
有五种状态:CLOSED,OPEN 还有HALF_OPEN。剩下的两种状态是人为操作,我们这里不会用到:DISABLED还有 FORCED_OPEN. CLOSED 代表断路器关闭,请求照常处理。OPEN 代表断路器打开,如果有请求会抛出异常:CallNotPermittedException
。
CircuitBreaker
使用滑动窗口统计成功失败的请求,并打开或者关闭断路器。这个滑动窗口有两种:
- 基于计数的滑动窗口:使用一个大小为 N 的环形数组,记录最近 N 个请求结果。
- 基于计时的滑动窗口:记录最近 N 秒的请求结果
我们这里实现的默认配置是:
resilience4j.circuitbreaker: configs: default: # 是否向 Actuator 的 HealthIndicator 注册 registerHealthIndicator: true slidingWindowSize: 10 minimumNumberOfCalls: 5 slidingWindowType: TIME_BASED permittedNumberOfCallsInHalfOpenState: 3 automaticTransitionFromOpenToHalfOpenEnabled: true waitDurationInOpenState: 2s failureRateThreshold: 30 recordExceptions: - java.lang.Exception
以上配置代表,默认情况下,所有Exception
以及其子类都认为是失败。滑动窗口采用基于计时的,并且记录最近10秒的请求。触发断路器判断必须在10秒内至少有5个请求,在失败比例达到30%以上之后,断路器变为OPEN
。断路器OPEN
之后,在2秒后自动转化为HALF_OPEN
。
ThreadPoolBulkhead
相关的配置:Create and configure a ThreadPoolBulkhead
我们这里实现的默认配置是:
resilience4j.thread-pool-bulkhead: configs: default: maxThreadPoolSize: 50 coreThreadPoolSize: 10 queueCapacity: 1
与Open-Feign粘合
我们需要在FeignClient
被调用,选取好要发送请求的实例之后,加入CircuitBreaker
和ThreadPoolBulkhead
。也就是,我们需要拿到本次请求调用的实例,以及微服务名称,加载对应的CircuitBreaker
和ThreadPoolBulkhead
,包装调用请求,之后执行调用。
FeignClient
的核心实现,根据org.springframework.cloud.openfeign.loadbalancer.DefaultFeignLoadBalancerConfiguration
可以知道是org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient
:
@Bean @ConditionalOnMissingBean public Client feignClient(BlockingLoadBalancerClient loadBalancerClient) { return new FeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient); }
查看FeignBlockingLoadBalancerClient
的源码:
@Override public Response execute(Request request, Request.Options options) throws IOException { final URI originalUri = URI.create(request.url()); //微服务名称 String serviceId = originalUri.getHost(); Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri); //从负载均衡器选择一个实例 ServiceInstance instance = loadBalancerClient.choose(serviceId); if (instance == null) { String message = "Load balancer does not contain an instance for the service " + serviceId; if (LOG.isWarnEnabled()) { LOG.warn(message); } return Response.builder().request(request) .status(HttpStatus.SERVICE_UNAVAILABLE.value()) .body(message, StandardCharsets.UTF_8).build(); } //修改原url String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri) .toString(); //构建新的Request Request newRequest = Request.create(request.httpMethod(), reconstructedUrl, request.headers(), request.body(), request.charset(), //这个RequestTemplate可以拿到微服务名称 request.requestTemplate()); return delegate.execute(newRequest, options); }
所以,我们可以通过继承FeignBlockingLoadBalancerClient
替换默认的实现,来代理调用请求。但是因为sleuth
的存在以及其中的小bug
导致RequestTemplate
丢失,让我们拿不到微服务名称,这个可以参考我提的 PR:replace method for deprecation and keep reference of requestTemplate.但是在Hoxton
版本不会合并了,所以需要我们建立同名同路径类进行替换:org.springframework.cloud.sleuth.instrument.web.client.feign.TracingFeignClient
Request build() { if (headers == null) { return delegate; } String url = delegate.url(); byte[] body = delegate.body(); Charset charset = delegate.charset(); //保留requestTemplate return Request.create(delegate.httpMethod(), url, headers, body, charset, delegate.requestTemplate()); }
之后,我们实现带CircuitBreaker
和ThreadPoolBulkhead
的FeignBlockingLoadBalancerClient
,并优化其中的HttpClient
:
@Bean public HttpClient getHttpClient() { // 长连接保持30秒 PoolingHttpClientConnectionManager pollingConnectionManager = new PoolingHttpClientConnectionManager(5, TimeUnit.MINUTES); // 总连接数 pollingConnectionManager.setMaxTotal(1000); // 同路由的并发数 pollingConnectionManager.setDefaultMaxPerRoute(1000); HttpClientBuilder httpClientBuilder = HttpClients.custom(); httpClientBuilder.setConnectionManager(pollingConnectionManager); // 保持长连接配置,需要在头添加Keep-Alive httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()); return httpClientBuilder.build(); } @Bean public FeignBlockingLoadBalancerClient feignBlockingLoadBalancerCircuitBreakableClient(HttpClient httpClient, BlockingLoadBalancerClient loadBalancerClient, BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry, CircuitBreakerRegistry circuitBreakerRegistry, RateLimiterRegistry rateLimiterRegistry, RetryRegistry retryRegistry, Tracer tracer) { return new FeignBlockingLoadBalancerClient(new CircuitBreakableClient( httpClient, bulkheadRegistry, threadPoolBulkheadRegistry, circuitBreakerRegistry, rateLimiterRegistry, retryRegistry, tracer), loadBalancerClient); } @Log4j2 public static class CircuitBreakableClient extends feign.httpclient.ApacheHttpClient { private final BulkheadRegistry bulkheadRegistry; private final ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry; private final CircuitBreakerRegistry circuitBreakerRegistry; private final RateLimiterRegistry rateLimiterRegistry; private final RetryRegistry retryRegistry; private final Tracer tracer; public CircuitBreakableClient(HttpClient httpClient, BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry, CircuitBreakerRegistry circuitBreakerRegistry, RateLimiterRegistry rateLimiterRegistry, RetryRegistry retryRegistry, Tracer tracer) { super(httpClient); this.bulkheadRegistry = bulkheadRegistry; this.threadPoolBulkheadRegistry = threadPoolBulkheadRegistry; this.circuitBreakerRegistry = circuitBreakerRegistry; this.rateLimiterRegistry = rateLimiterRegistry; this.retryRegistry = retryRegistry; this.tracer = tracer; } @Override public Response execute(Request request, Request.Options options) throws IOException { String serviceName = request.requestTemplate().feignTarget().name(); URL url = new URL(request.url()); String instanceId = serviceName + ":" + url.getHost() + ":" + url.getPort(); //每个实例一个resilience4j熔断记录器,在实例维度做熔断,所有这个服务的实例共享这个服务的resilience4j熔断配置 ThreadPoolBulkhead threadPoolBulkhead; CircuitBreaker circuitBreaker; try { threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(instanceId, serviceName); } catch (ConfigurationNotFoundException e) { threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(instanceId); } try { circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, serviceName); } catch (ConfigurationNotFoundException e) { circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId); } //保持traceId Span span = tracer.currentSpan(); Supplier<CompletionStage<Response>> completionStageSupplier = ThreadPoolBulkhead.decorateSupplier(threadPoolBulkhead, CircuitBreaker.decorateSupplier(circuitBreaker, () -> { try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { log.info("call url: {} -> {}", request.httpMethod(), request.url()); Response execute = super.execute(request, options); //对于非返回200的接口,抛出异常 if (execute.status() != HttpStatus.OK.value()) { execute.close();//需要关闭,否则返回码不为200抛异常连接不会回收导致连接池耗尽 throw new ResponseWrapperException(execute.toString(), execute); } return execute; } catch (Exception e) { throw new ResponseWrapperException(e.getMessage(), e); } }) ); try { return Try.ofSupplier(completionStageSupplier).get().toCompletableFuture().join(); } catch (CompletionException e) { Throwable cause = e.getCause(); if (cause instanceof ResponseWrapperException) { ResponseWrapperException responseWrapperException = (ResponseWrapperException) cause; if (responseWrapperException.getResponse() != null) { return (Response) responseWrapperException.getResponse(); } } throw new ResponseWrapperException(cause.getMessage(), cause); } } }
这样,我们就粘合了Open-Feign,加入了需要的基于实例的熔断和线程隔离
在 Spring Cloud Gateway 中实现基于实例的熔断
Spring Cloud Gateway 不用做线程隔离,因为 reactor 框架不是同步框架,某个实例发生阻塞对它影响不至于很大。我们只在其中做实例级别的熔断,每个微服务都会做这个实例级别的熔断。所以需要加一个GlobalFilter
。这个熔断需要在负载均衡器选择实例并重写调用 url 之后,也就是在 ReactiveLoadBalancerClientFilter
之后。
由于Spring Cloud Gateway 基于 reactor 做了异步,所以我们需要 spring-cloud-starter-circuitbreaker-reactor-resilience4j
里面的CircuitBreakerOperator
将断路器转换成 reactor 兼容的Publisher
。所以加入了依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId> </dependency>
我们只为了用CircuitBreakerOperator
做个转换而已,其他的并没有用到。
/** * 实例级别的断路器 */ @Log4j2 @Component public class InstanceCircuitBreakerFilter implements GlobalFilter, Ordered { private final CircuitBreakerRegistry circuitBreakerRegistry; ObjectMapper objectMapper = new ObjectMapper(); public InstanceCircuitBreakerFilter(CircuitBreakerRegistry circuitBreakerRegistry) { this.circuitBreakerRegistry = circuitBreakerRegistry; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); ServerHttpRequest request = exchange.getRequest(); String serviceName = request.getHeaders().getFirst(CommonConstant.SERVICE_NAME); String instanceId = url.getHost() + url.getPort(); CircuitBreaker circuitBreaker; try { //使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置 circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, serviceName); } catch (ConfigurationNotFoundException e) { circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId); } try { log.info("try to send request to: {}: stats: {}", url, objectMapper.writeValueAsString(circuitBreaker.getMetrics())); } catch (JsonProcessingException e) { } return chain.filter(exchange).transform(CircuitBreakerOperator.of(circuitBreaker)); } @Override public int getOrder() { try { //必须在负载均衡器选择实例并重写调用 url 之后,也就是在 `ReactiveLoadBalancerClientFilter` 之后。 return (Integer) ReactiveLoadBalancerClientFilter.class.getDeclaredField("LOAD_BALANCER_CLIENT_FILTER_ORDER").get(null) + 1; } catch (Exception e) { return 10151; } } }