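Introduction
- Override rules are Dubbo's mechanism for adjusting RPC call behavior dynamically, without restarting the application.
- In Dubbo 2.6 and earlier, every service governance rule applies at service granularity only. To make a rule take effect for a whole application, the same rule has to be configured for every service under that application, and changing or deleting the rule requires the same per-service operations.
- Starting with 2.7.0, dynamic configuration can be adjusted at both service and application granularity.
- This article is based on Dubbo 2.6.x and describes how a dynamic configuration takes effect. How the configuration is pushed out will be analyzed separately later.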
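Configuration rules
Write a dynamic configuration override rule to the registry. This is normally done from the pages of the monitoring center or the governance center.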
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&timeout=1000"));
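- See the configuration rules document on the official site.
- override:// means the data is applied by overriding. override and absent are supported, and the mechanism is extensible. Required.
- 0.0.0.0 means the rule applies to all IP addresses. To override the data of one IP only, fill in that IP. Required.
- com.foo.BarService means the rule applies to the specified service only. Required.
- category=configurators marks the data as dynamic configuration. Required.
- dynamic=false marks the data as persistent: it stays in the registry after the registering party exits. Required.
- enabled=true controls whether the override rule takes effect. Optional, enabled by default.
- application=foo means the rule applies to the specified application only. Optional, and when omitted the rule applies to all applications.
- timeout=1000 overrides the timeout parameter of everything that matches the conditions above to 1000. To override other parameters, add them to the parameters of the override URL.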
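To make the anatomy of a rule concrete, here is a minimal sketch that splits the example rule into the parts listed above. It uses only `java.net.URI`; the class `OverrideRuleParts` and its fields are hypothetical names for illustration and are not part of Dubbo.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Dubbo class): splits an override rule into its documented parts.
class OverrideRuleParts {
    final String mode;     // "override" or "absent"
    final String host;     // 0.0.0.0 means all IP addresses
    final String service;  // interface the rule targets
    final Map<String, String> parameters = new LinkedHashMap<>();

    OverrideRuleParts(String rule) {
        URI uri = URI.create(rule);
        this.mode = uri.getScheme();
        this.host = uri.getHost();
        String path = uri.getPath();
        this.service = path.startsWith("/") ? path.substring(1) : path;
        String query = uri.getQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    parameters.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
    }

    // Mirrors the documented default: a rule without "enabled" is treated as enabled.
    boolean isEnabled() {
        return Boolean.parseBoolean(parameters.getOrDefault("enabled", "true"));
    }

    public static void main(String[] args) {
        OverrideRuleParts rule = new OverrideRuleParts(
            "override://0.0.0.0/com.foo.BarService"
                + "?category=configurators&dynamic=false&application=foo&timeout=1000");
        System.out.println(rule.mode + " " + rule.host + " " + rule.service);
        System.out.println(rule.parameters + " enabled=" + rule.isEnabled());
    }
}
```

In this sketch `category`, `dynamic` and `application` end up in the same map as `timeout`. Dubbo itself separates condition keys from the values to apply, as the configure code later in the article shows.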
Flow analysis
public class ZookeeperRegistry extends FailbackRegistry {
// Value of the url variable:
// consumer://172.17.32.176/org.apache.dubbo.demo.DemoService?
// application=dubbo-demo-api-consumer&category=providers,configurators,routers&dubbo=2.0.2
// &interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello
// &pid=58968&side=consumer&sticky=false&timestamp=1571824631224
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
// This branch is not covered here
} else {
List<URL> urls = new ArrayList<>();
// Handle the providers, configurators and routers paths:
// /dubbo/org.apache.dubbo.demo.DemoService/providers
// /dubbo/org.apache.dubbo.demo.DemoService/configurators
// /dubbo/org.apache.dubbo.demo.DemoService/routers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// Create the callback listener for zk child-node changes
listeners.putIfAbsent(listener,
(parentPath, currentChilds) -> ZookeeperRegistry.this.notify(
url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
// Create the node for this path
zkClient.create(path, false);
// Watch the children under this path
List<String> children = zkClient.addChildListener(path, zkListener);
// Convert the current children under this path into URLs
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// Trigger the notify callback with the initial URLs
notify(url, listener, urls);
}
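} catch (Throwable e) {
// Exception handling omitted in this excerpt (the Dubbo source rethrows the failure as an RpcException)
}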
}
}
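- 1. During refer, the consumer subscribes to child-change events on nodes such as providers, configurators and routers. Whenever a child node changes, the consumer-side listener fires and the service reference is rebuilt.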
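The subscribe-then-notify pattern in step 1 can be sketched without ZooKeeper. The class below is a hypothetical in-memory stand-in, not the Dubbo or ZooKeeper API: subscribing pushes the current children once, and every later child change pushes the full child list again. Unlike `toUrlsWithEmpty`, it does not model the `empty://` placeholder URL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry used only to illustrate the callback flow.
class InMemoryRegistry {
    private final Map<String, List<String>> children = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // Register the listener, then deliver the current children once (the initial notify).
    void subscribe(String path, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(snapshot(path));
    }

    // A new child re-notifies every listener of that path with the full child list.
    void addChild(String path, String child) {
        children.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(child);
        List<Consumer<List<String>>> pathListeners =
            listeners.getOrDefault(path, Collections.emptyList());
        for (Consumer<List<String>> listener : pathListeners) {
            listener.accept(snapshot(path));
        }
    }

    private List<String> snapshot(String path) {
        return new ArrayList<>(children.getOrDefault(path, Collections.emptyList()));
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        String path = "/dubbo/com.foo.BarService/configurators";
        registry.subscribe(path, urls -> System.out.println("notified with " + urls.size() + " url(s)"));
        registry.addChild(path, "override://0.0.0.0/com.foo.BarService?timeout=1000");
    }
}
```

Delivering the whole child list on each change, rather than a delta, is what lets the consumer rebuild its state from scratch in the notify method that follows.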
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
// A configuration change arrives here and ends up in configuratorUrls
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// Build the configurators objects from the dynamic configuration URLs
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// Non-essential code omitted
// providers
refreshInvoker(invokerUrls);
}
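- 2. Collect the configuratorUrls carried by the configuration change and build the matching configurators objects, which are later used to modify URLs on the consumer side dynamically.
- 3. Enter refreshInvoker() to rebuild the invokers.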
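The classification in step 2 can be sketched on plain strings. The literal values `"providers"`, `"route"` and `"override"` below are assumed to be what `Constants.DEFAULT_CATEGORY`, `Constants.ROUTE_PROTOCOL` and `Constants.OVERRIDE_PROTOCOL` resolve to in 2.6.x; `NotifyClassifier` itself is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the grouping done at the top of notify(); URLs are plain strings here.
class NotifyClassifier {
    // Value of the "category" query parameter, defaulting to "providers" (assumed default).
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    static String protocolOf(String url) {
        int i = url.indexOf("://");
        return i < 0 ? "" : url.substring(0, i);
    }

    // Groups URLs into routers, configurators and providers, in the same order of checks.
    static Map<String, List<String>> classify(List<String> urls) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("routers", new ArrayList<>());
        groups.put("configurators", new ArrayList<>());
        groups.put("providers", new ArrayList<>());
        for (String url : urls) {
            String category = categoryOf(url);
            String protocol = protocolOf(url);
            if ("routers".equals(category) || "route".equals(protocol)) {
                groups.get("routers").add(url);
            } else if ("configurators".equals(category) || "override".equals(protocol)) {
                groups.get("configurators").add(url);
            } else if ("providers".equals(category)) {
                groups.get("providers").add(url);
            }
            // Any other category is dropped here; the original code logs a warning instead.
        }
        return groups;
    }
}
```

Note that an `override://` URL is treated as a configurator even without `category=configurators`, because the protocol check is an alternative condition.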
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// Convert the provider URLs into the corresponding invoker objects
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
- The toInvokers() method called inside refreshInvoker() performs the invoker rebuild.
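Why does an override rule lead to a new invoker? In the code that follows, invokers are cached under the full merged URL string, so any parameter change produces a new key and a cache miss. The sketch below illustrates that idea under simplifying assumptions; `InvokerCache` and its `created` counter are hypothetical and only stand in for `urlInvokerMap`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical cache keyed by the full merged URL string, mimicking urlInvokerMap.
class InvokerCache<I> {
    private Map<String, I> current = new HashMap<>();
    int created = 0; // how many invokers were built so far, for illustration only

    // Reuse on key hit, create on key miss, forget whatever is no longer referenced.
    Map<String, I> refresh(List<String> mergedUrls, Function<String, I> factory) {
        Map<String, I> next = new HashMap<>();
        for (String key : mergedUrls) {
            if (next.containsKey(key)) {
                continue; // repeated url
            }
            I invoker = current.get(key);
            if (invoker == null) {
                invoker = factory.apply(key);
                created++;
            }
            next.put(key, invoker);
        }
        current = next; // entries absent from "next" are the ones Dubbo would destroy
        return next;
    }

    public static void main(String[] args) {
        InvokerCache<String> cache = new InvokerCache<>();
        Function<String, String> factory = url -> "invoker for " + url;
        cache.refresh(List.of("dubbo://10.0.0.1:20880/com.foo.BarService?timeout=3000"), factory);
        cache.refresh(List.of("dubbo://10.0.0.1:20880/com.foo.BarService?timeout=3000"), factory);
        // An override changed timeout, so the key differs and a new invoker is created.
        cache.refresh(List.of("dubbo://10.0.0.1:20880/com.foo.BarService?timeout=1000"), factory);
        System.out.println("created = " + cache.created); // prints created = 2
    }
}
```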
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// Non-essential code omitted
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Look up the invoker by the merged URL and create a new one on a cache miss
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
- The mergeUrl() method called inside toInvokers() merges the consumer, provider and configurator parameters, and the invoker is rebuilt from the merged URL.
// URL parameter precedence: override > Consumer > Provider
private URL mergeUrl(URL providerUrl) {
// Merge the consumer side parameters
providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap);
// Merge the dynamic configuration into the provider URL
List<Configurator> localConfigurators = this.configurators; // local reference
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
// The combination of directoryUrl and override is at the end of notify, which can't be handled here
this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
// Non-essential code omitted
return providerUrl;
}
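- URL parameters are merged with the precedence override > Consumer > Provider. mergeUrl() is where the dynamic configuration rule is applied to the URL.
- The configurator in configurator.configure(providerUrl) is the object built from the dynamic configuration. It operates on the provider URL.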
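The precedence override > Consumer > Provider can be illustrated with plain maps: whichever source is applied last wins. This is a deliberately simplified sketch with a hypothetical class name; the real `ClusterUtils.mergeUrl` also filters and preserves certain keys, which is not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the merge order; parameter maps stand in for Dubbo URLs.
class MergeOrderSketch {
    // Later putAll calls win, so the call order encodes Provider < Consumer < override.
    static Map<String, String> merge(Map<String, String> provider,
                                     Map<String, String> consumer,
                                     Map<String, String> override) {
        Map<String, String> merged = new LinkedHashMap<>(provider);
        merged.putAll(consumer);
        merged.putAll(override);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> provider = new LinkedHashMap<>();
        provider.put("timeout", "3000");
        provider.put("retries", "2");
        Map<String, String> consumer = new LinkedHashMap<>();
        consumer.put("timeout", "2000");
        Map<String, String> override = new LinkedHashMap<>();
        override.put("timeout", "1000");
        System.out.println(merge(provider, consumer, override)); // prints {timeout=1000, retries=2}
    }
}
```

Here `retries` survives from the provider because neither the consumer nor the override rule mentions it.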
public URL configure(URL url) {
if (configuratorUrl == null || configuratorUrl.getHost() == null
|| url == null || url.getHost() == null) {
return url;
}
// If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance.
if (configuratorUrl.getPort() != 0) {
if (url.getPort() == configuratorUrl.getPort()) {
return configureIfMatch(url.getHost(), url);
}
} else {// override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0
// 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore;
// 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider
if (url.getParameter(Constants.SIDE_KEY, Constants.PROVIDER).equals(Constants.CONSUMER)) {
return configureIfMatch(NetUtils.getLocalHost(), url);// NetUtils.getLocalHost is the ip address consumer registered to registry.
} else if (url.getParameter(Constants.SIDE_KEY, Constants.CONSUMER).equals(Constants.PROVIDER)) {
return configureIfMatch(Constants.ANYHOST_VALUE, url);// take effect on all providers, so address must be 0.0.0.0, otherwise it won't flow to this if branch
}
}
return url;
}
private URL configureIfMatch(String host, URL url) {
if (Constants.ANYHOST_VALUE.equals(configuratorUrl.getHost()) || host.equals(configuratorUrl.getHost())) {
String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY,
configuratorUrl.getUsername());
String currentApplication = url.getParameter(Constants.APPLICATION_KEY, url.getUsername());
if (configApplication == null || Constants.ANY_VALUE.equals(configApplication)
|| configApplication.equals(currentApplication)) {
Set<String> conditionKeys = new HashSet<String>();
conditionKeys.add(Constants.CATEGORY_KEY);
conditionKeys.add(Constants.CHECK_KEY);
conditionKeys.add(Constants.DYNAMIC_KEY);
conditionKeys.add(Constants.ENABLED_KEY);
for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith("~") || Constants.APPLICATION_KEY.equals(key) || Constants.SIDE_KEY.equals(key)) {
conditionKeys.add(key);
if (value != null && !Constants.ANY_VALUE.equals(value)
&& !value.equals(url.getParameter(key.startsWith("~") ? key.substring(1) : key))) {
return url;
}
}
}
return doConfigure(url, configuratorUrl.removeParameters(conditionKeys));
}
}
return url;
}
public class OverrideConfigurator extends AbstractConfigurator {
public OverrideConfigurator(URL url) {
super(url);
}
@Override
public URL doConfigure(URL currentUrl, URL configUrl) {
return currentUrl.addParameters(configUrl.getParameters());
}
}
public URL addParameters(Map<String, String> parameters) {
if (parameters == null || parameters.size() == 0) {
return this;
}
boolean hasAndEqual = true;
for (Map.Entry<String, String> entry : parameters.entrySet()) {
String value = getParameters().get(entry.getKey());
if (value == null) {
if (entry.getValue() != null) {
hasAndEqual = false;
break;
}
} else {
if (!value.equals(entry.getValue())) {
hasAndEqual = false;
break;
}
}
}
// return immediately if there's no change
if (hasAndEqual) return this;
Map<String, String> map = new HashMap<String, String>(getParameters());
map.putAll(parameters);
return new URL(protocol, username, password, host, port, path, map);
}
- The overall configure flow of OverrideConfigurator: run the pre-checks, overwrite the parameters named in the override rule, and return a new URL object.
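The flow above can be condensed into a small sketch on parameter maps. It is a simplification under stated assumptions: host and port matching are left out, `ConfigureSketch` is a hypothetical name, and the `absent` flag models the absent:// mode mentioned in the rule description, where existing values are kept and only missing ones are filled in.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch of matching plus overriding; maps stand in for Dubbo URLs.
class ConfigureSketch {
    // Keys that describe the rule itself and are never copied onto the target URL.
    private static final Set<String> CONDITION_KEYS =
        new HashSet<>(Arrays.asList("category", "check", "dynamic", "enabled"));

    // absent = false models override://, absent = true models absent://
    static Map<String, String> configure(Map<String, String> target,
                                         Map<String, String> rule,
                                         boolean absent) {
        Map<String, String> toApply = new HashMap<>();
        for (Map.Entry<String, String> entry : rule.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key.startsWith("~") || key.equals("application") || key.equals("side")) {
                // Condition parameter: must equal the target's value unless it is "*".
                String targetKey = key.startsWith("~") ? key.substring(1) : key;
                if (value != null && !"*".equals(value) && !value.equals(target.get(targetKey))) {
                    return target; // rule does not match, the URL is left untouched
                }
            } else if (!CONDITION_KEYS.contains(key)) {
                toApply.put(key, value);
            }
        }
        Map<String, String> result = new HashMap<>(target);
        for (Map.Entry<String, String> entry : toApply.entrySet()) {
            if (absent) {
                result.putIfAbsent(entry.getKey(), entry.getValue());
            } else {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

With a target of `application=foo&timeout=3000` and the rule from the beginning of the article, override mode yields `timeout=1000`, absent mode keeps `timeout=3000`, and a rule with `application=bar` changes nothing.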