十五.SpringCloud源码剖析-Ribbon工作流程

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: Ribbon是由Netflix公司开源的一个客户端负载均衡器,主要功能是实现服务之间的负载均衡调用,内置丰富的负载均衡算法,本章意在探讨Ribbon的核心工作流程,Ribbon基本使用请看《[SpringCloud极简入门-客户端负载均衡Ribbon](https://blog.csdn.net/u014494148/article/details/105002095)》

前言

Ribbon是由Netflix公司开源的一个客户端负载均衡器,主要功能是实现服务之间的负载均衡调用,内置丰富的负载均衡算法,本章意在探讨Ribbon的核心工作流程,Ribbon基本使用请看《SpringCloud极简入门-客户端负载均衡Ribbon

Ribbon的工作流程

我们知道,微服务在启动成功之后,默认30s/次会从注册中心拉取服务注册表到本地缓存起来,而我们使用Ribbon时是通过RestTemplate发起请求,URL以:http://user-server/user/... 服务名方式去调用服务,其实Ribbon干的事情就是根据URL中的服务名去本地的服务注册表中查找服务名对应的服务实例(一个或多个),然后通过负载均衡算法选择其中一个服务后,发起Http请求,那接下来我们就来看一下Ribbon的底层到底是怎么工作的

Ribbon配合RestTemplate使用

在《SpringCloud极简入门-客户端负载均衡Ribbon》中我们有讲解到Ribbon的基本使用,首选需要定义RestTemplate


@SpringBootApplication
@EnableEurekaClient
public class OrderServerApplication1030
{
   
   

    //配置一个RestTemplate ,http客户端,支持Rest风格
    //@LoadBalanced :负载均衡注册,让RestTmplate可以实现负载均衡请求
    //这个标签标记RestTemplate可以使用LoadBalancerClient进行负载均衡
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(){
   
   
        return new RestTemplate();
    }
    //省略...

调用服务的是否使用服务名调用

@RestController
public class OrderController {
   
   

    //需要配置成Bean
    @Autowired
    private RestTemplate  restTemplate ;

    //浏览器调用该方法
    @RequestMapping(value = "/order/{id}",method = RequestMethod.GET)
    public User getById(@PathVariable("id")Long id){
   
   
        //发送http请求调用 user的服务,获取user对象 : RestTemplate
        //user的ip,user的端口,user的Controller路径
        //String url = "http://localhost:1020/user/"+id;
        String url = "http://user-server/user/"+id;

        //发送http请求
        return restTemplate.getForObject(url, User.class);

    }
}

被 @LoadBalanced标记的RestTemplate 可以使用LoadBalancerClient负载均衡客户端实现负载均衡,我们在使用RestTemplate 发起请求的时候需要跟上服务名的方式http://user-server/user/

LoadBalancerClient负载均衡客户端

我们从 @LoadBalanced入手,先看一下LoadBalancerClient它的源码

/**
注解标记RestTemplate 可以使用LoadBalancerClient 负载均衡客户端
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({
   
    ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
   
   
}

这个注解@LoadBalanced 的作用在注释上说的非常清楚,就是标记RestTemplate课程可以使用使用LoadBalancerClient来实现负载均衡,LoadBalancerClient就是Ribbon实现负载均衡的一个客户端,它在spring-cloud-commons包下,我们可以直接看LoadBalancerClient的源码

/**
客户端负载均衡
 * Represents a client side load balancer
 * @author Spencer Gibb
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {
   
   
    //执行请求,会根据serviceId使用负载均衡查找服务
    /**
    使用负载均衡器执行指定服务
     * execute request using a ServiceInstance from the LoadBalancer for the specified service
     * 服务Id - 服务ID来查找负载平衡器
     * @param serviceId the service id to look up the LoadBalancer
     * 
     * @param request allows implementations to execute pre and post actions such as
     * incrementing metrics
     * 返回选择的服务
     * @return the result of the LoadBalancerRequest callback on the selected
     * ServiceInstance
     */
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    //执行请求,这个方法多了一个参数ServiceInstance ,即请求指定的服务
    /**
     * execute request using a ServiceInstance from the LoadBalancer for the specified
     * service
     * @param serviceId the service id to look up the LoadBalancer
     * @param serviceInstance the service to execute the request to
     * @param request allows implementations to execute pre and post actions such as
     * incrementing metrics
     * @return the result of the LoadBalancerRequest callback on the selected
     * ServiceInstance
     */
    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    //重构URL,把http://myservice/path/to/service重构成http://ip:端口/path/to/service
    /**
     * Create a proper URI with a real host and port for systems to utilize.
     * Some systems use a URI with the logical serivce name as the host,
     * such as http://myservice/path/to/service.  This will replace the
     * service name with the host:port from the ServiceInstance.
     * @param instance
     * @param original a URI with the host as a logical service name
     * @return a reconstructed URI
     */
    URI reconstructURI(ServiceInstance instance, URI original);
}

LoadBalancerClient接口三个方法,excute()为执行请求,reconstructURI()用来重构url,它实现了ServiceInstanceChooser 接口,这个接口的作用是用来选择服务的,看下源码

/**
    通过使用负载平衡器,选择一个服务器发送请求。
 * Implemented by classes which use a load balancer to choose a server to
 * send a request to.
 *
 * @author Ryan Baxter
 */
public interface ServiceInstanceChooser {
   
   

    /**
      从LoadBalancer中为指定服务选择一个ServiceInstance
     * Choose a ServiceInstance from the LoadBalancer for the specified service
     * 
     * //根据服务id去LoadBalancer查找服务
     * @param serviceId the service id to look up the LoadBalancer
     * 
     * 返回查找到的服务实例ServiceInstance 
     * @return a ServiceInstance that matches the serviceId
     */
    ServiceInstance choose(String serviceId);
}

提供了一个choose方法,根据服务ID serviceId 查找一个ServiceInstance 服务实例,这里的serviceId其实就是http://user-server/... url中带的服务名。

LoadBalancerClient 还有一个默认实现类RibbonLoadBalancerClient,这个实现是针对Ribbon的客户端负载均衡,继承关系如下:
在这里插入图片描述

RibbonLoadBalancerClient是一个非常核心的类,最终的负载均衡的请求处理由它来执行,源码如下:

public class RibbonLoadBalancerClient implements LoadBalancerClient {
   
   

    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
   
   
        this.clientFactory = clientFactory;
    }
    //重构URL,找到服务之后,把http://服务名/  格式 重构成 http://ip:port/ 格式
    @Override
    public URI reconstructURI(ServiceInstance instance, URI original) {
   
   
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
   
   
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        } else {
   
   
            server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }
    //根据服务名,查找服务实例,选择一个返回ServiceInstance 
    @Override
    public ServiceInstance choose(String serviceId) {
   
   
        //查找服务
        Server server = getServer(serviceId);
        if (server == null) {
   
   
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }
    //执行请求
    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   
   
        //获取负载均衡器[重要]
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        //选择服务,使用负载均衡器,根据服务的ID,选择一个服务
        Server server = getServer(loadBalancer);
        if (server == null) {
   
   
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        //选择的服务封装成一个RibbonServer:RibbonServer implements ServiceInstance
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));
        //执行请求调用服务
        return execute(serviceId, ribbonServer, request);
    }
    //执行请求调用服务
    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
   
   
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
   
   
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
   
   
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
   
   
            //使用 LoadBalancerRequest 向服务发请求
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
   
   
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
   
   
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

    private ServerIntrospector serverIntrospector(String serviceId) {
   
   
        ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
                ServerIntrospector.class);
        if (serverIntrospector == null) {
   
   
            serverIntrospector = new DefaultServerIntrospector();
        }
        return serverIntrospector;
    }
    //是否是https请求
    private boolean isSecure(Server server, String serviceId) {
   
   
        IClientConfig config = this.clientFactory.getClientConfig(serviceId);
        ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
        return RibbonUtils.isSecure(config, serverIntrospector, server);
    }
    //根据服务ID选择服务
    protected Server getServer(String serviceId) {
   
   
        return getServer(getLoadBalancer(serviceId));
    }

    //负载均衡器选择服务
    protected Server getServer(ILoadBalancer loadBalancer) {
   
   
        if (loadBalancer == null) {
   
   
            return null;
        }
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }
    //根据服务id得到负载均衡器
    protected ILoadBalancer getLoadBalancer(String serviceId) {
   
   
        return this.clientFactory.getLoadBalancer(serviceId);
    }
...省略...

解释一下:

  • 这里的ServiceInstance choose(String serviceId)方法的作用是根据ServideId选择一个服务,底层实现是通过LoadBalancer.chooseServer 负载均衡器LoadBalancer来完成的服务的选择的
  • 选择到服务之后调用execute向选择到的服务发起请求,通过LoadBalancerRequest来完成其请求。

RestTemplate的执行流程

RestTmplate发请求时地址 "http://user-server/user/"+id; 中 user-server是当前服务需要调用的目标服务的服务名,那么Ribbon到底是如何实现负载均衡调用的呢?我们可以从这里跟踪一下RestTemplate的执行流程

public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
   
   
...省略...
 @Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
   
   
        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;

        Object var14;
        try {
   
   
            //创建请求对象,使用SimpleClientHttpRequestFactory创建ClientHttpRequest 
            ClientHttpRequest request = this.createRequest(url, method);
            if (requestCallback != null) {
   
   
                //设置header和body
                requestCallback.doWithRequest(request);
            }

            response = request.execute();
            this.handleResponse(url, method, response);
            var14 = responseExtractor != null ? responseExtractor.extractData(response) : null;
        } catch (IOException var12) {
   
   
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = query != null ? resource.substring(0, resource.indexOf(63)) : resource;
            throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + var12.getMessage(), var12);
        } finally {
   
   
            if (response != null) {
   
   
                response.close();
            }

        }

        return var14;
    }

请求来到RestTemplate#doExecute方法,首选是通过使用SimpleClientHttpRequestFactory根据url和method创建ClientHttpRequest 请求对象,使用的实现是InterceptingClientHttpRequestFactory,然后使用response = request.execute();去执行请求,一路跟踪,请求来到InterceptingClientHttpRequest#executeInternal

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
   
   
    //headers请求头 , bufferedOutput输出内容
    protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
   
   
        //创建拦截器执行器
        InterceptingClientHttpRequest.InterceptingRequestExecution requestExecution = new InterceptingClientHttpRequest.InterceptingRequestExecution();
        return requestExecution.execute(this, bufferedOutput);
    }

这里通过InterceptingClientHttpRequest.InterceptingRequestExecution() 拦截器执行器去执行请求,请求来到InterceptingClientHttpRequest.InterceptingRequestExecution#execute

 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
   
   
        private final Iterator<ClientHttpRequestInterceptor> iterator;

        public InterceptingRequestExecution() {
   
   
            this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();
        }

        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
   
   
            if (this.iterator.hasNext()) {
   
   
                //[重要]这里取到的正是  LoadBalancerInterceptor
                ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            } else {
   
   
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                //如果iterator中没有拦截器了,就创建一个ClientHttpRequest去执行请求
                ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> {
   
   
                    delegate.getHeaders().addAll(key, value);
                });
                if (body.length > 0) {
   
   
                    if (delegate instanceof StreamingHttpOutputMessage) {
   
   
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
                        streamingOutputMessage.setBody((outputStream) -> {
   
   
                            StreamUtils.copy(body, outputStream);
                        });
                    } else {
   
   
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                //执行请求
                return delegate.execute();
            }
        }
    }

InterceptingRequestExecution 中维护了一个Iterator<ClientHttpRequestInterceptor> iterator;其中LoadBalancerInterceptor 就在该集合中,所以请求来到LoadBalancerInterceptor #intercept(request, body, this); 方法

//负载均衡拦截器
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
   
   
    //负载均衡客户端[重要]
    private LoadBalancerClient loadBalancer;
    //负载均衡请求创建工厂
    private LoadBalancerRequestFactory requestFactory;
    //初始化
    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
   
   
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }
    //初始化
    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
   
   
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }
    //拦截器核心方法【重要】
    //request请求对象
    //body 内容
    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
   
   
        //请求的URL,格式如:http://user-server/user/1 ,user-server是服务名
        final URI originalUri = request.getURI();
        //URL中的服务名
        String serviceName = originalUri.getHost();

        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        //通过requestFactory.createRequest(request, body, execution)创建LoadBalancerRequest
        //然后调用负载均衡器执行请求,参数:服务名,LoadBalancerRequest
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}

这里蛮重要的,请求调用了LoadBalancerInterceptor #intercept负载均衡拦截器的拦截方法,获取到URL,从中获取到主机名即调用的服务名(Ribbon客户端服务名),然后使用LoadBalancerRequestFactory 创建了LoadBalancerRequest请求对象,调用loadBalancer#execute 负载均衡器执行请求

ILoadBalancer 选择服务(负载均衡)

请求来到RibbonLoadBalancerClient#execute

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   
   
        //获取负载均衡器
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        //loadBalancer选择服务
        Server server = getServer(loadBalancer);
        if (server == null) {
   
   
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        //选择的服务封装成RibbonServer 
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));
        //LoadBalancerRequest对服务执行请求
        return execute(serviceId, ribbonServer, request);
    }

这里就蛮关键了

  • 首选是通过服务名调用getLoadBalancer方法得到负载均衡器
  • 然后getServer(loadBalancer)是通过负载均衡器选择一个服务,底层会使用IRule的算法
  • 然后将服务封装成RibbonServer 对象,交给LoadBalancerRequest去执行请求

这里的负载均衡器默认会走ZoneAwareLoadBalancer,它是通过SpringClientFactory 从Ribbon上下文对象中获取到的负载均衡器对象,关于这个我们在上一章讨论过

public class RibbonLoadBalancerClient implements LoadBalancerClient {
   
   
...省略...
    private SpringClientFactory clientFactory;
    protected ILoadBalancer getLoadBalancer(String serviceId) {
   
   
        return this.clientFactory.getLoadBalancer(serviceId);
    }

而得到ILoadBalancer之后,调用getServer(loadBalancer)方法选择服务,我们跟踪一下

public class RibbonLoadBalancerClient implements LoadBalancerClient {
   
   
...省略...
    protected Server getServer(ILoadBalancer loadBalancer) {
   
   
        if (loadBalancer == null) {
   
   
            return null;
        }
        //ZoneAwareLoadBalancer#chooseServer
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }

这里loadBalancer.chooseServer("default");请求来到ZoneAwareLoadBalancer#chooseServer,源码如下:

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
   
   
...省略...
@Override
    public Server chooseServer(Object key) {
   
   
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
   
   
            //如果禁用了zone,或者自由一个zone会走这里
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        //下面就是根据zone选择服务了,默认情况下不会走下面
        Server server = null;
        try {
   
   
            LoadBalancerStats lbStats = getLoadBalancerStats();
            //得到zone快照
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
   
   
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
   
   
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            //得到可用的zone
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
   
   
                //随机选择区域
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
   
   
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    //选择服务
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
   
   
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
   
   
            return server;
        } else {
   
   
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

这里做了一个判断,如果没有设置zone或者只有一个zone(默认),这里会调用 return super.chooseServer(key);通过父类的BaseLoadBalancer#chooseServer方法选择服务,这也是默认的执行流程,代码走到了BaseLoadBalancer#chooseServer方法中,源码如下

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
   
   
 public Server chooseServer(Object key) {
   
   
        if (counter == null) {
   
   
            //创建一个计数器
            counter = createCounter();
        }
        //计数器增加
        counter.increment();
        //如果负载均衡规则为空,返回空
        if (rule == null) {
   
   
            return null;
        } else {
   
   
            try {
   
   
                //[重要]调用了负载均衡器算法类的choose方法
                return rule.choose(key);
            } catch (Exception e) {
   
   
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

BaseLoadBalancer #chooseServer方法中调用了IRule#choose方法进行服务的选择服务,IRule有很多是算法策略实现类,默认会走轮询算法,如果有定义负载均衡算法,这里rule.choose调用的就是定义的算法类

这里我打了个端点,跟踪了一下源码发现默认情况下会从BaseLoadBalancer#chooseServer方法中调用PredicateBasedRule#choose ,PredicateBasedRule本身是继承ClientConfigEnabledRoundRobinRule,也就是说PredicateBasedRule是使用的是轮询算法,同时它扩展了Predicate功能,即:提供了服务器过滤逻辑

  /**
  一个规则,提供了服务器过滤逻辑,具体使用的是AbstractServerPredicate实现过滤功能。 过滤后,服务器从过滤列表中的循环方式返回。
 * A rule which delegates the server filtering logic to an instance of {@link AbstractServerPredicate}.
 * After filtering, a server is returned from filtered list in a round robin fashion.
 * 
 * 
 * @author awang
 *
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
   
   

    /**
    抽象函数,返回AbstractServerPredicate,用来对服务做过滤的
     * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
     * 
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
     * The performance for this method is O(n) where n is number of servers to be filtered.
     */
    @Override
    public Server choose(Object key) {
   
   
        //得到负载均衡器
        ILoadBalancer lb = getLoadBalancer();
         //通过AbstractServerPredicate的chooseRoundRobinAfterFiltering选出具体的服务实例
        //AbstractServerPredicate的子类实现的Predicate逻辑来过滤一部分服务实例
        //然后在以线性轮询的方式从过滤后的实例中选出一个
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
   
   
            return server.get();
        } else {
   
   
            return null;
        }       
    }
}

这里使用了AbstractServerPredicate#chooseRoundRobinAfterFiltering来选择服务从lb.getAllServers()得到所有的服务作为参数,继续跟踪下去

    /**
     * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. 
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
   
   
        //得到合格的服务列表,主要根据zone做一个过滤
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
   
   
          //没找到合格的服务
            return Optional.absent();
        }
        //以线性轮询的方式合格的服务列表获取一个实例
        //incrementAndGetModulo方法会以轮询的方式计算一个下标值
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
    ...省略...
 /**
     引用于 RoundRobinRule 算法策略 , 轮询
     * Referenced from RoundRobinRule
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
     //增量和取模实现轮询
    private int incrementAndGetModulo(int modulo) {
   
   
        for (;;) {
   
   
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

这里首先会通过zone过滤出可用的服务列表,然后使用轮询算法选择一个服务返回,到这里选择服务的流程调用

LoadBalancerRequest 执行服务

代码继续回到 RibbonLoadBalancerClient#execute,选择完服务之后,服务被封装成RibbonServer

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   
   
        //得到负载均衡器
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        //选择服务
        Server server = getServer(loadBalancer);
        if (server == null) {
   
   
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        //把server 封装成RibbonServer 
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));
        //执行服务调用
        return execute(serviceId, ribbonServer, request);
    }

找到服务后,调用了execute方法执行后续请求

@Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
   
   
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
   
   
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
   
   
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        //加载Ribbon负载均衡器上下文对象
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
   
   
            //LoadBalancerRequest.apply执行请求
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
   
   
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
   
   
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

这里调用LoadBalancerRequest.apply执行请求,后面还会调用LoadBalancerRequestFactory#createRequest方法创建请求,调用ClientHttpRequestExecution#execute执行,然后又会将请求委派给ClientHttpRequest#execute去执行,再往后面走就是创建HttpURLConnection链接对象发送请求了,我们就不继续跟下去了。

总结

纵观Ribbon的工作流程大致如下

  • 初始化的时候创建好Ribbon的上下文,以及相关的组件,如ILoadBalancer,IConfig,IRule等等
  • 初始化过程中会通过ServerList从EurekaClient加载负载均衡候选的服务列表,并定时更新服务列表,使用ServerListFilter过滤之后,使用IPing检查是否更新服务列表
  • 被注解了@LoadBalance标签的RestTemplate可以使用LoadBalancerClient作负载均衡,并添加好拦截器LoadBalancerInterceptor
  • 当请求发起RestTemplate会把请求交给LoadBalancerInterceptor 拦截器,LoadBalancerInterceptor 拦截器调用
  • LoadBalancerClient接收到请求使用ILoadBalancer负载均衡器选择服务,底层用到IRule算法
  • 选择好服务之后,LoadBalancerClient把请求交给LoadBalancerRequest去执行
    在这里插入图片描述
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1月前
|
负载均衡 监控 网络协议
SpringCloud之Ribbon使用
通过以上步骤,就可以在Spring Cloud项目中有效地使用Ribbon来实现服务调用的负载均衡,提高系统的可靠性和性能。在实际应用中,根据具体的业务场景和需求选择合适的负载均衡策略,并进行相应的配置和优化,以确保系统的稳定运行。
75 15
|
1月前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
90 5
|
3月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
497 37
|
3月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
4月前
|
人工智能 前端开发 Java
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
本文介绍了如何使用 **Spring Cloud Alibaba AI** 构建基于 Spring Boot 和 uni-app 的聊天机器人应用。主要内容包括:Spring Cloud Alibaba AI 的概念与功能,使用前的准备工作(如 JDK 17+、Spring Boot 3.0+ 及通义 API-KEY),详细实操步骤(涵盖前后端开发工具、组件选择、功能分析及关键代码示例)。最终展示了如何成功实现具备基本聊天功能的 AI 应用,帮助读者快速搭建智能聊天系统并探索更多高级功能。
1597 2
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
|
4月前
|
负载均衡 算法 Java
SpringCloud之Ribbon使用
通过 Ribbon,可以非常便捷的在微服务架构中实现请求负载均衡,提升系统的高可用性和伸缩性。在实际使用中,需要根据实际场景选择合适的负载均衡策略,并对其进行适当配置,以达到更佳的负载均衡效果。
124 13
|
5月前
|
负载均衡 算法 网络协议
Ribbon 负载均衡源码解读
Ribbon 负载均衡源码解读
67 15
Ribbon 负载均衡源码解读
|
5月前
|
负载均衡 Java API
Feign 进行rpc 调用时使用ribbon负载均衡源码解析
Feign 进行rpc 调用时使用ribbon负载均衡源码解析
83 11
|
4月前
|
缓存 Java Maven
SpringCloud基于Eureka的服务治理架构搭建与测试:从服务提供者到消费者的完整流程
Spring Cloud微服务框架中的Eureka是一个用于服务发现和注册的基础组件,它基于RESTful风格,为微服务架构提供了关键的服务注册与发现功能。以下是对Eureka的详细解析和搭建举例。
|
6月前
|
负载均衡 算法 Java
Spring Cloud Netflix 之 Ribbon
Spring Cloud Netflix Ribbon是客户端负载均衡器,用于在微服务架构中分发请求。它与RestTemplate结合,自动在服务发现(如Eureka)注册的服务之间进行调用。配置包括在pom.xml中添加依赖,设置application.yml以连接Eureka服务器,并在配置类中创建@LoadBalanced的RestTemplate。通过这种方式,当调用如`/user/userInfoList`的接口时,Ribbon会自动处理到多个可用服务实例的负载均衡。