OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: 该文章主要介绍了如何在OpenFeign中集成Ribbon以实现负载均衡,并详细分析了Ribbon中服务选择和服务过滤的核心实现过程。文章还涉及了Ribbon中负载均衡器(ILoadBalancer)和负载均衡策略(IRule)的初始化方式。

前言

在上一篇文章OpenFeign最核心组件LoadBalancerFeignClient详解分析了OpenFeign的负载均衡客户端,OpenFeign使用LoadBalancerFeignClient(下文简称LoadBalancerFC)替换默认的通信客户端feign.Client.Default, LoadBalancerFC是具备负载均衡能力和通信的能力的客户端,而负载均衡能力是通过Ribbon提供的。

本文内容是Ribbon负载均衡的核心部分,Ribbon是一种具备客户端负载均衡能力的技术。

上文已经分析到LoadBalancerFC最终会通过Ribbon中的Rule对象选择一个服务进行发起通信,但是没有详细分析Rule的选择服务流程,本文将继续详细分析。

image.png

先回顾上文讲到的选择服务方法,位于com.netflix.loadbalancer.reactive.LoadBalancerCommand#selectServer 方法

private Observable<Server> selectServer() {
   
        return Observable.create(new OnSubscribe<Server>() {
   
            @Override
            public void call(Subscriber<? super Server> next) {
   
                try {
   
                    //通过loadBalancerContext选择出服务
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
   
                    next.onError(e);
                }
            }
        });
}

loadBalancerContext负责根据loadBalancer对象选择服务,而loadBalancer对象最终是通过Rule对象来获取服务的。

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
   
        String host = null;
        int port = -1;

        //...
        ILoadBalancer lb = getLoadBalancer();
        //通过负载均衡获取服务
        Server svc = lb.chooseServer(loadBalancerKey);
        logger.debug("{} using LB returned Server: {} for request {}", new Object[]{
   clientName, svc, original});
        return svc;
}

loadBalancerContext负责根据LoadBalancer对象选择服务,而loadBalancer对象最终是通过Rule对象来获取服务的。

那么ILoadBalancer对象和Rule对象是如何初始化的呢?

LoadBalancer和Rule初始化

我们可以看到Ribbon的自动装配类RibbonClientConfiguration,找到LoadBalancer和Rule的默认配置类。

public class RibbonClientConfiguration {
   
@Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
   
        //初始化LoadBalancer对象为ZoneAwareLoadBalancer
        return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) 
        : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
    }


    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
   
        if (this.propertiesFactory.isSet(IRule.class, this.name)) {
   
            return (IRule)this.propertiesFactory.get(IRule.class, config, this.name);
        } else {
   
            //初始化Rule对象为ZoneAvoidanceRule
            ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
            rule.initWithNiwsConfig(config);
            return rule;
        }
    }
}

image.png

通过ribbon的自动装配类看出:loadbalancer默认配置的是ZoneAwareLoadBalancer,Rule类默认配置的是ZoneAvoidanceRule,他们都使用了条件注解@ConditionalOnMissingBean,也就是我们如果有自定义的负载规则可以覆盖这两个默认配置

loadbalancer和Rule都是以Zone开头,Zone是区域的意思,也就是说他们能够按照区域实现负载均衡,这种Rule是代表按区域负载均衡的机制,一般在大型互联网公司都会有多个机房,这种情况下就存在按区域负载的必要性了。

如下图一样,一个公司的服务会分区部署,不同的区的服务器资源都有可能不一样 image.png

我们可以看到每个服务类信息里都包含一个zone(区域)属性,就是代表这个服务所属的区域。

package com.netflix.loadbalancer;
public class Server {
   

    private String host;
    private int port = 80;
    //服务所属区域
    private String zone ;
    private String scheme;
    private volatile String id;
    private volatile boolean isAliveFlag;

}

因此可以知道负载均衡的总体逻辑是先选可用分区,再从可用分区里选可用服务,

image.png

从源码也可以看出验证上面这个流程,下面就是主要逻辑。

@Override
    public Server chooseServer(Object key) {
   
        //1、按区域负载没有开启
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
   
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
   
            //2、获取有所分区快照
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            //3、计算哪些是可用的分区
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            //4、这里是可用分区小于总分区数的时候(说明当前存在有区域不可用的情况),从可用分区选择一个分区,如果所有区域都可用,可以不走区域负载逻辑
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
   
                //5、随机选择一个分区
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
   
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    //6、从可用区选择一个具体的服务
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
   
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
   
            //7、通过区域负载获取到了可用服务,直接返回
            return server;
        } else {
   
            //8、不使用按区域负载
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

下面我们来看看详细的实现。

可用分区的选择

第一步获取分区的快照信息(各个分区服务可用负载情况)

    public ZoneSnapshot getZoneSnapshot(List<? extends Server> servers) {
   
        if (servers == null || servers.size() == 0) {
   
            return new ZoneSnapshot();
        }
        int instanceCount = servers.size();
        int activeConnectionsCount = 0;
        int activeConnectionsCountOnAvailableServer = 0;
        int circuitBreakerTrippedCount = 0;
        double loadPerServer = 0;
        long currentTime = System.currentTimeMillis();
        for (Server server: servers) {
   
            ServerStats stat = getSingleServerStat(server);   
            if (stat.isCircuitBreakerTripped(currentTime)) {
   
                circuitBreakerTrippedCount++;
            } else {
   
                activeConnectionsCountOnAvailableServer += stat.getActiveRequestsCount(currentTime);
            }
            activeConnectionsCount += stat.getActiveRequestsCount(currentTime);
        }
        if (circuitBreakerTrippedCount == instanceCount) {
   
            //服务全被熔断了
            if (instanceCount > 0) {
   
                // should be NaN, but may not be displayable on Epic
                loadPerServer = -1;
            }
        } else {
   
            //计算每个服务平均负载 活跃的请求线程数/(总服务-熔断服务)
            loadPerServer = ((double) activeConnectionsCountOnAvailableServer) / (instanceCount - circuitBreakerTrippedCount);
        }
        return new ZoneSnapshot(instanceCount, circuitBreakerTrippedCount, activeConnectionsCount, loadPerServer);
    }

第二步就是从所有分区里选出可用的分区了

public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
   
        if (snapshot.isEmpty()) {
   
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
   
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

//循环分区,找到可用分区
        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
   
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
   
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
   
                double loadPerServer = zoneSnapshot.getLoadPerServer();

                //熔断的服务比例大于99%
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
   
                        //服务的平均负载小于0(服务全被熔断了,从可用分区剔除)
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
   
                // 负载和最大负载相近 1/100000 记录为最差
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
   
                        // they are the same considering double calculation
                        // round error 记录最差的
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
   
                    //负载比最大负载还大,重新记录最差
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }
        // 最大负载都小于0.2,说明服务可用性都还好,并且没有剔除任何区
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
   
            // zone override is not needed here
            return availableZones;
        }
        //从最差的区里随机取一个
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        //从可用区里移除最差的一个分区
        if (zoneToAvoid != null) {
   
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

可以看到最后,最终是通过随机函数Random.nextInt()方法从可用分区里获取一个分区。 代码比较冗长,但是逻辑是很清晰的,可以参考下方时序图

image.png

可用服务的选择|逻辑较复杂!!

上面分区选好了,下面就是从可用分区里选择可用服务了。

重要说明:说明选择服务的源码逻辑比上方选择分区逻辑复杂多了,不过多看几遍就好了

image.png

//获取loadbalancer
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
//获取服务
server = zoneLoadBalancer.chooseServer(key);

getLoadBalancer这里运用了享元模式,每个分区都只会创建一次负载均衡器对象,创建好了就缓存起来,下次请求就使用缓存中的负载均衡对象了。

BaseLoadBalancer getLoadBalancer(String zone) {
   
        zone = zone.toLowerCase();
        //首先查缓存
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
   
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            //1个分区一个loadBalancer
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
   
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

每个分区创建一个Rule,一个BaseLoadBalancer,服务选择是通过BaseLoadBalancer选择出来的。

继续跟进chooseServer方法可知,使用Rule对象选择服务

public Server chooseServer(Object key) {
   
        if (counter == null) {
   
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
   
            return null;
        } else {
   
            try {
   
                return rule.choose(key);
            } catch (Exception e) {
   
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

前文讲过自动装配类初始化Rule为ZoneAvoidanceRule类,它继承了PredicateBasedRule,而PredicateBasedRule最终使用AbstractServerPredicate进行服务抉择!

首先拿到所有的服务,这里所有服务是存储在BaseLoadBalancer

public class BaseLoadBalancer  {
   

    //所有服务列表
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());

}

继续跟进AbstractServerPredicate我们终于看到了负载均衡的庐山真面目

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
   
        //获取所有可用服务列表
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
   
            return Optional.absent();
        }
        //轮训获取服务
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

执行完chooseRoundRobinAfterFiltering方法就拿到了最终选择的服务了,它通过轮训方式获取一个服务。

getEligibleServers方法里通过Predicate服务决策器过滤,过滤出所有可用的服务列表。

ZoneAvoidanceRule构造器里面,我们可以看到它默认使用两个决策器,进行过滤服务,一个是ZoneAvoidancePredicate,一个是AvailabilityPredicate,使用到组合器CompositePredicate将两者结合(组合设计模式)。

public ZoneAvoidanceRule() {
   
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
   
        return CompositePredicate.withPredicates(p1, p2)
                             .addFallbackPredicate(p2)
                             .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                             .build();

    }
}

com.netflix.loadbalancer.ZoneAvoidancePredicate 负责匹配过滤满足区域可用的服务 com.netflix.loadbalancer.AvailabilityPredicate 负责匹配过滤满足可用性的服务

image.png

获取服务实现里面,并没有从最开始获取好的分区里面去找服务,而是再次实时获取全部服务,再通过区域过滤器,服务可用性过滤器,选择可用的服务。

这个原因到底是为什么呢? 可想而知,分区是否可用,服务是否可用这些都是一直在变化的,每次获取最新的服务信息才是更加可靠的!!

总结

好啦,上面就是今天的全部内容,我们从LoadBalancerFeignClient开始,介绍了openFeign集成Ribbon负载均衡能力,而Ribbon使用到LoadBalancer和Rule实现负载均衡,他默认使用按区域负载均衡的策略。 分别使用ZoneAvoidanceRuleZoneAwareLoadBalancer实现负载均衡核心逻辑。

核心流程分两步:

image.png

到此OpenFeign集成负载均衡能力核心流程已经分析完了。

后面将继续分析OpenFeign的其他组件能力。

image.png

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1月前
|
弹性计算 监控 负载均衡
|
1月前
|
负载均衡 Java Nacos
Ribbon负载均衡
Ribbon负载均衡
32 1
Ribbon负载均衡
|
1月前
|
运维 负载均衡 算法
|
2月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
143 1
|
3月前
|
Java Maven Windows
使用Java创建集成JACOB的HTTP服务
本文介绍了如何在Java中创建一个集成JACOB的HTTP服务,使Java应用能够调用Windows的COM组件。文章详细讲解了环境配置、动态加载JACOB DLL、创建HTTP服务器、实现IP白名单及处理HTTP请求的具体步骤,帮助读者实现Java应用与Windows系统的交互。作者拥有23年编程经验,文章来源于稀土掘金。著作权归作者所有,商业转载需授权。
使用Java创建集成JACOB的HTTP服务
|
21天前
|
机器学习/深度学习 人工智能 自然语言处理
Voice-Pro:开源AI音频处理工具,集成转录、翻译、TTS等一站式服务
Voice-Pro是一款开源的多功能音频处理工具,集成了语音转文字、文本转语音、实时翻译、YouTube视频下载和人声分离等多种功能。它支持超过100种语言,适用于教育、娱乐和商业等多个领域,为用户提供一站式的音频处理解决方案,极大地提高工作效率和音频处理的便捷性。
91 10
Voice-Pro:开源AI音频处理工具,集成转录、翻译、TTS等一站式服务
|
1月前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
83 5
|
19天前
|
负载均衡 Java Nacos
常见的Ribbon/Spring LoadBalancer的负载均衡策略
自SpringCloud 2020版起,Ribbon被弃用,转而使用Spring Cloud LoadBalancer。Ribbon支持轮询、随机、加权响应时间和重试等负载均衡策略;而Spring Cloud LoadBalancer则提供轮询、随机及Nacos负载均衡策略,基于Reactor实现,更高效灵活。
43 0
|
1月前
|
安全 测试技术 数据安全/隐私保护
原生鸿蒙应用市场开发者服务的技术解析:从集成到应用发布的完整体验
原生鸿蒙应用市场开发者服务的技术解析:从集成到应用发布的完整体验
|
2月前
|
存储 Java 开发工具
【三方服务集成】最新版 | 阿里云OSS对象存储服务使用教程(包含OSS工具类优化、自定义阿里云OSS服务starter)
阿里云OSS(Object Storage Service)是一种安全、可靠且成本低廉的云存储服务,支持海量数据存储。用户可通过网络轻松存储和访问各类文件,如文本、图片、音频和视频等。使用OSS后,项目中的文件上传业务无需在服务器本地磁盘存储文件,而是直接上传至OSS,由其管理和保障数据安全。此外,介绍了OSS服务的开通流程、Bucket创建、AccessKey配置及环境变量设置,并提供了Java SDK示例代码,帮助用户快速上手。最后,展示了如何通过自定义starter简化工具类集成,实现便捷的文件上传功能。
【三方服务集成】最新版 | 阿里云OSS对象存储服务使用教程(包含OSS工具类优化、自定义阿里云OSS服务starter)