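Why does adding @LoadBalanced give RestTemplate load-balancing capability? Let's walk through the source code.
RestTemplate interceptors
Start with the RestTemplate class. It extends InterceptingHttpAccessor, which holds a list of ClientHttpRequestInterceptor instances. This list is the extension point that Ribbon hooks into.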
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
......
}
public abstract class InterceptingHttpAccessor extends HttpAccessor {
private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
......
}
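Interceptor injection
Where does the interceptor get attached to RestTemplate? Look at LoadBalancerAutoConfiguration, and in particular its static nested class LoadBalancerInterceptorConfig. It registers a LoadBalancerInterceptor bean, which implements the ClientHttpRequestInterceptor interface described above, together with a RestTemplateCustomizer that appends this interceptor to a RestTemplate's interceptor list. The elided part of the auto-configuration collects the RestTemplate beans annotated with @LoadBalanced and applies the customizers to them, which is why only annotated templates get the interceptor.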
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
......
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
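The copy, append, set-back pattern in restTemplateCustomizer can be tried out without Spring. The following is only a rough sketch: SimpleTemplate, its loadBalanced flag (standing in for the @LoadBalanced qualifier), and applyToLoadBalanced are hypothetical names invented for illustration, and interceptors are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CustomizerSketch {

    // Hypothetical stand-in for RestTemplate: a name, a flag, and an interceptor list.
    static class SimpleTemplate {
        final String name;
        final boolean loadBalanced; // stands in for the @LoadBalanced qualifier
        private List<String> interceptors = new ArrayList<>();

        SimpleTemplate(String name, boolean loadBalanced) {
            this.name = name;
            this.loadBalanced = loadBalanced;
        }

        List<String> getInterceptors() {
            return interceptors;
        }

        void setInterceptors(List<String> list) {
            this.interceptors = new ArrayList<>(list);
        }
    }

    // Same shape as the restTemplateCustomizer lambda: copy the list, append, set it back.
    static Consumer<SimpleTemplate> customizer(String interceptor) {
        return template -> {
            List<String> list = new ArrayList<>(template.getInterceptors());
            list.add(interceptor);
            template.setInterceptors(list);
        };
    }

    // Assumption: only templates carrying the marker are customized.
    static void applyToLoadBalanced(List<SimpleTemplate> templates,
                                    List<Consumer<SimpleTemplate>> customizers) {
        for (SimpleTemplate template : templates) {
            if (!template.loadBalanced) {
                continue;
            }
            for (Consumer<SimpleTemplate> c : customizers) {
                c.accept(template);
            }
        }
    }

    public static void main(String[] args) {
        SimpleTemplate lb = new SimpleTemplate("lb", true);
        SimpleTemplate plain = new SimpleTemplate("plain", false);
        applyToLoadBalanced(List.of(lb, plain), List.of(customizer("loadBalancerInterceptor")));
        System.out.println(lb.name + " -> " + lb.getInterceptors());
        System.out.println(plain.name + " -> " + plain.getInterceptors());
    }
}
```

In this sketch the unmarked template is left untouched, which mirrors why a RestTemplate without @LoadBalanced sends requests to the host name as written.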
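RestTemplate execution flow
Next, follow how RestTemplate executes a request. doExecute() obtains a ClientHttpRequest, concretely an InterceptingClientHttpRequest, and calling request.execute() then does two things:
- Iterates over the interceptors, which process the request URL. The interceptor here is the LoadBalancerInterceptor registered in the previous section.
- Sends the actual HTTP request to the processed URL.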
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
Object var14;
try {
// The request created here is an InterceptingClientHttpRequest
ClientHttpRequest request = this.createRequest(url, method);
......
response = request.execute();
......
} catch (IOException var12) {
......
} finally {
......
}
return var14;
}
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
......
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingClientHttpRequest.InterceptingRequestExecution requestExecution = new InterceptingClientHttpRequest.InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
......
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
// The LoadBalancerInterceptor runs here
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
return nextInterceptor.intercept(request, body, this);
} else {
// Once all interceptors have run, send the actual HTTP request
......
ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
......
return delegate.execute();
}
}
}
}
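The iterator-driven chain above can be reduced to a few lines of plain Java. This is a minimal sketch, not Spring's code: SketchExecution, SketchInterceptor, and ChainExecution are hypothetical simplifications in which a request is just a URL string, and the address 10.0.0.1:8080 is made up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for ClientHttpRequestExecution.
interface SketchExecution {
    String execute(String uri);
}

// Hypothetical, simplified stand-in for ClientHttpRequestInterceptor.
interface SketchInterceptor {
    String intercept(String uri, SketchExecution execution);
}

class InterceptorChainDemo {

    // Same shape as InterceptingRequestExecution: each call consumes one interceptor.
    static class ChainExecution implements SketchExecution {
        private final Iterator<SketchInterceptor> iterator;
        private final SketchExecution terminal;

        ChainExecution(List<SketchInterceptor> interceptors, SketchExecution terminal) {
            this.iterator = interceptors.iterator();
            this.terminal = terminal;
        }

        @Override
        public String execute(String uri) {
            if (iterator.hasNext()) {
                // Pass this chain along so the interceptor can continue it.
                return iterator.next().intercept(uri, this);
            }
            // No interceptors left: perform the "real" call.
            return terminal.execute(uri);
        }
    }

    static String run(String uri) {
        List<SketchInterceptor> interceptors = new ArrayList<>();
        // Stand-in for LoadBalancerInterceptor: swap the service name for an address.
        interceptors.add((u, next) -> next.execute(u.replace("service-provider", "10.0.0.1:8080")));
        // A pass-through interceptor, to show that the chain keeps going.
        interceptors.add((u, next) -> next.execute(u));
        // Stand-in for the delegate request that actually goes over the wire.
        SketchExecution terminal = u -> "GET " + u;
        return new ChainExecution(interceptors, terminal).execute(uri);
    }

    public static void main(String[] args) {
        System.out.println(run("http://service-provider/hello"));
    }
}
```

Because every interceptor receives the chain itself as the continuation, an interceptor can modify the request before calling execute(), or skip the call entirely.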
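Ribbon interceptor processing
How does intercept handle the URL? The interceptor holds a loadBalancer whose implementation is RibbonLoadBalancerClient. When loadBalancer.execute() runs, the service name in the URL is mapped to a concrete ip:port. Along the way this covers the load-balancing rule, the retry policy, and fetching the server list.
// Interceptor processing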
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
......
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
......
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
public class RibbonLoadBalancerClient implements LoadBalancerClient {
......
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
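// Get the load balancer for the service (e.g. "service-provider"), which holds its server list
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
// Pick a concrete server according to the load-balancing rule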
Server server = this.getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
}
}
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
// The loadBalancer here is a BaseLoadBalancer (or a subclass of it)
return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");
}
......
}
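To make the "service name to ip:port" step concrete, here is a small sketch using only java.net.URI. It is an illustration under assumptions, not Ribbon's implementation: the REGISTRY map, its addresses, and the choose and rewrite methods are hypothetical, and the selection policy is a placeholder that always takes the first instance.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;

class ServiceUriRewriteDemo {

    // Hypothetical registry: service name -> "host:port" instances.
    static final Map<String, List<String>> REGISTRY =
            Map.of("service-provider", List.of("10.0.0.1:8080", "10.0.0.2:8080"));

    // Placeholder policy: always the first instance. A real rule would go here.
    static String choose(String serviceId) {
        List<String> instances = REGISTRY.get(serviceId);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        return instances.get(0);
    }

    // Treat the URL host as the service name and replace it with the chosen address.
    static String rewrite(String url) {
        URI original = URI.create(url);
        String serviceId = original.getHost();
        String[] hostPort = choose(serviceId).split(":");
        try {
            URI rewritten = new URI(original.getScheme(), original.getUserInfo(), hostPort[0],
                    Integer.parseInt(hostPort[1]), original.getPath(),
                    original.getQuery(), original.getFragment());
            return rewritten.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rewrite("http://service-provider/hello?name=a"));
    }
}
```

Only the host and port change; the scheme, path, and query are carried over, so the caller keeps writing URLs against the logical service name.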
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
// Default load-balancing rule
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
// Add a server
public void addServer(Server newServer) {
...
}
// Set the server list
public void setServersList(List lsrv) {
......
}
// Get the server list
public List<Server> getServerList(boolean availableOnly) {
return (availableOnly ? getReachableServers() : getAllServers());
}
// Choose a concrete server according to the load-balancing rule
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}
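Since the default rule is round robin, a short sketch of the idea may help. This is a simplified illustration and not Ribbon's RoundRobinRule: the class RoundRobinSketch and its choose method are hypothetical, servers are plain strings, and reachability checks and retries are left out.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {

    // Index of the server to hand out next; shared across calling threads.
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    String choose(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        // Compare-and-set loop so concurrent callers each get a distinct slot.
        while (true) {
            int current = nextIndex.get();
            int next = (current + 1) % servers.size();
            if (nextIndex.compareAndSet(current, next)) {
                // The modulo guards against the list having shrunk since the last call.
                return servers.get(current % servers.size());
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = List.of("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(rule.choose(servers));
        }
    }
}
```

With three servers, the fourth call wraps around to the first one again.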
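A quick aside: Ribbon stopped being maintained in 2019, and the 2020 Spring Cloud release removed the Ribbon dependency, with Spring Cloud LoadBalancer taking over load balancing. In my experience it feels much the same as Ribbon.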