dubbo - 负载均衡调用过程

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: 开篇 这篇文章的主要目的是讲清楚ClusterInvoker在多个invoker对象进行负载均衡的调用过程,也就描述从调用到负载均衡选择的调用链路,真正的负载均衡部分的逻辑在后面由单独讲解。 selector 调用时序图 说明: RegistryProtocol的doRefer()方法内部cluster.join()负责创建ClusterInvoker对象,所有的cluster的invoker的选择逻辑都在这个函数实现。

开篇

这篇文章的主要目的是讲清楚ClusterInvoker在多个invoker对象进行负载均衡的调用过程,也就描述从调用到负载均衡选择的调用链路,真正的负载均衡部分的逻辑在后面由单独讲解。


selector 调用时序图

image

说明:

  • RegistryProtocol的doRefer()方法内部cluster.join()负责创建ClusterInvoker对象,所有的cluster的invoker的选择逻辑都在这个函数实现。
  • FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker抽象出了通用的所有类型ClusterInvoker需要使用的方法,核心的如select、doSelect、invoke等通用方法,将具体不同实现的doInvoke方法抽象给具体实现类实现。
  • AbstractClusterInvoker的invoke()方法作为调用的入口,内部功能主要是根据loadBalance策略选择invoker进行执行。
  • AbstractClusterInvoker的select() -> doSelect()方法内部根据不同loadBalance策略实现invoker的调用。
  • 通过选择具体的invoker对象后调用invoke()方法实现远程调用。


dubbo cluster 调用过程源码

FailoverCluster对象创建入口

说明:

  • 核心在于cluster.join()方法会调用到如FailoverCluster类,建议通过debug打断点方式跟踪调用栈。
public class RegistryProtocol implements Protocol {

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 构建RegistryDirectory,可以把它理解为注册资源,其中包含了消费者/服务/路由等相关信息,其同时也是回调监听器
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

        // 构建subscribeUrl信息,主要拼接consumer:xxx的url地址
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            // 向注册中心注册服务消费者
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }

        // 从注册中心订阅服务提供者(即引用的服务)
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 从invoker当中选择其中一个返回
        Invoker invoker = cluster.join(directory);

        // 注册消费者
        ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
        return invoker;
    }
}


FailoverClusterInvoker 负载均衡器举例

  • 按照以下流程进行进行invoker的选择:AbstractClusterInvoker.invoke() -> FailoverClusterInvoker.doInvoke() -> AbstractClusterInvoker. select() -> AbstractClusterInvoker.doselect() -> AbstractLoadBalance.select()。
  • invoke()和select()调用链的关键路径在于invoke由父类触发后在实现类FailoverClusterInvoker执行,然后调用的负载均衡因为是通用的所以在父类AbstractClusterInvoker中实现。
  • directory.list(invocation)负责选择从ReigstryDeirectory获取服务引用的invokers列表进行负载均衡选择
  • invoker选择的过程中,在AbstractClusterInvoker.invoke()阶段会根据规则生成LoadBalance对象进行invoker的选择,在调用过程中生成LoadBalance的对象。
  • LoadBalance的选择是按照URL中配置的信息负载均衡器进行选择,如果没有配置就走默认的负载均衡器。
  • AbstractLoadBalance作为负载均衡器基类,提供了select()的执行流程,具体的doSelect动作由各个实现类去实现。
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}


public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {

            } catch (Throwable e) {

            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
    }
}


public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();

        LoadBalance loadbalance;

        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        return doInvoke(invocation, invokers, loadbalance);
    }


    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }


    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        String methodName = invocation == null ? "" : invocation.getMethodName();

        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
        {
            //ignore overloaded method
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
            //ignore cucurrent problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
        }
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);

        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }


    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        // If we only have two invokers, use round-robin instead.
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }
}


AbstractLoadBalance执行过程

  • AbstractLoadBalance的select()方法作为负载均衡选择器的入口位置,提供通用的getWeight()方法获取权重。
  • AbstractLoadBalance的doSelect()由具体的实现类实现。
public abstract class AbstractLoadBalance implements LoadBalance {

    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }
}
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
6月前
|
负载均衡 Dubbo 应用服务中间件
【Dubbo 解析】Dubbo支持几种负载均衡策略?
【1月更文挑战第11天】【Dubbo 解析】Dubbo支持几种负载均衡策略?
|
负载均衡 Dubbo 应用服务中间件
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
96 0
|
3月前
|
负载均衡 Dubbo 算法
Dubbo服务负载均衡原理
该文章主要介绍了Dubbo服务负载均衡的原理,包括Dubbo中负载均衡的实现位置、为什么需要负载均衡机制、Dubbo支持的负载均衡算法以及随机负载均衡策略的源码分析。
|
6月前
|
负载均衡 算法
Dubbo-负载均衡原理解析(1),一个本科渣渣是怎么逆袭从咸鱼到Offer收割机的
Dubbo-负载均衡原理解析(1),一个本科渣渣是怎么逆袭从咸鱼到Offer收割机的
|
负载均衡 Dubbo 算法
带你读《Apache Dubbo微服务开发从入门到精通》——三、 负载均衡机制(2)
带你读《Apache Dubbo微服务开发从入门到精通》——三、 负载均衡机制(2)
105 3
带你读《Apache Dubbo微服务开发从入门到精通》——三、 负载均衡机制(2)
|
消息中间件 运维 负载均衡
Dubbo负载均衡和集群容错和动态代理策略
Dubbo负载均衡和集群容错和动态代理策略
124 0
|
负载均衡 Dubbo 算法
带你读《Apache Dubbo微服务开发从入门到精通》——六、 负载均衡
带你读《Apache Dubbo微服务开发从入门到精通》——六、 负载均衡
118 5
|
存储 负载均衡 Kubernetes
带你读《Apache Dubbo微服务开发从入门到精通》——二、 应用级服务发现机制详解(上)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 应用级服务发现机制详解(上)
123 7
|
自然语言处理 负载均衡 Dubbo
带你读《Apache Dubbo微服务开发从入门到精通》——三、 负载均衡机制(1)
带你读《Apache Dubbo微服务开发从入门到精通》——三、 负载均衡机制(1)
120 8
|
Dubbo Java 应用服务中间件
带你读《Apache Dubbo微服务开发从入门到精通》—— 一、 Dubbo服务发现设计
带你读《Apache Dubbo微服务开发从入门到精通》—— 一、 Dubbo服务发现设计
128 6
下一篇
无影云桌面