微服务生态组件之Spring Cloud LoadBalancer详解和源码分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Spring Cloud LoadBalancer详解和源码分析

Spring Cloud LoadBalancer目前Spring官方是放在spring-cloud-commons里,

Spring Cloud最新版本为2021.0.2

Spring Cloud LoadBalancer 官网文档地址 https://docs.spring.io/spring-cloud-commons/docs/3.1.2/reference/html/#spring-cloud-loadbalancer

Spring Cloud官网文档地址 https://docs.spring.io/spring-cloud/docs/current/reference/html/


一方面Netflix Ribbon停止更新,Spring Cloud LoadBalancer是Spring Cloud官方自己提供的客户端负载均衡器,抽象和实现,用来替代Ribbon。

常见负载均衡器分为服务端负载均衡器(如网关层均衡负载)和客户端层均衡负载。

网关层如硬件层面的F5或软件层面的LVS、或者nginx等。

客户端层就如Spring Cloud LoadBalancer,作为一个客户端去发现更新维护服务列表,自定义服务的均衡负载策略(随机、轮询、小流量的金丝雀等等)。

Spring Cloud提供了自己的客户端负载平衡器抽象和实现。对于负载均衡机制,

增加了ReactiveLoadBalancer接口,并提供了基于round-robin轮询和Random随机的实现。

为了从响应式ServiceInstanceListSupplier中选择实例,需要使用ServiceInstanceListSupplier。目前支持ServiceInstanceListSupplier的基于服务发现的实现,该实现使用类路径中的发现客户端从Service Discovery中检索可用的实例。

可以通过如下配置来禁用Spring Cloud LoadBalance

spring:

 cloud:

   loadbalancer:

     enabled: false


入门示例

前面simple-ecommerce项目创建已在父Pom引入三大父依赖,详细可以看下前面的文章<<SpringCloudAlibaba注册中心与配置中心之利器Nacos实战与源码分析>>,其中Spring Cloud的版本为2021.0.1,

前面文章也已说过,Spring Cloud Alibaba整合在spring-cloud-starter-alibaba-nacos-discovery本身就依赖spring-cloud-loadbalancer。

注意如果是Hoxton之前的版本,默认负载均衡器为Ribbon,需要移除Ribbon引用和增加配置spring.cloud.loadbalancer.ribbon.enabled: false。

如果是在Spring Boot项目中添加下面的启动器依赖,该starter也包含了Spring Boot Caching and Evictor.

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>


我们使用Spring官方提供了负载均衡的客户端之一RestTemplate,RestTemplate是Spring提供的用于访问Rest服务的客户端,RestTemplate提供了多种便捷访问远程Http服务的方法,能够大大提高客户端的编写效率。默认情况下,RestTemplate默认依赖jdk的HTTP连接工具。创建RestTemplateConfig配置类,

标注 @LoadBalanced注解,默认使用的ReactiveLoadBalancer实现是RoundRobinLoadBalancer。


原理:

RestTemplate

Spring Cloud LoadBalancer源码分析我们先从RestTemplate负载均衡的简单实现来分析入手,除此之外其支持Spring Web Flux响应式编程的实现原理思想也是相同,都是通过客户端添加拦截器,在拦截器中实现负载均衡。从RestTemplate的源码中可以知道其继承自InterceptingHttpAccessor抽象类

而InterceptingHttpAccessor抽象类则提供了一个方法setInterceptors,用于设置拦截器,拦截器需要实现ClientHttpRequestInterceptor接口即可,在实际远程请求服务端接口之前会先调用拦截器的intercept方法。这里的拦截器相当于Servlet技术中的Filter功能

// 代码实现在抽象父类InterceptingHttpAccessor里
// RestTemplate.InterceptingHttpAccessor#setInterceptors
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
 Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
 // Take getInterceptors() List as-is when passed in here
 if (this.interceptors != interceptors) {
  this.interceptors.clear();
  this.interceptors.addAll(interceptors);
  AnnotationAwareOrderComparator.sort(this.interceptors);
 }
}



LoadBalancerAutoConfiguration

从官网可以知道Spring Cloud LoadBalancer放在spring-cloud-commons,因此也作为其核心的@LoadBalanced注解也就是由spring-cloud-commons来实现,依据SpringBoot自动装配的原理先查看依赖包的实现逻辑,不难发现spring-cloud-commons引入了自动配置类LoadBalancerAutoConfiguration和ReactorLoadBalancerClientAutoConfiguration。

当满足上述的条件时(@Conditional为条件注解),将自动创建LoadBalancerInterceptor并注入到RestTemplate中。

LoadBalancerLnterceptor

LoadBalancerInterceptor实现了ClientHttpRequestInterceptor接口,因此也实现intercept方法,用于实现负载均衡的拦截处理。

LoadBalancerClient

LoadBalancerClient用于进行负载均衡逻辑,继承自ServiceInstanceChooser接口,从服务列表中选择出一个服务地址进行调用。在LoadBalancerClient种存在两个execute()方法,均是用来执行请求的,reconstructURI()是用来重构URL。

对于LoadBalancerClient接口Spring Cloud LoadBalancer的提供默认实现为BlockingLoadBalancerClient


@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
/**
 * @deprecated in favour of
 * {@link BlockingLoadBalancerClient#BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory)}
 */
@Deprecated
public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
  LoadBalancerProperties properties) {
 this.loadBalancerClientFactory = loadBalancerClientFactory;
}
public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
 this.loadBalancerClientFactory = loadBalancerClientFactory;
}
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
 String hint = getHint(serviceId);
 LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
   new DefaultRequestContext(request, hint));
 Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
 supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
 ServiceInstance serviceInstance = choose(serviceId, lbRequest);
    // 选择服务
 if (serviceInstance == null) {
  supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
    new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
  throw new IllegalStateException("No instances available for " + serviceId);
 }
 return execute(serviceId, serviceInstance, lbRequest);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
  throws IOException {
 DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
 Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
 Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
 supportedLifecycleProcessors
   .forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
 try {
  T response = request.apply(serviceInstance);
  Object clientResponse = getClientResponse(response);
  supportedLifecycleProcessors
    .forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
      lbRequest, defaultResponse, clientResponse)));
  return response;
 }
 catch (IOException iOException) {
  supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
    new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
  throw iOException;
 }
 catch (Exception exception) {
  supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
    new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
  ReflectionUtils.rethrowRuntimeException(exception);
 }
 return null;
}
private <T> Object getClientResponse(T response) {
 ClientHttpResponse clientHttpResponse = null;
 if (response instanceof ClientHttpResponse) {
  clientHttpResponse = (ClientHttpResponse) response;
 }
 if (clientHttpResponse != null) {
  try {
   return new ResponseData(clientHttpResponse, null);
  }
  catch (IOException ignored) {
  }
 }
 return response;
}
private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {
 return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
   loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
   DefaultRequestContext.class, Object.class, ServiceInstance.class);
}
@Override
public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
 return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}
@Override
public ServiceInstance choose(String serviceId) {
 return choose(serviceId, REQUEST);
}
// 通过不同的负载均衡客户端实现选择不同的服务
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
 ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
 if (loadBalancer == null) {
  return null;
 }
 Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
 if (loadBalancerResponse == null) {
  return null;
 }
 return loadBalancerResponse.getServer();
}
private String getHint(String serviceId) {
 LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
 String defaultHint = properties.getHint().getOrDefault("default", "default");
 String hintPropertyValue = properties.getHint().get(serviceId);
 return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}


LoadBalancerClientFactory

BlockingLoadBalancerClient中持有LoadBalancerClientFactory通过调用其getInstance方法获取具体的负载均衡客户端。通过工厂类LoadBalancerClientFactory获取具体的负载均衡器实例,后面的loadBalancer.choose(request)调用其接口choose()方法实现根据负载均衡算法选择下一个服务器完成负载均衡,而ReactiveLoadBalancer getInstance(String serviceId) 有默认实现LoadBalancerClientFactory

LoadBalancerClientFactory客户端实现了不同的负载均衡算法,比如轮询、随机等。LoadBalancerClientFactory继承自NamedContextFactory,NamedContextFactory继承ApplicationContextAware,实现Spring ApplicationContext容器操作。

ReactiveLoadBalancer

ReactiveLoadBalancer负载均衡器实现服务选择,Spring Cloud Balancer中实现了轮询RoundRobinLoadBalancer、随机RandomLoadBalancer、NacosLoadBalancer算法。

LoadBalancerClientConfiguration
如果没有显式指定负载均衡算法,默认缺省值为RoundRobinLoadBalancer


@Bean

@ConditionalOnMissingBean

public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,

LoadBalancerClientFactory loadBalancerClientFactory) {

String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);

return new RoundRobinLoadBalancer(

  loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);

}


LoadBalancerRequestFactory

LoadBalancerRequest工厂类调用createRequest方法用于创建LoadBalancerRequest。其内部持有LoadBalancerClient对象也即持有BlockingLoadBalancerClient。

在日常项目中,一般负载均衡都是结合Feign使用,后续我们有时间再来分析Feign整合LoadBalancer的自动配置类FeignLoadBalancerAutoConfiguration的实现



ReactorLoadBalancerClientAutoConfiguration

我们也抛一下基于WebClient的@Loadbalanced的流程的引入,首先声明负载均衡过滤器ReactorLoadBalancerClientAutoConfiguration是一个自动装配器类,在项目中引入了 WebClient 和 ReactiveLoadBalancer 类之后,自动装配流程就开始运行,它会初始化一个实现了 ExchangeFilterFunction 的实例,在后面该实例将作为过滤器被注入到WebClient。后续流程有兴趣再自行研究

自定义负载均衡器
从上面可以知道LoadBalancerClientFactory是创建客户机、负载均衡器和客户机配置实例的工厂。它根据客户端名称创建一个Spring ApplicationContext,并从中提取所需的bean。因此进入到LoadBalancerClientFactory类中,需要去实现它的子接口ReactorServiceInstanceLoadBalancer,因为去获取负载均衡器实例的时候,是通过去容器中查找ReactorServiceInstanceLoadBalancer类型的bean来实现的,可以参照RandomLoadBalancer实现代码

自定义负载均衡器

从上面可以知道LoadBalancerClientFactory是创建客户机、负载均衡器和客户机配置实例的工厂。它根据客户端名称创建一个Spring ApplicationContext,并从中提取所需的bean。因此进入到LoadBalancerClientFactory类中,需要去实现它的子接口ReactorServiceInstanceLoadBalancer,因为去获取负载均衡器实例的时候,是通过去容器中查找ReactorServiceInstanceLoadBalancer类型的bean来实现的,可以参照RandomLoadBalancer实现代码

相关实践学习
小试牛刀,一键部署电商商城
SAE 仅需一键,极速部署一个微服务电商商城,体验 Serverless 带给您的全托管体验,一起来部署吧!
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1月前
|
NoSQL 安全 Java
深入理解 RedisConnectionFactory:Spring Data Redis 的核心组件
在 Spring Data Redis 中,`RedisConnectionFactory` 是核心组件,负责创建和管理与 Redis 的连接。它支持单机、集群及哨兵等多种模式,为上层组件(如 `RedisTemplate`)提供连接抽象。Spring 提供了 Lettuce 和 Jedis 两种主要实现,其中 Lettuce 因其线程安全和高性能特性被广泛推荐。通过手动配置或 Spring Boot 自动化配置,开发者可轻松集成 Redis,提升应用性能与扩展性。本文深入解析其作用、实现方式及常见问题解决方法,助你高效使用 Redis。
123 4
|
2月前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
101 0
|
1月前
|
人工智能 Java 数据库
飞算 JavaAI:革新电商订单系统 Spring Boot 微服务开发
在电商订单系统开发中,传统方式耗时约30天,需应对复杂代码、调试与测试。飞算JavaAI作为一款AI代码生成工具,专注于简化Spring Boot微服务开发。它能根据业务需求自动生成RESTful API、数据库交互及事务管理代码,将开发时间缩短至1小时,效率提升80%。通过减少样板代码编写,提供规范且准确的代码,飞算JavaAI显著降低了开发成本,为软件开发带来革新动力。
|
2月前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
105 0
|
2月前
|
消息中间件 存储 Java
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
本教程介绍ActiveMQ的安装与基本使用。首先从官网下载apache-activemq-5.15.3版本,解压后即可完成安装,非常便捷。启动时进入解压目录下的bin文件夹,根据系统选择win32或win64,运行activemq.bat启动服务。通过浏览器访问`http://127.0.0.1:8161/admin/`可进入管理界面,默认用户名密码为admin/admin。ActiveMQ支持两种消息模式:点对点(Queue)和发布/订阅(Topic)。前者确保每条消息仅被一个消费者消费,后者允许多个消费者同时接收相同消息。
71 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
|
2月前
|
消息中间件 Java 微服务
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——发布/订阅消息的生产和消费
本文详细讲解了Spring Boot中ActiveMQ的发布/订阅消息机制,包括消息生产和消费的具体实现方式。生产端通过`sendMessage`方法发送订阅消息,消费端则需配置`application.yml`或自定义工厂以支持topic消息监听。为解决点对点与发布/订阅消息兼容问题,可通过设置`containerFactory`实现两者共存。最后,文章还提供了测试方法及总结,帮助读者掌握ActiveMQ在异步消息处理中的应用。
97 0
|
2月前
|
消息中间件 网络协议 Java
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ集成
本文介绍了在 Spring Boot 中集成 ActiveMQ 的详细步骤。首先通过引入 `spring-boot-starter-activemq` 依赖并配置 `application.yml` 文件实现基本设置。接着,创建 Queue 和 Topic 消息类型,分别使用 `ActiveMQQueue` 和 `ActiveMQTopic` 类完成配置。随后,利用 `JmsMessagingTemplate` 实现消息发送功能,并通过 Controller 和监听器实现点对点消息的生产和消费。最后,通过浏览器访问测试接口验证消息传递的成功性。
58 0
|
2月前
|
消息中间件 Java API
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ—— JMS 和 ActiveMQ 介绍
本文介绍如何在Spring Boot中集成ActiveMQ,首先阐述了JMS(Java消息服务)的概念及其作为与具体平台无关的API在异步通信中的作用。接着说明了JMS的主要对象模型,如连接工厂、会话、生产者和消费者等,并指出JMS支持点对点和发布/订阅两种消息类型。随后重点讲解了ActiveMQ,作为Apache开源的消息总线,它完全支持JMS规范,适用于异步消息处理。最后,文章探讨了在Spring Boot中使用队列(Queue)和主题(Topic)这两种消息通信形式的方法。
54 0
|
2月前
|
NoSQL Java API
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Spring Boot 集成 Redis
本文介绍了在Spring Boot中集成Redis的方法,包括依赖导入、Redis配置及常用API的使用。通过导入`spring-boot-starter-data-redis`依赖和配置`application.yml`文件,可轻松实现Redis集成。文中详细讲解了StringRedisTemplate的使用,适用于字符串操作,并结合FastJSON将实体类转换为JSON存储。还展示了Redis的string、hash和list类型的操作示例。最后总结了Redis在缓存和高并发场景中的应用价值,并提供课程源代码下载链接。
103 0
|
2月前
|
NoSQL Java Redis
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 安装
本教程介绍在 VMware 虚拟机(CentOS 7)或阿里云服务器中安装 Redis 的过程,包括安装 gcc 编译环境、下载 Redis(官网或 wget)、解压安装、修改配置文件(如 bind、daemonize、requirepass 等设置)、启动 Redis 服务及测试客户端连接。通过 set 和 get 命令验证安装是否成功。适用于初学者快速上手 Redis 部署。
44 0