OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现

简介: 该文章主要介绍了如何在OpenFeign中集成Ribbon以实现负载均衡,并详细分析了Ribbon中服务选择和服务过滤的核心实现过程。文章还涉及了Ribbon中负载均衡器(ILoadBalancer)和负载均衡策略(IRule)的初始化方式。

前言

在上一篇文章OpenFeign最核心组件LoadBalancerFeignClient详解分析了OpenFeign的负载均衡客户端,OpenFeign使用LoadBalancerFeignClient(下文简称LoadBalancerFC)替换默认的通信客户端feign.Client.Default, LoadBalancerFC是具备负载均衡能力和通信的能力的客户端,而负载均衡能力是通过Ribbon提供的。

本文内容是Ribbon负载均衡的核心部分,Ribbon是一种具备客户端负载均衡能力的技术。

上文已经分析到LoadBalancerFC最终会通过Ribbon中的Rule对象选择一个服务进行发起通信,但是没有详细分析Rule的选择服务流程,本文将继续详细分析。

image.png

先回顾上文讲到的选择服务方法,位于com.netflix.loadbalancer.reactive.LoadBalancerCommand#selectServer 方法

private Observable<Server> selectServer() {
   
        return Observable.create(new OnSubscribe<Server>() {
   
            @Override
            public void call(Subscriber<? super Server> next) {
   
                try {
   
                    //通过loadBalancerContext选择出服务
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
   
                    next.onError(e);
                }
            }
        });
}

loadBalancerContext负责根据loadBalancer对象选择服务,而loadBalancer对象最终是通过Rule对象来获取服务的。

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
   
        String host = null;
        int port = -1;

        //...
        ILoadBalancer lb = getLoadBalancer();
        //通过负载均衡获取服务
        Server svc = lb.chooseServer(loadBalancerKey);
        logger.debug("{} using LB returned Server: {} for request {}", new Object[]{
   clientName, svc, original});
        return svc;
}

loadBalancerContext负责根据LoadBalancer对象选择服务,而loadBalancer对象最终是通过Rule对象来获取服务的。

那么ILoadBalancer对象和Rule对象是如何初始化的呢?

LoadBalancer和Rule初始化

我们可以看到Ribbon的自动装配类RibbonClientConfiguration,找到LoadBalancer和Rule的默认配置类。

public class RibbonClientConfiguration {
   
@Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
   
        //初始化LoadBalancer对象为ZoneAwareLoadBalancer
        return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) 
        : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
    }


    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
   
        if (this.propertiesFactory.isSet(IRule.class, this.name)) {
   
            return (IRule)this.propertiesFactory.get(IRule.class, config, this.name);
        } else {
   
            //初始化Rule对象为ZoneAvoidanceRule
            ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
            rule.initWithNiwsConfig(config);
            return rule;
        }
    }
}

image.png

通过ribbon的自动装配类看出:loadbalancer默认配置的是ZoneAwareLoadBalancer,Rule类默认配置的是ZoneAvoidanceRule,他们都使用了条件注解@ConditionalOnMissingBean,也就是我们如果有自定义的负载规则可以覆盖这两个默认配置

loadbalancer和Rule都是以Zone开头,Zone是区域的意思,也就是说他们能够按照区域实现负载均衡,这种Rule是代表按区域负载均衡的机制,一般在大型互联网公司都会有多个机房,这种情况下就存在按区域负载的必要性了。

如下图一样,一个公司的服务会分区部署,不同的区的服务器资源都有可能不一样 image.png

我们可以看到每个服务类信息里都包含一个zone(区域)属性,就是代表这个服务所属的区域。

package com.netflix.loadbalancer;
public class Server {
   

    private String host;
    private int port = 80;
    //服务所属区域
    private String zone ;
    private String scheme;
    private volatile String id;
    private volatile boolean isAliveFlag;

}

因此可以知道负载均衡的总体逻辑是先选可用分区,再从可用分区里选可用服务,

image.png

从源码也可以看出验证上面这个流程,下面就是主要逻辑。

@Override
    public Server chooseServer(Object key) {
   
        //1、按区域负载没有开启
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
   
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
   
            //2、获取有所分区快照
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            //3、计算哪些是可用的分区
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            //4、这里是可用分区小于总分区数的时候(说明当前存在有区域不可用的情况),从可用分区选择一个分区,如果所有区域都可用,可以不走区域负载逻辑
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
   
                //5、随机选择一个分区
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
   
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    //6、从可用区选择一个具体的服务
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
   
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
   
            //7、通过区域负载获取到了可用服务,直接返回
            return server;
        } else {
   
            //8、不使用按区域负载
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

下面我们来看看详细的实现。

可用分区的选择

第一步获取分区的快照信息(各个分区服务可用负载情况)

    public ZoneSnapshot getZoneSnapshot(List<? extends Server> servers) {
   
        if (servers == null || servers.size() == 0) {
   
            return new ZoneSnapshot();
        }
        int instanceCount = servers.size();
        int activeConnectionsCount = 0;
        int activeConnectionsCountOnAvailableServer = 0;
        int circuitBreakerTrippedCount = 0;
        double loadPerServer = 0;
        long currentTime = System.currentTimeMillis();
        for (Server server: servers) {
   
            ServerStats stat = getSingleServerStat(server);   
            if (stat.isCircuitBreakerTripped(currentTime)) {
   
                circuitBreakerTrippedCount++;
            } else {
   
                activeConnectionsCountOnAvailableServer += stat.getActiveRequestsCount(currentTime);
            }
            activeConnectionsCount += stat.getActiveRequestsCount(currentTime);
        }
        if (circuitBreakerTrippedCount == instanceCount) {
   
            //服务全被熔断了
            if (instanceCount > 0) {
   
                // should be NaN, but may not be displayable on Epic
                loadPerServer = -1;
            }
        } else {
   
            //计算每个服务平均负载 活跃的请求线程数/(总服务-熔断服务)
            loadPerServer = ((double) activeConnectionsCountOnAvailableServer) / (instanceCount - circuitBreakerTrippedCount);
        }
        return new ZoneSnapshot(instanceCount, circuitBreakerTrippedCount, activeConnectionsCount, loadPerServer);
    }

第二步就是从所有分区里选出可用的分区了

public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
   
        if (snapshot.isEmpty()) {
   
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
   
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

//循环分区,找到可用分区
        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
   
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
   
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
   
                double loadPerServer = zoneSnapshot.getLoadPerServer();

                //熔断的服务比例大于99%
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
   
                        //服务的平均负载小于0(服务全被熔断了,从可用分区剔除)
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
   
                // 负载和最大负载相近 1/100000 记录为最差
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
   
                        // they are the same considering double calculation
                        // round error 记录最差的
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
   
                    //负载比最大负载还大,重新记录最差
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }
        // 最大负载都小于0.2,说明服务可用性都还好,并且没有剔除任何区
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
   
            // zone override is not needed here
            return availableZones;
        }
        //从最差的区里随机取一个
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        //从可用区里移除最差的一个分区
        if (zoneToAvoid != null) {
   
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

可以看到最后,最终是通过随机函数Random.nextInt()方法从可用分区里获取一个分区。 代码比较冗长,但是逻辑是很清晰的,可以参考下方时序图

image.png

可用服务的选择|逻辑较复杂!!

上面分区选好了,下面就是从可用分区里选择可用服务了。

重要说明:说明选择服务的源码逻辑比上方选择分区逻辑复杂多了,不过多看几遍就好了

image.png

//获取loadbalancer
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
//获取服务
server = zoneLoadBalancer.chooseServer(key);

getLoadBalancer这里运用了享元模式,每个分区都只会创建一次负载均衡器对象,创建好了就缓存起来,下次请求就使用缓存中的负载均衡对象了。

BaseLoadBalancer getLoadBalancer(String zone) {
   
        zone = zone.toLowerCase();
        //首先查缓存
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
   
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            //1个分区一个loadBalancer
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
   
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

每个分区创建一个Rule,一个BaseLoadBalancer,服务选择是通过BaseLoadBalancer选择出来的。

继续跟进chooseServer方法可知,使用Rule对象选择服务

public Server chooseServer(Object key) {
   
        if (counter == null) {
   
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
   
            return null;
        } else {
   
            try {
   
                return rule.choose(key);
            } catch (Exception e) {
   
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

前文讲过自动装配类初始化Rule为ZoneAvoidanceRule类,它继承了PredicateBasedRule,而PredicateBasedRule最终使用AbstractServerPredicate进行服务抉择!

首先拿到所有的服务,这里所有服务是存储在BaseLoadBalancer

public class BaseLoadBalancer  {
   

    //所有服务列表
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());

}

继续跟进AbstractServerPredicate我们终于看到了负载均衡的庐山真面目

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
   
        //获取所有可用服务列表
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
   
            return Optional.absent();
        }
        //轮训获取服务
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

执行完chooseRoundRobinAfterFiltering方法就拿到了最终选择的服务了,它通过轮训方式获取一个服务。

getEligibleServers方法里通过Predicate服务决策器过滤,过滤出所有可用的服务列表。

ZoneAvoidanceRule构造器里面,我们可以看到它默认使用两个决策器,进行过滤服务,一个是ZoneAvoidancePredicate,一个是AvailabilityPredicate,使用到组合器CompositePredicate将两者结合(组合设计模式)。

public ZoneAvoidanceRule() {
   
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
   
        return CompositePredicate.withPredicates(p1, p2)
                             .addFallbackPredicate(p2)
                             .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                             .build();

    }
}

com.netflix.loadbalancer.ZoneAvoidancePredicate 负责匹配过滤满足区域可用的服务 com.netflix.loadbalancer.AvailabilityPredicate 负责匹配过滤满足可用性的服务

image.png

获取服务实现里面,并没有从最开始获取好的分区里面去找服务,而是再次实时获取全部服务,再通过区域过滤器,服务可用性过滤器,选择可用的服务。

这个原因到底是为什么呢? 可想而知,分区是否可用,服务是否可用这些都是一直在变化的,每次获取最新的服务信息才是更加可靠的!!

总结

好啦,上面就是今天的全部内容,我们从LoadBalancerFeignClient开始,介绍了openFeign集成Ribbon负载均衡能力,而Ribbon使用到LoadBalancer和Rule实现负载均衡,他默认使用按区域负载均衡的策略。 分别使用ZoneAvoidanceRuleZoneAwareLoadBalancer实现负载均衡核心逻辑。

核心流程分两步:

image.png

到此OpenFeign集成负载均衡能力核心流程已经分析完了。

后面将继续分析OpenFeign的其他组件能力。

image.png

相关文章
|
3天前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
25 1
|
1月前
|
Java Maven Windows
使用Java创建集成JACOB的HTTP服务
本文介绍了如何在Java中创建一个集成JACOB的HTTP服务,使Java应用能够调用Windows的COM组件。文章详细讲解了环境配置、动态加载JACOB DLL、创建HTTP服务器、实现IP白名单及处理HTTP请求的具体步骤,帮助读者实现Java应用与Windows系统的交互。作者拥有23年编程经验,文章来源于稀土掘金。著作权归作者所有,商业转载需授权。
使用Java创建集成JACOB的HTTP服务
|
11天前
|
存储 Java 开发工具
【三方服务集成】最新版 | 阿里云OSS对象存储服务使用教程(包含OSS工具类优化、自定义阿里云OSS服务starter)
阿里云OSS(Object Storage Service)是一种安全、可靠且成本低廉的云存储服务,支持海量数据存储。用户可通过网络轻松存储和访问各类文件,如文本、图片、音频和视频等。使用OSS后,项目中的文件上传业务无需在服务器本地磁盘存储文件,而是直接上传至OSS,由其管理和保障数据安全。此外,介绍了OSS服务的开通流程、Bucket创建、AccessKey配置及环境变量设置,并提供了Java SDK示例代码,帮助用户快速上手。最后,展示了如何通过自定义starter简化工具类集成,实现便捷的文件上传功能。
【三方服务集成】最新版 | 阿里云OSS对象存储服务使用教程(包含OSS工具类优化、自定义阿里云OSS服务starter)
|
3天前
|
负载均衡 Kubernetes 区块链
随机密码生成器+阿里k8s负载均衡型服务加证书方法+移动终端设计+ico生成器等
随机密码生成器+阿里k8s负载均衡型服务加证书方法+移动终端设计+ico生成器等
27 1
|
1月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
14天前
|
负载均衡 Java 开发者
Ribbon框架实现客户端负载均衡的方法与技巧
Ribbon框架为微服务架构中的客户端负载均衡提供了强大的支持。通过简单的配置和集成,开发者可以轻松地在应用中实现服务的发现、选择和负载均衡。适当地使用Ribbon,配合其他Spring Cloud组件,可以有效提升微服务架构的可用性和性能。
16 0
|
2月前
|
域名解析 网络协议 API
【API管理 APIM】APIM集成内部VNet时,常遇见的关于自定义DNS服务问题。
【API管理 APIM】APIM集成内部VNet时,常遇见的关于自定义DNS服务问题。
|
2月前
|
负载均衡 Java Spring
Ribbon的超时配置会覆盖OpenFeign的超时配置吗
该文章详细分析了OpenFeign与Ribbon之间的超时配置关系,解释了Ribbon如何覆盖OpenFeign的默认超时配置,并探讨了OpenFeign超时配置的动态修改方案。
|
2月前
|
机器人 C# 人工智能
智能升级:WPF与人工智能的跨界合作——手把手教你集成聊天机器人,打造互动新体验与个性化服务
【8月更文挑战第31天】聊天机器人已成为现代应用的重要组成部分,提供即时响应、个性化服务及全天候支持。随着AI技术的发展,聊天机器人的功能日益强大,不仅能进行简单问答,还能实现复杂对话管理和情感分析。本文通过具体案例分析,展示了如何在WPF应用中集成聊天机器人,并通过示例代码详细说明其实现过程。使用Microsoft的Bot Framework可以轻松创建并配置聊天机器人,增强应用互动性和用户体验。首先,需在Bot Framework门户中创建机器人项目并编写逻辑。然后,在WPF应用中添加聊天界面,实现与机器人的交互。
50 0
|
2月前
|
负载均衡 Kubernetes 开发工具
k8s相关服务与负载均衡
k8s相关服务与负载均衡
34 0