开篇
这篇文章的目的主要是为了分析Consumer侧Cluster的初始化过程,并针对Consumer实际执行invoke()的过程ClusterInvoker的执行流程进行分解。
Cluster初始化过程中会梳理Cluster和ClusterInvoker的关系,了解核心的join()方法。
consumer执行invoke()过程中会涉及ClusterInvoker、LoadBalance、RegistryDirectory的关系梳理,RegistryDirectory负责获取invoker的列表,LoadBalance负责针对invoker的列表负载均衡,ClusterInvoker作为invoke()总入口。
Cluster初始化过程
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// cluster为Cluster$Adaptive
// directory的url为
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
// ?anyhost=true&application=dubbo-demo-api-consumer
// &check=false&cluster=failover&deprecated=false&dubbo=2.0.2
// &dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&lazy=false
// &methods=sayHello&pid=9034®ister.ip=192.168.1.5
// &release=2.7.2&remote.application=dubbo-demo-api-provider
// &side=consumer&sticky=false×tamp=1572012894506
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
- RegistryProtocol在执行cluster.join(directory)方法,其中cluster为Cluster$Adaptive对象,directory为RegistryDirectory对象。
- directory的url参数当中可以设置参数cluster=failover,指明consumer的负载均衡策略。
- Invoker invoker = cluster.join(directory)返回的是MockClusterWrapper对象。
public class Cluster$Adaptive implements Cluster {
public Invoker join(Directory directory) throws RpcException {
if (directory == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
}
if (directory.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
}
URL uRL = directory.getUrl();
String string = uRL.getParameter("cluster", "failover");
if (string == null) {
throw new IllegalStateException(new StringBuffer()
.append("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (")
.append(uRL.toString()).append(") use keys([cluster])").toString());
}
Cluster cluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(string);
return cluster.join(directory);
}
}
com.alibaba.dubbo.rpc.cluster.Cluster文件
mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster
failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster
failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster
failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster
forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster
available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster
mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster
broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster
- Cluster$Adaptive中获取默认值为failover的cluster扩展,cluster的定义在com.alibaba.dubbo.rpc.cluster.Cluster文件。
- ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")获取MockClusterWrapper对象。
- getExtension("failover")的过程会涉及到Wrapper类的包装,先返回FailoverCluster对象,然后由包装类MockClusterWrapper进行包装。
- cluster.join()执行MockClusterWrapper的join()方法。
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory, this.cluster.join(directory));
}
}
- MockClusterWrapper的Cluster对象是FailoverCluster。
- MockClusterWrapper的join()方法会执行FailoverCluster的join()方法。
- MockClusterWrapper的join()方法返回MockClusterInvoker对象。
- MockClusterWrapper的this.cluster.join(directory)相当于执行FailoverCluster的join()方法,返回FailoverClusterInvoker对象。
- 这部分叙述主要是描述MockClusterWrapper包含FailoverCluster,同时MockClusterWrapper.join()返回MockClusterInvoker对象,MockClusterInvoker对象包含MockClusterInvoker。
public class MockClusterInvoker<T> implements Invoker<T> {
private final Directory<T> directory;
private final Invoker<T> invoker;
public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
this.directory = directory;
this.invoker = invoker;
}
}
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
}
- MockClusterInvoker内部的invoker是FailoverClusterInvoker对象。
- 对MockClusterInvoker的invoke()会调用FailoverClusterInvoker的invoke()方法。
Cluster和ClusterInvoker关系
- 上图说明了ClusterInvoker和Cluster的类关系图,Cluster按照不同的负载均衡策略分成不同的Cluster,每个Cluster的join方法返回不同的ClusterInvoker对象。
- Cluster的join()方法的作用就是生成对应的ClusterInvoker对象。
Dubbo Cluster invoker流程
下图以FailoverCluster为例说明Cluster在执行invoke动作的流程。
- 上图包含两个过程,分别是consumer侧ReferenceBean初始化,及consumer侧ReferenceBean的invoke()过程。
Reference流程分析
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 消费端执行订阅并获取所有相关的providers信息,directory对象内包含provider的invoker
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// cluster为Cluster@Adaptive对象
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
- directory.subscribe()过程执行消费端订阅提供端的provider,订阅后在directory包含所有的provider对应的invoker对象。
- doRefer()过程的cluster.join(directory)返回的是MockClusterInvoker对象。
- MockClusterInvoker对象内部包含FailoverClusterInvoker对象。
- 在每个Reference的Bean对象初始化的过程中都会执行FailoverCluster => FailoverClusterInvoker的流程,由FailoverCluster.join()返回FailoverClusterInvoker。
invoke流程分析
public class MockClusterInvoker<T> implements Invoker<T> {
private final Directory<T> directory;
private final Invoker<T> invoker;
public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
this.directory = directory;
this.invoker = invoker;
}
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
result = doMockInvoke(invocation, null);
} else {
// 省略无关代码
}
return result;
}
}
- MockClusterInvoker作为invoker的入口,执行invoke()方法会调用FailoverClusterInvoker的invoke()方法。
- 继续观察FailoverClusterInvoker的invoke()方法。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
// directory是RegistryDirectory对象
protected final Directory<T> directory;
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 获取符合要求的invokers列表
List<Invoker<T>> invokers = list(invocation);
// 根据invocation上下文生成负载均衡策略
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 根据负载均衡策略选择合适invoker并进行调用
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}
}
- FailoverClusterInvoker作为AbstractClusterInvoker的子类,调用FailoverClusterInvoker对象的invoke()方法实际执行的是AbstractClusterInvoker的invoke()方法。
- invoke()方法执行核心步骤如下:1、获取符合要求的invokers列表;2、根据invocation上下文生成负载均衡策略;3、根据负载均衡策略选择合适invoker并进行调用。
public abstract class AbstractDirectory<T> implements Directory<T> {
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
return doList(invocation);
}
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public List<Invoker<T>> doList(Invocation invocation) {
List<Invoker<T>> invokers = null;
try {
// 根据路由规则去过滤一遍返回符合路由规则的invokers
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
}
return invokers == null ? Collections.emptyList() : invokers;
}
}
- 获取符合要求的invoker列表。
- RegistryDirectory执行List() => doList()的过程获取符合要求的invoker列表。
- 符合要求的invoker列表会根据路由规则过滤一遍,相当于provider侧全量的invoker会由路由规则进行过滤。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
// LOADBALANCE_KEY = "loadbalance";
// DEFAULT_LOADBALANCE = "random";
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}
}
org.apache.dubbo.rpc.cluster.LoadBalance文件
random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
- 根据invocation上下文生成负载均衡策略。
- ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(extName)根据扩展名获取对应的负载均衡策略。
- 默认的负载均衡策略是random,对应的是RandomLoadBalance对象。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 默认调用一次,如果设置重试则在原来的一次调用次数上增加重试次数
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 调用失败则按照重试次数进行重试
for (int i = 0; i < len; i++) {
// 重新检测invokers是否已经变化
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
checkInvokers(copyInvokers, invocation);
}
// 根据loadBalance方法针对copyInvokers进行选择,根据不同负载均衡策略进行选择
Invoker<T> invoker = select(loadbalance, invocation,
copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 执行invoker动作
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
}
- 根据负载均衡策略选择合适invoker并进行调用。
- 调用FailoverClusterInvoker的select()方法会调用AbstractClusterInvoker的select()方法获取invoker对象。
- AbstractClusterInvoker的select()内部根据负载均衡策略选择对应的invoker对象。
- 继续分析AbstractClusterInvoker的select()方法。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
// 执行doSelect()方法根据负载均衡策略loadbalance选择合适的invoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
return invoker;
}
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers.size() == 1) {
return invokers.get(0);
}
// 核心逻辑执行loadbalance负载均衡的select()动作
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
// 省略非核心逻辑代码
return invoker;
}
}
- 执行负载均衡策略的select()方法
- loadbalance.select()执行负载均衡策略的select()方法。
总结
- 1、Consumer针对每个reference标签(对应引用provider的服务引用)会生成MockClusterInvoker对象。
- 2、MockClusterInvoker对象内部包含ClusterInvoker对象,根据集群策略不同的ClusterInvoker对象,具体的Cluster策略在com.alibaba.dubbo.rpc.cluster.Cluster文件定义。
- 3、ClusterInvoker对象(如FailoverClusterInvoker对象)包含directory对象RegistryDirectory,RegistryDirectory包含引用接口的provider侧的invokers列表。
- 4、MockClusterInvoker的invoke()方法会调用FailoverClusterInvoker.invoke()方法;FailoverClusterInvoker.invoke()方法会调用RegistryDirectory()的list()方法返回可用的invokers列表。
- 5、FailoverClusterInvoker.invoke()会返回指定的负载均衡策略并选择合适的invoker进行调用。
-6、执行invoker的invoke()调用。