Spring Cloud升级之路 - Hoxton - 7. 后续更新(WebFlux等)

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: Spring Cloud升级之路 - Hoxton - 7. 后续更新(WebFlux等)

1. 修正实例列表乱序导致的负载均衡重试相同实例的问题


虽然之前考虑了通过每个请求的traceId隔离负载均衡的position来实现重试不会重试相同实例的问题,但是没有考虑在负载均衡过程中,实例列表的更新。

例如:

  • 请求第一次调用负载均衡,实例列表是:[实例1,实例2],position为1,对2取余=1,所以请求发送到实例2上面了
  • 请求失败,触发重试,实例列表缓存失效,更新后变成了:[实例2,实例1],position为2,对2取余=0,所以请求又发送到实例2上面了
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    Span currentSpan = tracer.currentSpan();
    //如果没有 traceId,就生成一个新的,但是最好检查下为啥会没有
    //是不是 MQ 消费这种没有主动生成 traceId 的情况,最好主动生成下
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    int seed = positionCache.get(l).getAndIncrement();
    //这里,serviceInstances可能与上次的内容不同
    //例如上次是实例1,实例2
    //这次是实例2,实例1
    return new DefaultResponse(serviceInstances.get(seed % serviceInstances.size()));
}


所以,在这里追加排序,保证实例有序,从而进一步不会重试相同的实例。

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    Span currentSpan = tracer.currentSpan();
    //如果没有 traceId,就生成一个新的,但是最好检查下为啥会没有
    //是不是 MQ 消费这种没有主动生成 traceId 的情况,最好主动生成下
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    int seed = positionCache.get(l).getAndIncrement();
    return new DefaultResponse(serviceInstances.stream().sorted(Comparator.comparing(ServiceInstance::getInstanceId)).collect(Collectors.toList()).get(seed % serviceInstances.size()));
}


2. WebFlux环境兼容与WebClient实现相同功能


maven依赖:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.7.RELEASE</version>
    </parent>
    <properties>
        <disruptor.version>3.4.2</disruptor.version>
        <resilience4j-spring-cloud2.version>1.1.0</resilience4j-spring-cloud2.version>
    </properties>
    <dependencies>
        <!--内部缓存框架统一采用caffeine-->
        <!--这样Spring cloud loadbalancer用的本地实例缓存也是基于Caffeine-->
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
        </dependency>
        <!--日志需要用log4j2-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!--lombok简化代码-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--注册到eureka-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--spring cloud rpc相关-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!--调用路径记录-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <!--暴露actuator相关端口-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-spring-cloud2</artifactId>
            <version>${resilience4j-spring-cloud2.version}</version>
        </dependency>
        <!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置-->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>${disruptor.version}</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

其他的配置是一样的,重点在于,如何使用WebClient调用其他微服务,并且实现针对Get请求重试或者是所有请求的网络 IO 异常,例如connect timeout等等,或者是断路器异常(因为请求还没发出)。

WebClient可以加入各种Filter,通过实现这些Filter来实现实例级别的断路器还有重试。

源码:WebClientConfig.java

实现重试:

private static class RetryFilter implements ExchangeFilterFunction {
    private final String serviceName;
    private RetryFilter(String serviceName) {
        this.serviceName = serviceName;
    }
    @Override
    public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        return exchangeFunction.exchange(clientRequest).retryWhen(Retry.onlyIf(retryContext -> {
            //get请求一定重试
            return clientRequest.method().equals(HttpMethod.GET)
                    //connect Timeout 是一种 IOException
                    || retryContext.exception() instanceof IOException
                    //实例级别的断路器的Exception
                    || retryContext.exception() instanceof CallNotPermittedException;
        }).retryMax(1).exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(1000)));
    }
}


实例级别的断路器:

private static class InstanceCircuitBreakerFilter implements ExchangeFilterFunction {
    private final String serviceName;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    ;
    private InstanceCircuitBreakerFilter(String serviceName, CircuitBreakerRegistry circuitBreakerRegistry) {
        this.serviceName = serviceName;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
    }
    @Override
    public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        CircuitBreaker circuitBreaker;
        //这时候的url是经过负载均衡器的,是实例的url
        String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort();
        try {
            //使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, serviceName);
        } catch (ConfigurationNotFoundException e) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
        }
        return exchangeFunction.exchange(clientRequest).transform(CircuitBreakerOperator.of(circuitBreaker));
    }
}


组装调用某个微服务(这里是service-provider)的WebClient

public static final String SERVICE_PROVIDER = "service-provider";
@Autowired
private ReactorLoadBalancerExchangeFilterFunction lbFunction;
@Bean(SERVICE_PROVIDER)
public WebClient getWebClient(CircuitBreakerRegistry circuitBreakerRegistry) {
    ConnectionProvider provider = ConnectionProvider.builder(SERVICE_PROVIDER)
            .maxConnections(50).pendingAcquireTimeout(Duration.ofSeconds(5)).build();
    HttpClient httpClient = HttpClient.create(provider)
            .tcpConfiguration(client ->
                    //链接超时
                    client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
                            .doOnConnected(conn -> conn
                                    //读取超时
                                    .addHandlerLast(new ReadTimeoutHandler(1))
                                    .addHandlerLast(new WriteTimeoutHandler(1))
                            )
            );
    return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            //Retry在负载均衡前
            .filter(new RetryFilter(SERVICE_PROVIDER))
            //负载均衡器,改写url
            .filter(lbFunction)
            //实例级别的断路器需要在负载均衡获取真正地址之后
            .filter(new InstanceCircuitBreakerFilter(SERVICE_PROVIDER, circuitBreakerRegistry))
            .baseUrl("http://" + SERVICE_PROVIDER)
            .build();
}


这样,我们就可以实现和之前feign一样的微服务调用了。

@Log4j2
@RestController
public class TestController {
    @Resource(name = WebClientConfig.SERVICE_PROVIDER)
    private WebClient webClient;
    @RequestMapping("/testGetTimeOut")
    public Mono<String> testGetTimeOut() {
        return webClient.get().uri("/test-read-time-out")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    }
    @RequestMapping("/testPostTimeOut")
    public Mono<String> testPostTimeOut() {
        return webClient.post().uri("/test-read-time-out")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    }
}
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
5月前
|
监控 负载均衡 Java
深入理解Spring Cloud中的服务网关
深入理解Spring Cloud中的服务网关
|
15天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
15天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
35 5
|
15天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
27 5
|
5月前
|
Java 开发工具 git
实现基于Spring Cloud的配置中心
实现基于Spring Cloud的配置中心
|
5月前
|
设计模式 监控 Java
解析Spring Cloud中的断路器模式原理
解析Spring Cloud中的断路器模式原理
|
5月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15034 31
|
5月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
554 15
|
4月前
|
API 开发者 Java
API 版本控制不再难!Spring 框架带你玩转多样化的版本管理策略,轻松应对升级挑战!
【8月更文挑战第31天】在开发RESTful服务时,为解决向后兼容性问题,常需进行API版本控制。本文以Spring框架为例,探讨四种版本控制策略:URL版本控制、请求头版本控制、查询参数版本控制及媒体类型版本控制,并提供示例代码。此外,还介绍了通过自定义注解与过滤器实现更灵活的版本控制方案,帮助开发者根据项目需求选择最适合的方法,确保API演化的管理和客户端使用的稳定与兼容。
202 0
|
5月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
125 3

热门文章

最新文章