Spring Cloud Ribbon 全解 (3) - 基本组件实现源码(1)

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: Spring Cloud Ribbon 全解 (3) - 基本组件实现源码(1)

本文基于SpringCloud-Dalston.SR5

上一篇我们了解到Ribbon主要由如下几个组件组成:

  1. 所有Ribbon负载均衡器需要实现的接口IClient
  2. 服务实例列表维护机制实现的接口ServerList
  3. 负载均衡数据记录LoadBalancerStats
  4. 负责选取Server的接口ILoadBalancer
  5. 负载均衡选取规则实现的接口IRule
  6. 检查实例是否存活实现的接口IPing
  7. 服务实例列表更新机制实现的接口ServerListUpdater
  8. 服务实例列表过滤机制ServerListFilter

我们会逐个分析


1. 所有Ribbon负载均衡器需要实现的接口IClient


对于这个IClient,之前我们说到执行器逻辑,例如重试还有异常处理,都在这里处理。我们看他的默认抽象类实现AbstractLoadBalancerAwareClient:


AbstractLoadBalancerAwareClient.java


public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    //获取重试处理器,这个由其他实现类动态实现
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    //构造LoadBalancerCommand,RxJava风格
    LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
            .withLoadBalancerContext(this)
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri())
            .build();
    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    //修改原始url为实际的url
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        //执行请求
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}
public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);

这个构造的LoadBalancerCommand是一个RxJava风格的,它包含了重试和异常处理机制:


LoadBalancerCommand.java


//返回一个只包含一个Server的Observable,但是每次从负载均衡器中获取一个
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();
    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }
    //获取在每个服务实例重试的的次数
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    //最多尝试几个服务实例
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    //对于每个服务实例的调用逻辑
    //默认field server是null,通过selectServer()方法获取一个Server
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                //对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
                public Observable<T> call(Server server) {
                    //设置上下文
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    //每个Server包含重试逻辑的请求调用
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    //增加Server正在处理的请求计数
                                    loadBalancerContext.noteOpenConnection(stats);
                                    //监听器
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }
                                    //计时器
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    //operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
                                    //doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            //记录请求完成
                                            recordStats(tracer, stats, entity, null);
                                        }
                                        @Override
                                        public void onError(Throwable e) {
                                            //记录请求结束
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            //发生了错误,通知listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }
                                        @Override
                                        public void onNext(T entity) {
                                            //因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            
                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });
                    if (maxRetrysSame > 0)
                        //是否retry
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    if (maxRetrysNext > 0 && server == null)
        //是否retry,如果retry回调用selectServer()返回下一个Server
        o = o.retry(retryPolicy(maxRetrysNext, false));
    //异常处理
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                //如果超过重试次数,则抛异常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}


2. 服务实例列表维护机制实现的接口ServerList


image.png


AbstractServerList.java


其实这个抽象类一是在实现ServerList接口的同时,实现了IClientConfigAware这个接口,代表是可配置的。

同时,提供了一个生成默认ServerListFilter(这个Filter的实现类是由NIWSServerListFilterClassName这个配置决定,默认是ZoneAffinityServerListFilter)的方法

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {   
    public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
        try {
            String niwsServerListFilterClassName = niwsClientConfig
                    .getProperty(
                            CommonClientConfigKey.NIWSServerListFilterClassName,
                            ZoneAffinityServerListFilter.class.getName())
                    .toString();
            AbstractServerListFilter<T> abstractNIWSServerListFilter = 
                    (AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
            return abstractNIWSServerListFilter;
        } catch (Throwable e) {
            throw new ClientException(
                    ClientException.ErrorType.CONFIGURATION,
                    "Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
                            + niwsClientConfig
                                    .getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
        }
    }
}


ConfigurationBasedServerList.java


这个是默认的实现,如果没有特殊配置,ServerList的实现类就是ConfigurationBasedServerList;这个实际上就是从配置中读取ServerList,这个配置可以是动态配置,例如是Archaius

public class ConfigurationBasedServerList extends AbstractServerList<Server>  {
    private IClientConfig clientConfig;
    @Override
    public List<Server> getInitialListOfServers() {
        return getUpdatedListOfServers();
    }
    @Override
    public List<Server> getUpdatedListOfServers() {
        String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
        return derive(listOfServers);
    }
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }
    //可以看出这个配置就是以逗号分隔的字符串
    private List<Server> derive(String value) {
        List<Server> list = Lists.newArrayList();
        if (!Strings.isNullOrEmpty(value)) {
            for (String s: value.split(",")) {
                list.add(new Server(s.trim()));
            }
        }
        return list;
    }
}


DiscoveryEnabledNIWSServerList.java


这个就是从Eureka上面获取Server列表的类,构造的时候需要传入相关配置以及最重要的EurekaClient的Provider来获取合适的EurekaClient以便于获取Server列表。

实现ServerList接口的方法都是基于obtainServersViaDiscovery这个方法:

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
    return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    //如果EurekaClient没有被初始化,则日志报警并返回空的列表
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }
    EurekaClient eurekaClient = eurekaClientProvider.get();
    //这里的vipAddresses其实就是微服务名称的各种形式,但是注意,它们代表的是同一个微服务
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {
                    //是否覆盖port
                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }
                        //这里复制一份是因为不希望其他的地方修改原有的实例信息
                        InstanceInfo copy = new InstanceInfo(ii);
                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }
                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            //如果有一个vipAddress有服务列表,我们就不用获取剩余的了
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; 
            }
        }
    }
    return serverList;
}

到这里我们可以看出,Ribbon和Eureka的配合其实就是Ribbon从Eureka中利用微服务名称获取Server列表;那么这个列表是如何更新的呢,在Eureka的章节我们提到过,Ribbon定时从EurekaClient获取服务实例列表更新,这就涉及到了下一个我们要讲到的Ribbon元素 - 服务实例列表更新机制实现的接口ServerListUpdater

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
2月前
|
监控 负载均衡 Java
5 大 SpringCloud 核心组件详解,8 张图彻底弄懂
本文图文详解 Spring Cloud 的五大核心组件,帮助深入理解和掌握微服务架构。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
5 大 SpringCloud 核心组件详解,8 张图彻底弄懂
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
2月前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
1月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
54 2
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
1月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
58 5
|
1月前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
41 5
|
2月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
71 9
|
2月前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
114 5
|
3月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
188 5