Dubbo 的扩展点
在Dubbo框架中,SPI(Service Provider Interface)是一种扩展机制,使得框架的某些模块可以通过配置的方式加载实现类,而不需要在代码中显式地进行实例化。
Dubbo的SPI扩展点是指一组接口和实现类,这些接口和实现类可以被动态地扩展和替换。Dubbo框架内置了很多SPI扩展点,例如负载均衡算法、集群容错策略、协议实现等。
在使用Dubbo时,开发者可以根据自己的需求通过配置文件来指定相应的SPI实现类。Dubbo框架会在运行时根据配置信息动态加载对应的实现类,并将它们注入到应用程序中。这样,就可以实现对框架功能的扩展和定制。
指定名称的扩展点
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("name");
- 找到Protocol的全路径名称, 在/META-INF/dubbo/intenal
- 在指定文件中找到“name”对应的实现类.
自定义扩展点
- 在resource/META-INF/dubbo/ org.apache.dubbo.rpc.cluster.LoadBalance(文件名为 包的全路径 + 接口名)
- 里面的内容填写 gploadbalance = com.gupaodu.springboot.dubbo.springbootdubbosampleprovider.GpDefineLoadBalance (是扩展的全路径)
// 扩展了dubbo中的负载均衡 public class GpDefineLoadBalance extends AbstractLoadBalance{ @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { return null; } }
@DubboReference(loadbalance = "gploadbalance",...)
测试
@Test public void testSPI(){ LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("gploadbalance"); System.out.println(loadBalance); }
运行测试发现可以拿到扩展点。
结合前面的知识,其实consumer也会有一个url,而loadBalance 最终也会体现在 url上。
本质上其实逻辑挺简单的,解析这个loadBalance ,如果没有就采用默认的就好了。
//解析URL String loadbalance="random"; //URL loadbalance="gploadbalance" //loadlance=gploadbalance LoadBalance loadBalance=ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalance); System.out.println(loadBalance);
这样做就相当于请求的过程中,可以根据客户端的配置或者服务端的配置所配置的这个负载均衡的信息来决定当前采用什么样的负载策略。
扩展点的特征
在类级别标准@SPI(RandomLoadBalance.NAME)
.
其中,括号内的数据,表示当前扩展点的默认扩展点。
当然扩展点远不止如此。
比如在公司内部,可以针对Filter进行扩展,加入验证等逻辑。
像容错、负载均衡等等都可以扩展。
猜想
- 查找路径: /META-INF/dubbo ; /META-INF/dubbo/internal
random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBa lance shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLo adBalance
- 解析文件, Properties文件的解析
- 把解析出来的内容存储到内存中 key(name), value=Class ; 然后再通过反射newInstance();
源码分析
RandomLoadBalance rd= (RandomLoadBalance)ExtensionLoader.getExtensionLoader(Loadbalance.class).getExtensio n("random"); // 根据一个type去得到一个扩展点的实例 public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) { throw new IllegalArgumentException("Extension type == null"); } if (!type.isInterface()) { throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!"); } if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!"); } // 这里面算是一个优化,如果 hashmap结构 EXTENSION_LOADERS 有,则直接用,没有则创建,然后再用 // 相当于缓存优化 ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } // 先看最终返回的loader是谁来赋值的 return loader; } public T getExtension(String name) { return getExtension(name, true); } public T getExtension(String name, boolean wrap) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException("Extension name == null"); } if ("true".equals(name)) { return getDefaultExtension(); } final Holder<Object> holder = getOrCreateHolder(name); ------------------------------------------------------------------------- // 还是使用了缓存的思想 private Holder<Object> getOrCreateHolder(String name) { Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<>()); holder = cachedInstances.get(name); } return holder; } ------------------------------------------------------------------------- Object instance = holder.get(); // 如果没有的话,使用双重检查锁的机制去 创建一个单例出来 if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name, wrap); holder.set(instance); } } } // 返回了一个实例,说明最终已经实例化了 return (T) instance; } // 创建一个扩展,根据名字来创建 private T createExtension(String name, boolean wrap) { // getExtensionClasses 返回的是集合 key = name(扩展点的名字), clazz=name对应的扩展点类 // 假设当前加载的扩展名字是:random, 那么此时clazz=包路径.RandomLoadBalance Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { // 获取实例 和前面介绍的优化思路一样 T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); if (wrap) { List<Class<?>> wrapperClassesList = new ArrayList<>(); if (cachedWrapperClasses != null) { wrapperClassesList.addAll(cachedWrapperClasses); wrapperClassesList.sort(WrapperComparator.COMPARATOR); Collections.reverse(wrapperClassesList); } if (CollectionUtils.isNotEmpty(wrapperClassesList)) { for (Class<?> wrapperClass : wrapperClassesList) { Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class); if (wrapper == null || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } } } initExtension(instance); return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); } }
META-INF/dubbo/internal
random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalanc e shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBa lance
把上面这个文件中的内容,解析出来以Map的形式存储。
private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
- 根据默认配置的查找路径进行查找并解析
- strategies 对应的是不同扫描路径下的策略
private Map<String, Class<?>> loadExtensionClasses() { cacheDefaultExtensionName(); Map<String, Class<?>> extensionClasses = new HashMap<>(); for (LoadingStrategy strategy : strategies) { loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages()); loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages()); } return extensionClasses; }
返回的结果如下
// 整段代码本质上就是将配置文件解析成 map private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type, boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) { String fileName = dir + type; try { Enumeration<java.net.URL> urls = null; ClassLoader classLoader = findClassLoader(); // try to load from ExtensionLoader's ClassLoader first if (extensionLoaderClassLoaderFirst) { ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader(); if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) { urls = extensionLoaderClassLoader.getResources(fileName); } } if (urls == null || !urls.hasMoreElements()) { if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL resourceURL = urls.nextElement(); loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages); } } } catch (Throwable t) { logger.error("Exception occurred when loading extension class (interface: " + type + ", description file: " + fileName + ").", t); } }
再次回到 createExtension
// 创建一个扩展,根据名字来创建 private T createExtension(String name, boolean wrap) { // getExtensionClasses 返回的是集合 key = name(扩展点的名字), clazz=name对应的扩展点类 // 假设当前加载的扩展名字是:random, 那么此时clazz=包路径.RandomLoadBalance Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { // 获取实例 和前面介绍的优化思路一样 T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } // 如果装载的这个类依赖了另外的扩展点的时候 // 需要对其进行依赖注入,其会帮我自己去完成,不需要我们自己注入 injectExtension(instance); --------------------------------------------------------------------------- private T injectExtension(T instance) { if (objectFactory == null) { return instance; } try { for (Method method : instance.getClass().getMethods()) { if (!isSetter(method)) { continue; } /** * Check {@link DisableInject} to see if we need auto injection for this property */ if (method.getAnnotation(DisableInject.class) != null) { continue; } Class<?> pt = method.getParameterTypes()[0]; if (ReflectUtils.isPrimitives(pt)) { continue; } try { String property = getSetterProperty(method); // 通过这个方式去得到一个扩展点,根据这个属性的名字 和 类 Object object = objectFactory.getExtension(pt, property); if (object != null) { method.invoke(instance, object); } } catch (Exception e) { logger.error("Failed to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; } --------------------------------------------------------------------------- // 接下来,根据是否需要进行包装操作,进行不同的处理。如果需要包装,则按照事先定义的包装类的顺序,对实例进行包装操作。具体的包装处理是通过使用定义的 Wrapper 来完成的。 if (wrap) { List<Class<?>> wrapperClassesList = new ArrayList<>(); if (cachedWrapperClasses != null) { wrapperClassesList.addAll(cachedWrapperClasses); wrapperClassesList.sort(WrapperComparator.COMPARATOR); Collections.reverse(wrapperClassesList); } if (CollectionUtils.isNotEmpty(wrapperClassesList)) { for (Class<?> wrapperClass : wrapperClassesList) { Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class); if (wrapper == null || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } } } // 最终类似于这样的效果 ProtocolFilterWrapper(ProtocolListenerWrapper(QosProtocolWrapper(DubboProtocol))) initExtension(instance); return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); } }
总结
- 加载指定路径下的文件内容,保存到集合中
- 会对存在依赖注入的扩展点进行依赖注入
- 会对存在Wrapper类的扩展点,实现扩展点的包装
自适应扩展点
在运行期间,根据上下文来决定当前返回哪个扩展点。
相当于在扩展点的前置加了一个动态扩展的功能。
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
自适应扩展点的标识
@SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; void destroy(); default List<ProtocolServer> getServers() { return Collections.emptyList(); } } @Adaptive public class AdaptiveCompiler implements Compiler { ...... }
@Adaptive
- 该注解可以声明在类级别上
- 也可以声明在方法级别
在 Dubbo 中,自适应扩展点的实现主要是通过 Adaptive 注解和自动生成的代理类来实现的。当接口方法被标记为 @Adaptive 时,Dubbo 在运行时会根据不同的条件动态地生成该接口的适配实现,并在调用时使用这个适配实现。
具体实现步骤如下:
- 根据接口类型获取对应的 ExtensionLoader。
- 通过 ExtensionLoader 获取指定名称的扩展实现类。
- 如果获取的扩展实现类为空,Dubbo 会根据 @Adaptive 注解中的 value 属性值(通常是 URL 中的某个参数值)来选择默认的扩展实现。
- 如果需要自适应的方法有参数,Dubbo 会根据参数的不同值动态生成一个代理类,并在代理类中根据运行时条件选择并调用对应的扩展实现。
例如,假设有一个接口被标记为 @Adaptive,而接口方法有一个字符串类型的参数。在运行时,Dubbo 会动态生成一个代理类,这个代理类会根据传入的参数值选择对应的扩展实现。如果传入不同的参数值,Dubbo 会生成不同的代理类,从而实现了自适应扩展的功能。
总的来说,Dubbo 中的自适应扩展点实现是通过动态生成代理类,在运行时根据条件选择不同的扩展实现,从而实现了灵活的自适应扩展功能。
- 如果修饰在类级别,那么直接返回修饰的类
- 如果修饰在方法界别,动态创建一个代理类(javassist)Java AOP 中 CGLIB库的动态代理基于 javassist
源码
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
首先其修饰是在方法层面,为了做适配,会动态的生成一个类,具体的export要选择那个子类去发布,具体要看url中配置的是什么?
我们将url传递到这个export中,就会根据我们当前的选择去做动态的适配
@Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
其会动态生成一个 Protocol$Adaptive 类,其会实现 Protocol接口,并且会重写 Protocol 中对应的抽象方法
在代码中可以看到 String extName = (url.getProtocol() == null ? “dubbo” : url.getProtocol());
当得到名字以后,其实就是
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(“extName”); dubbo:// -> ProtocolA d a p t i v e − > D u b b o P r o t o c o l r e g i s t r y : / / − > P r o t o c o l Adaptive -> DubboProtocol registry:// -> ProtocolAdaptive−>DubboProtocolregistry://−>ProtocolAdaptive -> RegistryProtocol
返回一个什么对象?
public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if (createAdaptiveInstanceError != null) { throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); } } } } return (T) instance; } private T createAdaptiveExtension() { try { return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); } } private Class<?> getAdaptiveExtensionClass() { // 这个方法调用之后 会把指定目录下的配置信息装载到 Map 中 getExtensionClasses(); // 如果是类级别的,直接返回 if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } // 如果是方法级别 return cachedAdaptiveClass = createAdaptiveExtensionClass(); } // 组装了一段代码,通过compiler进行编译 得到一个自适应的扩展点 private Class<?> createAdaptiveExtensionClass() { String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); } public class AdaptiveCompiler implements Compiler { private static volatile String DEFAULT_COMPILER; public static void setDefaultCompiler(String compiler) { DEFAULT_COMPILER = compiler; } /* 这段代码是 Dubbo 框架中用于编译代码的一个方法。下面是对其功能的解释: 首先,通过 ExtensionLoader 类获取 Compiler 接口的实现类。ExtensionLoader 是 Dubbo 框架中用于加载扩展类的工具类。 获取默认的编译器名称 DEFAULT_COMPILER,如果该名称不为空,则使用该名称获取对应的编译器实现类。否则,使用 ExtensionLoader 的默认实现类作为编译器。 最后,调用编译器的 compile 方法,将要编译的代码和类加载器作为参数传入。该方法会将代码编译为 Java 字节码,并返回编译后的 Class 对象。 总体来说,这段代码的作用是根据配置选择合适的编译器实现类,并使用该编译器将给定的代码编译为可执行的 Class 对象。这一过程中,Dubbo 使用了 SPI 机制和动态加载的方式,以便支持不同的编译器实现。 */ @Override public Class<?> compile(String code, ClassLoader classLoader) { Compiler compiler; ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class); String name = DEFAULT_COMPILER; // copy reference if (name != null && name.length() > 0) { compiler = loader.getExtension(name); } else { compiler = loader.getDefaultExtension(); } return compiler.compile(code, classLoader); } }
激活扩展点(条件扩展)
ExtensionLoader.getExtensionLoader(Protocol.class).getActiveExtension();
相当于Spring中的conditional。
其本质在于通过了某种条件激活了某个扩展
使用
ExtensionLoader extensionLoader=ExtensionLoader.getExtensionLoader(Filter.class); URL url=new URL("","",0); url=url.addParameter("cache","cache"); List<Filter> filters=extensionLoader.getActivateExtension(url,"cache"); // 如果url中存在这个key,就会去激活 CacheFilter System.out.println(filters.size()); // 此时filters的个数就会加1,就会额外的激活一个扩展点 // 激活扩展点也主要用在Filter里面
具体使用在
实现
只要url参数中包含CACHE_KEY,那么 CacheFilter就会被激活
@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY) public class CacheFilter implements Filter {
类似于 @ConditionalOnBean(TTT.class)
源码
public List<T> getActivateExtension(URL url, String key) { return getActivateExtension(url, key, null); } public List<T> getActivateExtension(URL url, String key, String group) { String value = url.getParameter(key); return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group); } /* 这段代码是 Dubbo 框架中用于获取扩展的激活实现的方法。下面是对其功能的解释: 首先,该方法会根据给定的 URL、值数组和分组信息,获取符合条件的扩展类。 如果参数 values 中不包含“-default”关键字,则从缓存中获取激活类集合。 对于每个激活类,获取其对应的分组和值信息,并判断其是否满足当前分组和值条件。如果满足条件,则将其加入到激活扩展类集合中。 对激活扩展类集合进行排序。 接下来,遍历值数组中的每个元素,并根据其是否以“-”开头,来决定是否将其加入到已加载扩展类集合或待激活扩展类集合中。 最后,将已加载扩展类集合添加到待激活扩展类集合中,并返回待激活扩展类集合。 总体来说,这段代码的作用是根据给定的条件获取符合条件的扩展类,并将其按照一定的规则排序和分类。 */ public List<T> getActivateExtension(URL url, String[] values, String group) { List<T> activateExtensions = new ArrayList<>(); List<String> names = values == null ? new ArrayList<>(0) : asList(values); if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) { getExtensionClasses(); for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) { String name = entry.getKey(); Object activate = entry.getValue(); String[] activateGroup, activateValue; if (activate instanceof Activate) { activateGroup = ((Activate) activate).group(); activateValue = ((Activate) activate).value(); } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) { activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group(); activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value(); } else { continue; } if (isMatchGroup(group, activateGroup) && !names.contains(name) && !names.contains(REMOVE_VALUE_PREFIX + name) && isActive(activateValue, url)) { activateExtensions.add(getExtension(name)); } } activateExtensions.sort(ActivateComparator.COMPARATOR); } List<T> loadedExtensions = new ArrayList<>(); for (int i = 0; i < names.size(); i++) { String name = names.get(i); if (!name.startsWith(REMOVE_VALUE_PREFIX) && !names.contains(REMOVE_VALUE_PREFIX + name)) { if (DEFAULT_KEY.equals(name)) { if (!loadedExtensions.isEmpty()) { activateExtensions.addAll(0, loadedExtensions); loadedExtensions.clear(); } } else { loadedExtensions.add(getExtension(name)); } } } if (!loadedExtensions.isEmpty()) { activateExtensions.addAll(loadedExtensions); } return activateExtensions; }
扩展
按照其原有格式进行添加扩展,然后配置即可。