Spring Cloud LoadBalancer目前Spring官方是放在spring-cloud-commons里,
Spring Cloud最新版本为2021.0.2
Spring Cloud LoadBalancer 官网文档地址 https://docs.spring.io/spring-cloud-commons/docs/3.1.2/reference/html/#spring-cloud-loadbalancer
Spring Cloud官网文档地址 https://docs.spring.io/spring-cloud/docs/current/reference/html/
一方面Netflix Ribbon停止更新,Spring Cloud LoadBalancer是Spring Cloud官方自己提供的客户端负载均衡器,抽象和实现,用来替代Ribbon。
常见负载均衡器分为服务端负载均衡器(如网关层均衡负载)和客户端层均衡负载。
网关层如硬件层面的F5或软件层面的LVS、或者nginx等。
客户端层就如Spring Cloud LoadBalancer,作为一个客户端去发现更新维护服务列表,自定义服务的均衡负载策略(随机、轮询、小流量的金丝雀等等)。
Spring Cloud提供了自己的客户端负载平衡器抽象和实现。对于负载均衡机制,
增加了ReactiveLoadBalancer接口,并提供了基于round-robin轮询和Random随机的实现。
为了从响应式ServiceInstanceListSupplier中选择实例,需要使用ServiceInstanceListSupplier。目前支持ServiceInstanceListSupplier的基于服务发现的实现,该实现使用类路径中的发现客户端从Service Discovery中检索可用的实例。
可以通过如下配置来禁用Spring Cloud LoadBalance
spring:
cloud:
loadbalancer:
enabled: false
入门示例
前面simple-ecommerce项目创建已在父Pom引入三大父依赖,详细可以看下前面的文章<<SpringCloudAlibaba注册中心与配置中心之利器Nacos实战与源码分析>>,其中Spring Cloud的版本为2021.0.1,
前面文章也已说过,Spring Cloud Alibaba整合在spring-cloud-starter-alibaba-nacos-discovery本身就依赖spring-cloud-loadbalancer。
注意如果是Hoxton之前的版本,默认负载均衡器为Ribbon,需要移除Ribbon引用和增加配置spring.cloud.loadbalancer.ribbon.enabled: false。
如果是在Spring Boot项目中添加下面的启动器依赖,该starter也包含了Spring Boot Caching and Evictor.
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> </dependency>
我们使用Spring官方提供了负载均衡的客户端之一RestTemplate,RestTemplate是Spring提供的用于访问Rest服务的客户端,RestTemplate提供了多种便捷访问远程Http服务的方法,能够大大提高客户端的编写效率。默认情况下,RestTemplate默认依赖jdk的HTTP连接工具。创建RestTemplateConfig配置类,
标注 @LoadBalanced注解,默认使用的ReactiveLoadBalancer实现是RoundRobinLoadBalancer。
原理:
RestTemplate
Spring Cloud LoadBalancer源码分析我们先从RestTemplate负载均衡的简单实现来分析入手,除此之外其支持Spring Web Flux响应式编程的实现原理思想也是相同,都是通过客户端添加拦截器,在拦截器中实现负载均衡。从RestTemplate的源码中可以知道其继承自InterceptingHttpAccessor抽象类
而InterceptingHttpAccessor抽象类则提供了一个方法setInterceptors,用于设置拦截器,拦截器需要实现ClientHttpRequestInterceptor接口即可,在实际远程请求服务端接口之前会先调用拦截器的intercept方法。这里的拦截器相当于Servlet技术中的Filter功能
// 代码实现在抽象父类InterceptingHttpAccessor里 // RestTemplate.InterceptingHttpAccessor#setInterceptors public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) { Assert.noNullElements(interceptors, "'interceptors' must not contain null elements"); // Take getInterceptors() List as-is when passed in here if (this.interceptors != interceptors) { this.interceptors.clear(); this.interceptors.addAll(interceptors); AnnotationAwareOrderComparator.sort(this.interceptors); } }
LoadBalancerAutoConfiguration
从官网可以知道Spring Cloud LoadBalancer放在spring-cloud-commons,因此也作为其核心的@LoadBalanced注解也就是由spring-cloud-commons来实现,依据SpringBoot自动装配的原理先查看依赖包的实现逻辑,不难发现spring-cloud-commons引入了自动配置类LoadBalancerAutoConfiguration和ReactorLoadBalancerClientAutoConfiguration。
当满足上述的条件时(@Conditional为条件注解),将自动创建LoadBalancerInterceptor并注入到RestTemplate中。
LoadBalancerLnterceptor
LoadBalancerInterceptor实现了ClientHttpRequestInterceptor接口,因此也实现intercept方法,用于实现负载均衡的拦截处理。
LoadBalancerClient
LoadBalancerClient用于进行负载均衡逻辑,继承自ServiceInstanceChooser接口,从服务列表中选择出一个服务地址进行调用。在LoadBalancerClient种存在两个execute()方法,均是用来执行请求的,reconstructURI()是用来重构URL。
对于LoadBalancerClient接口Spring Cloud LoadBalancer的提供默认实现为BlockingLoadBalancerClient
@SuppressWarnings({ "unchecked", "rawtypes" }) public class BlockingLoadBalancerClient implements LoadBalancerClient { private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory; /** * @deprecated in favour of * {@link BlockingLoadBalancerClient#BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory)} */ @Deprecated public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory, LoadBalancerProperties properties) { this.loadBalancerClientFactory = loadBalancerClientFactory; } public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) { this.loadBalancerClientFactory = loadBalancerClientFactory; } @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { String hint = getHint(serviceId); LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request, new DefaultRequestContext(request, hint)); Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId); supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest)); ServiceInstance serviceInstance = choose(serviceId, lbRequest); // 选择服务 if (serviceInstance == null) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete( new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse()))); throw new IllegalStateException("No instances available for " + serviceId); } return execute(serviceId, serviceInstance, lbRequest); } @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { DefaultResponse defaultResponse = new DefaultResponse(serviceInstance); Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId); Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>(); supportedLifecycleProcessors .forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance))); try { T response = request.apply(serviceInstance); Object clientResponse = getClientResponse(response); supportedLifecycleProcessors .forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse))); return response; } catch (IOException iOException) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete( new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse))); throw iOException; } catch (Exception exception) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete( new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse))); ReflectionUtils.rethrowRuntimeException(exception); } return null; } private <T> Object getClientResponse(T response) { ClientHttpResponse clientHttpResponse = null; if (response instanceof ClientHttpResponse) { clientHttpResponse = (ClientHttpResponse) response; } if (clientHttpResponse != null) { try { return new ResponseData(clientHttpResponse, null); } catch (IOException ignored) { } } return response; } private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) { return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors( loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), DefaultRequestContext.class, Object.class, ServiceInstance.class); } @Override public URI reconstructURI(ServiceInstance serviceInstance, URI original) { return LoadBalancerUriTools.reconstructURI(serviceInstance, original); } @Override public ServiceInstance choose(String serviceId) { return choose(serviceId, REQUEST); } // 通过不同的负载均衡客户端实现选择不同的服务 @Override public <T> ServiceInstance choose(String serviceId, Request<T> request) { ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId); if (loadBalancer == null) { return null; } Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block(); if (loadBalancerResponse == null) { return null; } return loadBalancerResponse.getServer(); } private String getHint(String serviceId) { LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId); String defaultHint = properties.getHint().getOrDefault("default", "default"); String hintPropertyValue = properties.getHint().get(serviceId); return hintPropertyValue != null ? hintPropertyValue : defaultHint; } }
LoadBalancerClientFactory
BlockingLoadBalancerClient中持有LoadBalancerClientFactory通过调用其getInstance方法获取具体的负载均衡客户端。通过工厂类LoadBalancerClientFactory获取具体的负载均衡器实例,后面的loadBalancer.choose(request)调用其接口choose()方法实现根据负载均衡算法选择下一个服务器完成负载均衡,而ReactiveLoadBalancer getInstance(String serviceId) 有默认实现LoadBalancerClientFactory
LoadBalancerClientFactory客户端实现了不同的负载均衡算法,比如轮询、随机等。LoadBalancerClientFactory继承自NamedContextFactory,NamedContextFactory继承ApplicationContextAware,实现Spring ApplicationContext容器操作。
ReactiveLoadBalancer
ReactiveLoadBalancer负载均衡器实现服务选择,Spring Cloud Balancer中实现了轮询RoundRobinLoadBalancer、随机RandomLoadBalancer、NacosLoadBalancer算法。
LoadBalancerClientConfiguration
如果没有显式指定负载均衡算法,默认缺省值为RoundRobinLoadBalancer
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
LoadBalancerRequestFactory
LoadBalancerRequest工厂类调用createRequest方法用于创建LoadBalancerRequest。其内部持有LoadBalancerClient对象也即持有BlockingLoadBalancerClient。
在日常项目中,一般负载均衡都是结合Feign使用,后续我们有时间再来分析Feign整合LoadBalancer的自动配置类FeignLoadBalancerAutoConfiguration的实现
ReactorLoadBalancerClientAutoConfiguration
我们也抛一下基于WebClient的@Loadbalanced的流程的引入,首先声明负载均衡过滤器ReactorLoadBalancerClientAutoConfiguration是一个自动装配器类,在项目中引入了 WebClient 和 ReactiveLoadBalancer 类之后,自动装配流程就开始运行,它会初始化一个实现了 ExchangeFilterFunction 的实例,在后面该实例将作为过滤器被注入到WebClient。后续流程有兴趣再自行研究
自定义负载均衡器
从上面可以知道LoadBalancerClientFactory是创建客户机、负载均衡器和客户机配置实例的工厂。它根据客户端名称创建一个Spring ApplicationContext,并从中提取所需的bean。因此进入到LoadBalancerClientFactory类中,需要去实现它的子接口ReactorServiceInstanceLoadBalancer,因为去获取负载均衡器实例的时候,是通过去容器中查找ReactorServiceInstanceLoadBalancer类型的bean来实现的,可以参照RandomLoadBalancer实现代码
自定义负载均衡器
从上面可以知道LoadBalancerClientFactory是创建客户机、负载均衡器和客户机配置实例的工厂。它根据客户端名称创建一个Spring ApplicationContext,并从中提取所需的bean。因此进入到LoadBalancerClientFactory类中,需要去实现它的子接口ReactorServiceInstanceLoadBalancer,因为去获取负载均衡器实例的时候,是通过去容器中查找ReactorServiceInstanceLoadBalancer类型的bean来实现的,可以参照RandomLoadBalancer实现代码