Spring Cloud升级之路 - Hoxton - 4. 使用Resilience4j

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: Spring Cloud升级之路 - Hoxton - 4. 使用Resilience4j

如何不启用Hystrix


由于我们的入口注解类从@SpringCloudApplication替换成了SpringBootApplication,这样不会启用Spring-Cloud-CircuitBreaker。引入的Hystrix依赖也就没有效果。请参考本系列第二节: Spring Cloud升级之路 - Hoxton - 2.入口类注解修改与OpenFeign的改造


使用Resilience4j实现实例级别的隔离与熔断


为什么需要实例级别的熔断呢?因为某个微服务可能某些实例暂时不可用,我们希望在重试的时候,暂时不再重试这些实例。默认的Spring-Cloud-CircuitBreaker一般实现了微服务级别的熔断,某个微服务某些实例暂时不可用但是某些实例可用的时候,就很有可能会发生整个微服务的熔断。一般在滚动发布的时候,如果操作不当,微服务级别的熔断导致这个微服务不可用,但是实际上某些实例是可用的。所以,我们需要实例级别的熔断,而不是微服务级别的。


为什么需要实例级别的线程隔离呢?防止某个实例发生问题,响应慢,阻塞了整个业务线程。


Spring-Cloud-CircuitBreaker里面的实现对于resilience4j的功能使用有限,我们想利用其更多的功能(例如线程隔离等等)。而且,Spring-Cloud-CircuitBreaker可以直接用来实现微服务级别的熔断,但是很难实现实例级别的熔断。主要原因是他的配置是根据微服务名称配置的,并且没有扩展,导致我们想实现的话,修改代码的地方太多了。所以我们舍弃了Spring-Cloud-CircuitBreaker


比较幸运的是,resilience4j官方有实现自己的spring-cloud-starter,里面实现了他的所有功能的核心bean配置,很好用。我们采用这个starter以及相关的配置方式来实现我们的实例级别的隔离与熔断。

在引入

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-cloud2</artifactId>
    <version>${resilience4j-spring-cloud2.version}</version>
</dependency>


之后,会自动加载BulkheadRegistryThreadPoolBulkheadRegistryCircuitBreakerRegistryRateLimiterRegistryRetryRegistry这几个Bean,配置这些Bean的分别是通过:

  • io.github.resilience4j.bulkhead.autoconfigure.BulkheadProperties: 前缀resilience4j.bulkhead
  • io.github.resilience4j.bulkhead.autoconfigure.ThreadPoolBulkheadProperties:前缀resilience4j.thread-pool-bulkhead
  • io.github.resilience4j.circuitbreaker.autoconfigure.CircuitBreakerProperties: 前缀resilience4j.circuitbreaker
  • io.github.resilience4j.ratelimiter.autoconfigure.RateLimiterProperties:前缀resilience4j.ratelimiter
  • io.github.resilience4j.retry.autoconfigure.RetryProperties: 前缀resilience4j.retry

这里主要用到的元素是:CircuitBreakerThreadPoolBulkheadCircuitBreaker用来实现实例级别的熔断,ThreadPoolBulkhead用来实现实例级别的线程隔离。


如何配置以及如何使用


CircuitBreaker相关的配置:CircuitBreaker

CircuitBreaker有五种状态:CLOSED,OPEN 还有HALF_OPEN。剩下的两种状态是人为操作,我们这里不会用到:DISABLED还有 FORCED_OPEN. CLOSED 代表断路器关闭,请求照常处理。OPEN 代表断路器打开,如果有请求会抛出异常:CallNotPermittedException


微信图片_20220624124320.jpg


CircuitBreaker使用滑动窗口统计成功失败的请求,并打开或者关闭断路器。这个滑动窗口有两种:

  • 基于计数的滑动窗口:使用一个大小为 N 的环形数组,记录最近 N 个请求结果。
  • 基于计时的滑动窗口:记录最近 N 秒的请求结果


微信图片_20220624124350.jpg


我们这里实现的默认配置是:

resilience4j.circuitbreaker:
  configs:
    default:
      # 是否向 Actuator 的 HealthIndicator 注册
      registerHealthIndicator: true
      slidingWindowSize: 10
      minimumNumberOfCalls: 5
      slidingWindowType: TIME_BASED
      permittedNumberOfCallsInHalfOpenState: 3
      automaticTransitionFromOpenToHalfOpenEnabled: true
      waitDurationInOpenState: 2s
      failureRateThreshold: 30
      recordExceptions:
        - java.lang.Exception

以上配置代表,默认情况下,所有Exception以及其子类都认为是失败。滑动窗口采用基于计时的,并且记录最近10秒的请求。触发断路器判断必须在10秒内至少有5个请求,在失败比例达到30%以上之后,断路器变为OPEN。断路器OPEN之后,在2秒后自动转化为HALF_OPEN

ThreadPoolBulkhead相关的配置:Create and configure a ThreadPoolBulkhead


微信图片_20220624124405.jpg


我们这里实现的默认配置是:

resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 50
      coreThreadPoolSize: 10
      queueCapacity: 1


与Open-Feign粘合


我们需要在FeignClient被调用,选取好要发送请求的实例之后,加入CircuitBreakerThreadPoolBulkhead。也就是,我们需要拿到本次请求调用的实例,以及微服务名称,加载对应的CircuitBreakerThreadPoolBulkhead,包装调用请求,之后执行调用。


FeignClient的核心实现,根据org.springframework.cloud.openfeign.loadbalancer.DefaultFeignLoadBalancerConfiguration可以知道是org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient

@Bean
@ConditionalOnMissingBean
public Client feignClient(BlockingLoadBalancerClient loadBalancerClient) {
  return new FeignBlockingLoadBalancerClient(new Client.Default(null, null),
      loadBalancerClient);
}

查看FeignBlockingLoadBalancerClient的源码:

@Override
public Response execute(Request request, Request.Options options) throws IOException {
  final URI originalUri = URI.create(request.url());
  //微服务名称
  String serviceId = originalUri.getHost();
  Assert.state(serviceId != null,
      "Request URI does not contain a valid hostname: " + originalUri);
  //从负载均衡器选择一个实例
  ServiceInstance instance = loadBalancerClient.choose(serviceId);
  if (instance == null) {
    String message = "Load balancer does not contain an instance for the service "
        + serviceId;
    if (LOG.isWarnEnabled()) {
      LOG.warn(message);
    }
    return Response.builder().request(request)
        .status(HttpStatus.SERVICE_UNAVAILABLE.value())
        .body(message, StandardCharsets.UTF_8).build();
  }
  //修改原url
  String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri)
      .toString();
  //构建新的Request
  Request newRequest = Request.create(request.httpMethod(), reconstructedUrl,
      request.headers(), request.body(), request.charset(),
      //这个RequestTemplate可以拿到微服务名称
      request.requestTemplate());
  return delegate.execute(newRequest, options);
}


所以,我们可以通过继承FeignBlockingLoadBalancerClient替换默认的实现,来代理调用请求。但是因为sleuth的存在以及其中的小bug导致RequestTemplate丢失,让我们拿不到微服务名称,这个可以参考我提的 PR:replace method for deprecation and keep reference of requestTemplate.但是在Hoxton版本不会合并了,所以需要我们建立同名同路径类进行替换:org.springframework.cloud.sleuth.instrument.web.client.feign.TracingFeignClient

Request build() {
    if (headers == null) {
        return delegate;
    }
    String url = delegate.url();
    byte[] body = delegate.body();
    Charset charset = delegate.charset();
    //保留requestTemplate
    return Request.create(delegate.httpMethod(), url, headers, body, charset, delegate.requestTemplate());
}


之后,我们实现带CircuitBreakerThreadPoolBulkheadFeignBlockingLoadBalancerClient,并优化其中的HttpClient:

@Bean
public HttpClient getHttpClient() {
    // 长连接保持30秒
    PoolingHttpClientConnectionManager pollingConnectionManager = new PoolingHttpClientConnectionManager(5, TimeUnit.MINUTES);
    // 总连接数
    pollingConnectionManager.setMaxTotal(1000);
    // 同路由的并发数
    pollingConnectionManager.setDefaultMaxPerRoute(1000);
    HttpClientBuilder httpClientBuilder = HttpClients.custom();
    httpClientBuilder.setConnectionManager(pollingConnectionManager);
    // 保持长连接配置,需要在头添加Keep-Alive
    httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
    return httpClientBuilder.build();
}
@Bean
public FeignBlockingLoadBalancerClient feignBlockingLoadBalancerCircuitBreakableClient(HttpClient httpClient, BlockingLoadBalancerClient loadBalancerClient, BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry, CircuitBreakerRegistry circuitBreakerRegistry, RateLimiterRegistry rateLimiterRegistry, RetryRegistry retryRegistry, Tracer tracer) {
    return new FeignBlockingLoadBalancerClient(new CircuitBreakableClient(
            httpClient,
            bulkheadRegistry,
            threadPoolBulkheadRegistry,
            circuitBreakerRegistry,
            rateLimiterRegistry,
            retryRegistry,
            tracer),
            loadBalancerClient);
}
@Log4j2
public static class CircuitBreakableClient extends feign.httpclient.ApacheHttpClient {
    private final BulkheadRegistry bulkheadRegistry;
    private final ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final RateLimiterRegistry rateLimiterRegistry;
    private final RetryRegistry retryRegistry;
    private final Tracer tracer;
    public CircuitBreakableClient(HttpClient httpClient, BulkheadRegistry bulkheadRegistry, ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry, CircuitBreakerRegistry circuitBreakerRegistry, RateLimiterRegistry rateLimiterRegistry, RetryRegistry retryRegistry, Tracer tracer) {
        super(httpClient);
        this.bulkheadRegistry = bulkheadRegistry;
        this.threadPoolBulkheadRegistry = threadPoolBulkheadRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.rateLimiterRegistry = rateLimiterRegistry;
        this.retryRegistry = retryRegistry;
        this.tracer = tracer;
    }
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        String serviceName = request.requestTemplate().feignTarget().name();
        URL url = new URL(request.url());
        String instanceId = serviceName + ":" + url.getHost() + ":" + url.getPort();
        //每个实例一个resilience4j熔断记录器,在实例维度做熔断,所有这个服务的实例共享这个服务的resilience4j熔断配置
        ThreadPoolBulkhead threadPoolBulkhead;
        CircuitBreaker circuitBreaker;
        try {
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(instanceId, serviceName);
        } catch (ConfigurationNotFoundException e) {
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(instanceId);
        }
        try {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, serviceName);
        } catch (ConfigurationNotFoundException e) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId);
        }
        //保持traceId
        Span span = tracer.currentSpan();
        Supplier<CompletionStage<Response>> completionStageSupplier = ThreadPoolBulkhead.decorateSupplier(threadPoolBulkhead,
                CircuitBreaker.decorateSupplier(circuitBreaker, () -> {
                    try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                        log.info("call url: {} -> {}", request.httpMethod(), request.url());
                        Response execute = super.execute(request, options);
                        //对于非返回200的接口,抛出异常
                            if (execute.status() != HttpStatus.OK.value()) {
                                execute.close();//需要关闭,否则返回码不为200抛异常连接不会回收导致连接池耗尽
                                throw new ResponseWrapperException(execute.toString(), execute);
                            }
                        return execute;
                    } catch (Exception e) {
                        throw new ResponseWrapperException(e.getMessage(), e);
                    }
                })
        );
        try {
            return Try.ofSupplier(completionStageSupplier).get().toCompletableFuture().join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof ResponseWrapperException) {
                ResponseWrapperException responseWrapperException = (ResponseWrapperException) cause;
                if (responseWrapperException.getResponse() != null) {
                    return (Response) responseWrapperException.getResponse();
                }
            }
            throw new ResponseWrapperException(cause.getMessage(), cause);
        }
    }
}

这样,我们就粘合了Open-Feign,加入了需要的基于实例的熔断和线程隔离


在 Spring Cloud Gateway 中实现基于实例的熔断


Spring Cloud Gateway 不用做线程隔离,因为 reactor 框架不是同步框架,某个实例发生阻塞对它影响不至于很大。我们只在其中做实例级别的熔断,每个微服务都会做这个实例级别的熔断。所以需要加一个GlobalFilter。这个熔断需要在负载均衡器选择实例并重写调用 url 之后,也就是在 ReactiveLoadBalancerClientFilter 之后。

由于Spring Cloud Gateway 基于 reactor 做了异步,所以我们需要 spring-cloud-starter-circuitbreaker-reactor-resilience4j里面的CircuitBreakerOperator将断路器转换成 reactor 兼容的Publisher。所以加入了依赖:


<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>


我们只为了用CircuitBreakerOperator做个转换而已,其他的并没有用到。

InstanceCircuitBreakerFilter

/**
 * 实例级别的断路器
 */
@Log4j2
@Component
public class InstanceCircuitBreakerFilter implements GlobalFilter, Ordered {
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    ObjectMapper objectMapper = new ObjectMapper();
    public InstanceCircuitBreakerFilter(CircuitBreakerRegistry circuitBreakerRegistry) {
        this.circuitBreakerRegistry = circuitBreakerRegistry;
    }
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        ServerHttpRequest request = exchange.getRequest();
        String serviceName = request.getHeaders().getFirst(CommonConstant.SERVICE_NAME);
        String instanceId = url.getHost() + url.getPort();
        CircuitBreaker circuitBreaker;
        try {
            //使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, serviceName);
        } catch (ConfigurationNotFoundException e) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId);
        }
        try {
            log.info("try to send request to: {}: stats: {}", url, objectMapper.writeValueAsString(circuitBreaker.getMetrics()));
        } catch (JsonProcessingException e) {
        }
        return chain.filter(exchange).transform(CircuitBreakerOperator.of(circuitBreaker));
    }
    @Override
    public int getOrder() {
        try {
            //必须在负载均衡器选择实例并重写调用 url 之后,也就是在 `ReactiveLoadBalancerClientFilter` 之后。
            return (Integer) ReactiveLoadBalancerClientFilter.class.getDeclaredField("LOAD_BALANCER_CLIENT_FILTER_ORDER").get(null) + 1;
        } catch (Exception e) {
            return 10151;
        }
    }
}
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
14天前
|
监控 负载均衡 Java
深入理解Spring Cloud中的服务网关
深入理解Spring Cloud中的服务网关
|
15天前
|
Java 开发工具 git
实现基于Spring Cloud的配置中心
实现基于Spring Cloud的配置中心
|
15天前
|
设计模式 监控 Java
解析Spring Cloud中的断路器模式原理
解析Spring Cloud中的断路器模式原理
|
1天前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
|
11天前
|
消息中间件 Java 开发者
Spring Cloud微服务框架:构建高可用、分布式系统的现代架构
Spring Cloud是一个开源的微服务框架,旨在帮助开发者快速构建在分布式系统环境中运行的服务。它提供了一系列工具,用于在分布式系统中配置、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态等领域的支持。
50 5
|
14天前
|
Java API 开发工具
Spring Boot与Spring Cloud Config的集成
Spring Boot与Spring Cloud Config的集成
|
14天前
|
存储 安全 Java
实现基于Spring Cloud的分布式配置管理
实现基于Spring Cloud的分布式配置管理
|
20天前
|
消息中间件 负载均衡 Java
Java和Spring Cloud构建分布式系统
Java和Spring Cloud构建分布式系统
|
20天前
|
消息中间件 负载均衡 Java
最容易学会的springboot gralde spring cloud 多模块微服务项目
最容易学会的springboot gralde spring cloud 多模块微服务项目