前言
Ribbon是由Netflix公司开源的一个客户端负载均衡器,主要功能是实现服务之间的负载均衡调用,内置丰富的负载均衡算法,本章意在探讨Ribbon的核心工作流程,Ribbon基本使用请看《SpringCloud极简入门-客户端负载均衡Ribbon》
Ribbon的工作流程
我们知道,微服务在启动成功之后,默认30s/次会从注册中心拉取服务注册表到本地缓存起来,而我们使用Ribbon时是通过RestTemplate发起请求,URL以:http://user-server/user/...
服务名方式去调用服务,其实Ribbon干的事情就是根据URL中的服务名去本地的服务注册表中查找服务名对应的服务实例(一个或多个),然后通过负载均衡算法选择其中一个服务后,发起Http请求,那接下来我们就来看一下Ribbon的底层到底是怎么工作的
Ribbon配合RestTemplate使用
在《SpringCloud极简入门-客户端负载均衡Ribbon》中我们有讲解到Ribbon的基本使用,首选需要定义RestTemplate
@SpringBootApplication
@EnableEurekaClient
public class OrderServerApplication1030
{
//配置一个RestTemplate ,http客户端,支持Rest风格
//@LoadBalanced :负载均衡注册,让RestTmplate可以实现负载均衡请求
//这个标签标记RestTemplate可以使用LoadBalancerClient进行负载均衡
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
//省略...
调用服务的是否使用服务名调用
@RestController
public class OrderController {
//需要配置成Bean
@Autowired
private RestTemplate restTemplate ;
//浏览器调用该方法
@RequestMapping(value = "/order/{id}",method = RequestMethod.GET)
public User getById(@PathVariable("id")Long id){
//发送http请求调用 user的服务,获取user对象 : RestTemplate
//user的ip,user的端口,user的Controller路径
//String url = "http://localhost:1020/user/"+id;
String url = "http://user-server/user/"+id;
//发送http请求
return restTemplate.getForObject(url, User.class);
}
}
被 @LoadBalanced标记的RestTemplate 可以使用LoadBalancerClient负载均衡客户端实现负载均衡,我们在使用RestTemplate 发起请求的时候需要跟上服务名的方式http://user-server/user/
LoadBalancerClient负载均衡客户端
我们从 @LoadBalanced入手,先看一下LoadBalancerClient它的源码
/**
注解标记RestTemplate 可以使用LoadBalancerClient 负载均衡客户端
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
* @author Spencer Gibb
*/
@Target({
ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
这个注解@LoadBalanced 的作用在注释上说的非常清楚,就是标记RestTemplate课程可以使用使用LoadBalancerClient来实现负载均衡,LoadBalancerClient就是Ribbon实现负载均衡的一个客户端,它在spring-cloud-commons包下,我们可以直接看LoadBalancerClient的源码
/**
客户端负载均衡
* Represents a client side load balancer
* @author Spencer Gibb
*/
public interface LoadBalancerClient extends ServiceInstanceChooser {
//执行请求,会根据serviceId使用负载均衡查找服务
/**
使用负载均衡器执行指定服务
* execute request using a ServiceInstance from the LoadBalancer for the specified service
* 服务Id - 服务ID来查找负载平衡器
* @param serviceId the service id to look up the LoadBalancer
*
* @param request allows implementations to execute pre and post actions such as
* incrementing metrics
* 返回选择的服务
* @return the result of the LoadBalancerRequest callback on the selected
* ServiceInstance
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
//执行请求,这个方法多了一个参数ServiceInstance ,即请求指定的服务
/**
* execute request using a ServiceInstance from the LoadBalancer for the specified
* service
* @param serviceId the service id to look up the LoadBalancer
* @param serviceInstance the service to execute the request to
* @param request allows implementations to execute pre and post actions such as
* incrementing metrics
* @return the result of the LoadBalancerRequest callback on the selected
* ServiceInstance
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
//重构URL,把http://myservice/path/to/service重构成http://ip:端口/path/to/service
/**
* Create a proper URI with a real host and port for systems to utilize.
* Some systems use a URI with the logical serivce name as the host,
* such as http://myservice/path/to/service. This will replace the
* service name with the host:port from the ServiceInstance.
* @param instance
* @param original a URI with the host as a logical service name
* @return a reconstructed URI
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
LoadBalancerClient接口三个方法,excute()为执行请求,reconstructURI()用来重构url,它实现了ServiceInstanceChooser 接口,这个接口的作用是用来选择服务的,看下源码
/**
通过使用负载平衡器,选择一个服务器发送请求。
* Implemented by classes which use a load balancer to choose a server to
* send a request to.
*
* @author Ryan Baxter
*/
public interface ServiceInstanceChooser {
/**
从LoadBalancer中为指定服务选择一个ServiceInstance
* Choose a ServiceInstance from the LoadBalancer for the specified service
*
* //根据服务id去LoadBalancer查找服务
* @param serviceId the service id to look up the LoadBalancer
*
* 返回查找到的服务实例ServiceInstance
* @return a ServiceInstance that matches the serviceId
*/
ServiceInstance choose(String serviceId);
}
提供了一个choose方法,根据服务ID serviceId 查找一个ServiceInstance 服务实例,这里的serviceId其实就是http://user-server/... url中带的服务名。
LoadBalancerClient 还有一个默认实现类RibbonLoadBalancerClient,这个实现是针对Ribbon的客户端负载均衡,继承关系如下:
RibbonLoadBalancerClient是一个非常核心的类,最终的负载均衡的请求处理由它来执行,源码如下:
public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory;
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
//重构URL,找到服务之后,把http://服务名/ 格式 重构成 http://ip:port/ 格式
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
URI uri;
Server server;
if (instance instanceof RibbonServer) {
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
} else {
server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri);
}
//根据服务名,查找服务实例,选择一个返回ServiceInstance
@Override
public ServiceInstance choose(String serviceId) {
//查找服务
Server server = getServer(serviceId);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
//执行请求
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
//获取负载均衡器[重要]
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//选择服务,使用负载均衡器,根据服务的ID,选择一个服务
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
//选择的服务封装成一个RibbonServer:RibbonServer implements ServiceInstance
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
//执行请求调用服务
return execute(serviceId, ribbonServer, request);
}
//执行请求调用服务
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
//使用 LoadBalancerRequest 向服务发请求
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
private ServerIntrospector serverIntrospector(String serviceId) {
ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
ServerIntrospector.class);
if (serverIntrospector == null) {
serverIntrospector = new DefaultServerIntrospector();
}
return serverIntrospector;
}
//是否是https请求
private boolean isSecure(Server server, String serviceId) {
IClientConfig config = this.clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
return RibbonUtils.isSecure(config, serverIntrospector, server);
}
//根据服务ID选择服务
protected Server getServer(String serviceId) {
return getServer(getLoadBalancer(serviceId));
}
//负载均衡器选择服务
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
//根据服务id得到负载均衡器
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
...省略...
解释一下:
- 这里的
ServiceInstance choose(String serviceId)
方法的作用是根据ServideId选择一个服务,底层实现是通过LoadBalancer.chooseServer
负载均衡器LoadBalancer来完成的服务的选择的 - 选择到服务之后调用
execute
向选择到的服务发起请求,通过LoadBalancerRequest来完成其请求。
RestTemplate的执行流程
RestTmplate发请求时地址 "http://user-server/user/"+id
; 中 user-server是当前服务需要调用的目标服务的服务名,那么Ribbon到底是如何实现负载均衡调用的呢?我们可以从这里跟踪一下RestTemplate的执行流程
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
...省略...
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
Object var14;
try {
//创建请求对象,使用SimpleClientHttpRequestFactory创建ClientHttpRequest
ClientHttpRequest request = this.createRequest(url, method);
if (requestCallback != null) {
//设置header和body
requestCallback.doWithRequest(request);
}
response = request.execute();
this.handleResponse(url, method, response);
var14 = responseExtractor != null ? responseExtractor.extractData(response) : null;
} catch (IOException var12) {
String resource = url.toString();
String query = url.getRawQuery();
resource = query != null ? resource.substring(0, resource.indexOf(63)) : resource;
throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + var12.getMessage(), var12);
} finally {
if (response != null) {
response.close();
}
}
return var14;
}
请求来到RestTemplate#doExecute
方法,首选是通过使用SimpleClientHttpRequestFactory
根据url和method创建ClientHttpRequest
请求对象,使用的实现是InterceptingClientHttpRequestFactory
,然后使用response = request.execute();
去执行请求,一路跟踪,请求来到InterceptingClientHttpRequest#executeInternal
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
//headers请求头 , bufferedOutput输出内容
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
//创建拦截器执行器
InterceptingClientHttpRequest.InterceptingRequestExecution requestExecution = new InterceptingClientHttpRequest.InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
这里通过InterceptingClientHttpRequest.InterceptingRequestExecution()
拦截器执行器去执行请求,请求来到InterceptingClientHttpRequest.InterceptingRequestExecution#execute
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();
}
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
//[重要]这里取到的正是 LoadBalancerInterceptor
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
return nextInterceptor.intercept(request, body, this);
} else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
//如果iterator中没有拦截器了,就创建一个ClientHttpRequest去执行请求
ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> {
delegate.getHeaders().addAll(key, value);
});
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
streamingOutputMessage.setBody((outputStream) -> {
StreamUtils.copy(body, outputStream);
});
} else {
StreamUtils.copy(body, delegate.getBody());
}
}
//执行请求
return delegate.execute();
}
}
}
InterceptingRequestExecution
中维护了一个Iterator<ClientHttpRequestInterceptor>
iterator;其中LoadBalancerInterceptor
就在该集合中,所以请求来到LoadBalancerInterceptor #intercept(request, body, this);
方法
//负载均衡拦截器
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
//负载均衡客户端[重要]
private LoadBalancerClient loadBalancer;
//负载均衡请求创建工厂
private LoadBalancerRequestFactory requestFactory;
//初始化
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
//初始化
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
//拦截器核心方法【重要】
//request请求对象
//body 内容
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
//请求的URL,格式如:http://user-server/user/1 ,user-server是服务名
final URI originalUri = request.getURI();
//URL中的服务名
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
//通过requestFactory.createRequest(request, body, execution)创建LoadBalancerRequest
//然后调用负载均衡器执行请求,参数:服务名,LoadBalancerRequest
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
}
这里蛮重要的,请求调用了LoadBalancerInterceptor #intercept
负载均衡拦截器的拦截方法,获取到URL,从中获取到主机名即调用的服务名(Ribbon客户端服务名),然后使用LoadBalancerRequestFactory
创建了LoadBalancerRequest
请求对象,调用loadBalancer#execute
负载均衡器执行请求
ILoadBalancer 选择服务(负载均衡)
请求来到RibbonLoadBalancerClient#execute
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
//获取负载均衡器
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//loadBalancer选择服务
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
//选择的服务封装成RibbonServer
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
//LoadBalancerRequest对服务执行请求
return execute(serviceId, ribbonServer, request);
}
这里就蛮关键了
- 首选是通过服务名调用getLoadBalancer方法得到负载均衡器
- 然后getServer(loadBalancer)是通过负载均衡器选择一个服务,底层会使用IRule的算法
- 然后将服务封装成RibbonServer 对象,交给LoadBalancerRequest去执行请求
这里的负载均衡器默认会走ZoneAwareLoadBalancer,它是通过SpringClientFactory 从Ribbon上下文对象中获取到的负载均衡器对象,关于这个我们在上一章讨论过
public class RibbonLoadBalancerClient implements LoadBalancerClient {
...省略...
private SpringClientFactory clientFactory;
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
而得到ILoadBalancer之后,调用getServer(loadBalancer)方法选择服务,我们跟踪一下
public class RibbonLoadBalancerClient implements LoadBalancerClient {
...省略...
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
//ZoneAwareLoadBalancer#chooseServer
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
这里loadBalancer.chooseServer("default");
请求来到ZoneAwareLoadBalancer#chooseServer
,源码如下:
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
...省略...
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
//如果禁用了zone,或者自由一个zone会走这里
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
//下面就是根据zone选择服务了,默认情况下不会走下面
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
//得到zone快照
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
//得到可用的zone
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
//随机选择区域
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
//选择服务
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
这里做了一个判断,如果没有设置zone或者只有一个zone(默认),这里会调用 return super.chooseServer(key);
通过父类的BaseLoadBalancer#chooseServer
方法选择服务,这也是默认的执行流程,代码走到了BaseLoadBalancer#chooseServer
方法中,源码如下
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
public Server chooseServer(Object key) {
if (counter == null) {
//创建一个计数器
counter = createCounter();
}
//计数器增加
counter.increment();
//如果负载均衡规则为空,返回空
if (rule == null) {
return null;
} else {
try {
//[重要]调用了负载均衡器算法类的choose方法
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
在BaseLoadBalancer #chooseServer
方法中调用了IRule#choose
方法进行服务的选择服务,IRule有很多是算法策略实现类,默认会走轮询算法,如果有定义负载均衡算法,这里rule.choose调用的就是定义的算法类
这里我打了个端点,跟踪了一下源码发现默认情况下会从BaseLoadBalancer#chooseServer
方法中调用PredicateBasedRule#choose
,PredicateBasedRule本身是继承ClientConfigEnabledRoundRobinRule
,也就是说PredicateBasedRule
是使用的是轮询算法,同时它扩展了Predicate功能,即:提供了服务器过滤逻辑
/**
一个规则,提供了服务器过滤逻辑,具体使用的是AbstractServerPredicate实现过滤功能。 过滤后,服务器从过滤列表中的循环方式返回。
* A rule which delegates the server filtering logic to an instance of {@link AbstractServerPredicate}.
* After filtering, a server is returned from filtered list in a round robin fashion.
*
*
* @author awang
*
*/
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
/**
抽象函数,返回AbstractServerPredicate,用来对服务做过滤的
* Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
*
*/
public abstract AbstractServerPredicate getPredicate();
/**
* Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
* The performance for this method is O(n) where n is number of servers to be filtered.
*/
@Override
public Server choose(Object key) {
//得到负载均衡器
ILoadBalancer lb = getLoadBalancer();
//通过AbstractServerPredicate的chooseRoundRobinAfterFiltering选出具体的服务实例
//AbstractServerPredicate的子类实现的Predicate逻辑来过滤一部分服务实例
//然后在以线性轮询的方式从过滤后的实例中选出一个
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
这里使用了AbstractServerPredicate#chooseRoundRobinAfterFiltering
来选择服务从lb.getAllServers()得到所有的服务作为参数,继续跟踪下去
/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
//得到合格的服务列表,主要根据zone做一个过滤
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
//没找到合格的服务
return Optional.absent();
}
//以线性轮询的方式合格的服务列表获取一个实例
//incrementAndGetModulo方法会以轮询的方式计算一个下标值
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
...省略...
/**
引用于 RoundRobinRule 算法策略 , 轮询
* Referenced from RoundRobinRule
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
//增量和取模实现轮询
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
这里首先会通过zone过滤出可用的服务列表,然后使用轮询算法选择一个服务返回,到这里选择服务的流程调用
LoadBalancerRequest 执行服务
代码继续回到 RibbonLoadBalancerClient#execute,选择完服务之后,服务被封装成RibbonServer
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
//得到负载均衡器
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//选择服务
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
//把server 封装成RibbonServer
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
//执行服务调用
return execute(serviceId, ribbonServer, request);
}
找到服务后,调用了execute方法执行后续请求
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
//加载Ribbon负载均衡器上下文对象
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
//LoadBalancerRequest.apply执行请求
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
这里调用LoadBalancerRequest.apply
执行请求,后面还会调用LoadBalancerRequestFactory#createRequest
方法创建请求,调用ClientHttpRequestExecution#execute
执行,然后又会将请求委派给ClientHttpRequest#execute
去执行,再往后面走就是创建HttpURLConnection
链接对象发送请求了,我们就不继续跟下去了。
总结
纵观Ribbon的工作流程大致如下
- 初始化的时候创建好Ribbon的上下文,以及相关的组件,如ILoadBalancer,IConfig,IRule等等
- 初始化过程中会通过ServerList从EurekaClient加载负载均衡候选的服务列表,并定时更新服务列表,使用ServerListFilter过滤之后,使用IPing检查是否更新服务列表
- 被注解了@LoadBalance标签的RestTemplate可以使用LoadBalancerClient作负载均衡,并添加好拦截器LoadBalancerInterceptor
- 当请求发起RestTemplate会把请求交给LoadBalancerInterceptor 拦截器,LoadBalancerInterceptor 拦截器调用
- LoadBalancerClient接收到请求使用ILoadBalancer负载均衡器选择服务,底层用到IRule算法
- 选择好服务之后,LoadBalancerClient把请求交给LoadBalancerRequest去执行