前言
前面我们研究了RPC的原理,市面上有很多基于RPC思想实现的框架,比如有Dubbo。今天就从Dubbo的SPI机制、服务注册与发现源码及网络通信过程去深入剖析下Dubbo。
Dubbo架构
概述
Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。
Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。
调用流程:
- 服务容器负责启动,加载,运行服务提供者。
- 服务提供者在启动时,向注册中心注册自己提供的服务。
- 服务消费者在启动时,向注册中心订阅自己所需的服务。
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
- 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
架构体系
源码结构
- dubbo-common:公共逻辑模块: 包括Util类和通用模型
- dubbo-remoting 远程通信模块: 相当于dubbo协议的实现,如果RPC使用RMI协议则不需要使用此包
- dubbo-rpc 远程调用模块: 抽象各种协议,以及动态代理,包含一对一的调用,不关心集群的原理。
- dubbo-cluster 集群模块: 将多个服务提供方伪装成一个提供方,包括负载均衡,容错,路由等,集群的地址列表可以是静态配置的,也可以是注册中心下发的.
- dubbo-registry 注册中心模块: 基于注册中心下发的集群方式,以及对各种注册中心的抽象
- dubbo-monitor 监控模块: 统计服务调用次数,调用时间,调用链跟踪的服务.
- dubbo-config 配置模块: 是dubbo对外的api,用户通过config使用dubbo,隐藏dubbo所有细节
- dubbo-container 容器模块: 是一个standlone的容器,以简单的main加载spring启动,因为服务通常不需要Tomcat/Jboss等web容器的特性,没必要用web容器去加载服务.
整体设计
- 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
- 图中从下至上分为十层,各层均为单向依赖,每一层都可以剥离上层被复用,其中,Service 和Config 层为API,其它各层均为SPI。
- 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
- 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。
各层说明
- config 配置层:对外配置接口,以 ServiceConfig , ReferenceConfig 为中心,可以直接初始化配置类,也可以通过spring 解析配置生成配置类
- proxy 服务代理层:服务接口透明代理,生成服务的客户端Stub 和服务器端Skeleton, 以ServiceProxy 为中心,扩展接口为 ProxyFactory
- registry 注册中心层:封装服务地址的注册与发现,以服务URL 为中心,扩展接口为RegistryFactory , Registry , RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster , Directory , Router , LoadBalance
- monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为MonitorFactory , Monitor , MonitorService
- protocol 远程调用层:封装RPC 调用,以 Invocation , Result 为中心,扩展接口为Protocol , Invoker , Exporter
- exchange 信息交换层:封装请求响应模式,同步转异步,以 Request , Response 为中心,扩展接口为 Exchanger , ExchangeChannel , ExchangeClient , ExchangeServer
- transport 网络传输层:抽象mina 和netty 为统一接口,以 Message 为中心,扩展接口为Channel , Transporter , Client , Server , Codec
- serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization , ObjectInput ,ObjectOutput , ThreadPool
调用流程
对照上面的整体架构图可以大致分为以下步骤:
1、服务提供者启动,开启Netty服务,创建Zookeeper客户端,向注册中心注册服务。
2、服务消费者启动,通过Zookeeper向注册中心获取服务提供者列表,与服务提供者通过Netty建立长连接。
3、服务消费者通过接口开始远程调用服务,ProxyFactory通过初始化Proxy对象,Proxy通过创建动态代理对象。
4、动态代理对象通过invoke方法,层层包装生成一个Invoker对象,该对象包含了代理对象。
5、Invoker通过路由,负载均衡选择了一个最合适的服务提供者,在通过加入各种过滤器,协议层包装生成一个新的DubboInvoker对象。
6、再通过交换成将DubboInvoker对象包装成一个Reuqest对象,该对象通过序列化通过NettyClient传输到服务提供者的NettyServer端。
7、到了服务提供者这边,再通过反序列化、协议解密等操作生成一个DubboExporter对象,再层层传递处理,会生成一个服务提供端的Invoker对象.
8、这个Invoker对象会调用本地服务,获得结果再通过层层回调返回到服务消费者,服务消费者拿到结果后,再解析获得最终结果。
Dubbo中的SPI机制
什么是SPI
概述
在Dubbo 中,SPI 是一个非常重要的模块。基于SPI,我们可以很容易的对Dubbo 进行拓展。如果大家想要学习Dubbo 的源码,SPI 机制务必弄懂。接下来,我们先来了解一下Java SPI 与Dubbo SPI 的用法,然后再来分析Dubbo SPI 的源码。
SPI是Service Provider Interface 服务提供接口缩写,是一种服务发现机制。SPI的本质是将接口的实现类的全限定名定义在配置文件中,并有服务器读取配置文件,并加载实现类。这样就可以在运行的时候,动态为接口替换实现类。
JDK中的SPI
Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。
通过一个案例我们来认识下SPI
定义一个接口:
package com.laowang;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public interface User {
String showName();
}
定义两个实现类
package com.laowang.impl;
import com.laowang.User;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public class Student implements User {
@Override
public String showName() {
System.out.println("my name is laowang");
return null;
}
}
package com.laowang.impl;
import com.laowang.User;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public class Teacher implements User {
@Override
public String showName() {
System.out.println("my name is zhangsan");
return null;
}
}
在resources目录下创建文件夹META-INF.services,并在该文件夹下创建一个名称与User的全路径一致的文件com.laowang.User
在文件中写入,两个实现类的全路径名
编写测试类:
package com.laowang;
import java.util.ServiceLoader;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public class SpiTest {
public static void main(String[] args) {
ServiceLoader<User> serviceLoader = ServiceLoader.load(User.class);
serviceLoader.forEach(User::showName);
}
}
运行结果:
我们发现通过SPI机制,帮我们自动运行了两个实现类。
通过查看ServiceLoader源码:
其实通过读取配置文件中实现类的全路径类名,通过反射创建对象,并放入providers容器中。
总结:
调用过程
应用程序调用ServiceLoader.load方法,创建一个新的ServiceLoader,并实例化该类中的成员变量
应用程序通过迭代器接口获取对象实例,ServiceLoader先判断成员变量providers对象中(LinkedHashMap<String,S>类型)是否有缓存实例对象,如果有缓存,直接返回。如果没有缓存,执行类的装载,
优点
使用Java SPI 机制的优势是实现解耦,使得接口的定义与具体业务实现分离,而不是耦合在一起。应用进程可以根据实际业务情况启用或替换具体组件。
缺点
不能按需加载。虽然ServiceLoader 做了延迟载入,但是基本只能通过遍历全部获取,也就是接口的实现类得全部载入并实例化一遍。如果你并不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。
获取某个实现类的方式不够灵活,只能通过Iterator 形式获取,不能根据某个参数来获取对应的实现类。
多个并发多线程使用ServiceLoader 类的实例是不安全的。
加载不到实现类时抛出并不是真正原因的异常,错误很难定位。
Dubbo中的SPI
Dubbo 并未使用Java SPI,而是重新实现了一套功能更强的SPI 机制。Dubbo SPI 的相关逻辑被封装在了ExtensionLoader 类中,通过ExtensionLoader,我们可以加载指定的实现类。
栗子
与Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类。下面来演示Dubbo SPI 的用法:
Dubbo SPI 所需的配置文件需放置在META-INF/dubbo 路径下,与Java SPI 实现类配置不同,DubboSPI 是通过键值对的方式进行配置,配置内容如下。
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
在使用Dubbo SPI 时,需要在接口上标注@SPI 注解。
@SPI
public interface Robot {
void sayHello();
}
通过ExtensionLoader,我们可以加载指定的实现类,下面来演示Dubbo SPI :
public class DubboSPITest {
@Test
public void sayHello() throws Exception {
ExtensionLoader<Robot> extensionLoader =
ExtensionLoader.getExtensionLoader(Robot.class);
Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
optimusPrime.sayHello();
Robot bumblebee = extensionLoader.getExtension("bumblebee");
bumblebee.sayHello();
}
}
Dubbo SPI 除了支持按需加载接口实现类,还增加了IOC 和AOP 等特性,这些特性将会在接下来的源码分析章节中一一进行介绍。
源码分析
ExtensionLoader 的getExtensionLoader 方法获取一个ExtensionLoader 实例,然后再通过ExtensionLoader 的getExtension 方法获取拓展类对象。下面我们从ExtensionLoader 的getExtension 方法作为入口,对拓展类对象的获取过程进行详细的分析。
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
// 获取默认的拓展实现类
return getDefaultExtension();
}
// Holder,顾名思义,用于持有目标对象 就是从容器中获取,如果没有直接new一个Holder
Holder<Object> holder = getOrCreateHolder(name);
//获取目标对象实例
Object instance = holder.get();
// 如果目标对象实例为null 就需要通过双重检查创建实例
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建拓展实例
instance = createExtension(name);
// 设置实例到 holder 中
holder.set(instance);
}
}
}
return (T) instance;
}
上面代码的逻辑比较简单,首先检查缓存,缓存未命中则创建拓展对象。下面我们来看一下创建拓展对象的过程是怎样的。
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
//从容器中获取对应的实例对象 如果不存在就通过反射创建
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖 下面是IOC和AOP的实现
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给instance 变量
instance = injectExtension(
(T)
wrapperClass.getConstructor(type).newInstance(instance));
}
}
createExtension 方法的逻辑稍复杂一下,包含了如下的步骤:
- 通过getExtensionClasses 获取所有的拓展类
- 通过反射创建拓展对象
- 向拓展对象中注入依赖
将拓展对象包裹在相应的Wrapper 对象中
以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是Dubbo IOC 与AOP 的具体实现。由于此类设计源码较多,这里简单的总结下ExtensionLoader整个执行逻辑:
getExtension(String name) #根据key获取拓展对象 -->createExtension(String name) #创建拓展实例 -->getExtensionClasses #根据路径获取所有的拓展类 -->loadExtensionClasses #加载拓展类 -->cacheDefaultExtensionName #解析@SPI注解 -->loadDirectory #方法加载指定文件夹配置文件 -->loadResource #加载资源 -->loadClass #加载类,并通过 loadClass 方法对类进行缓存
Dubbo的SPI如何实现IOC和AOP的
Dubbo IOC
Dubbo IOC 是通过setter 方法注入依赖。Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有setter 方法特征。若有,则通过ObjectFactory 获取依赖对象,最后通过反射调用setter 方法将依赖设置到目标对象中。整个过程对应的代码如下:
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
//获取实例的所有方法
for (Method method : instance.getClass().getMethods()) {
//isSetter做的事:检测方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public
if (isSetter(method)) {
/**
* Check {@link DisableInject} to see if we need auto injection for this property
*/
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
String property = getSetterProperty(method);
//获取依赖对象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
//设置属性
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("Failed to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
Dubbo Aop
在说这个之前,我们得先知道装饰者模式
装饰者模式:在不改变原类文件以及不使用继承的情况下,动态地将责任附加到对象上,从而实现动态拓展一个对象的功能。它是通过创建一个包装对象,也就是装饰来包裹真实的对象。
在用Spring的时候,我们经常会用到AOP功能。在目标类的方法前后插入其他逻辑。比如通常使用Spring AOP来实现日志,监控和鉴权等功能。Dubbo的扩展机制,是否也支持类似的功能呢?答案是yes。在Dubbo中,有一种特殊的类,被称为Wrapper类。通过装饰者模式,使用包装类包装原始的扩展点实例。在原始扩展点实现前后插入其他逻辑,实现AOP功能。
一般来说装饰者模式有下面几个参与者:
Component:装饰者和被装饰者共同的父类,是一个接口或者抽象类,用来定义基本行为
ConcreteComponent:定义具体对象,即被装饰者
Decorator:抽象装饰者,继承自Component,从外类来扩展ConcreteComponent。对于ConcreteComponent来说,不需要知道Decorator的存在,Decorator是一个接口或抽象类
ConcreteDecorator:具体装饰者,用于扩展ConcreteComponent
//获取所有需要包装的类
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
我们再看看cachedWrapperClasses是什么?
private Set<Class<?>> cachedWrapperClasses;
是一个set集合,那么集合是什么时候添加元素的呢?
/**
* cache wrapper class
* <p>
* like: ProtocolFilterWrapper, ProtocolListenerWrapper
*/
private void cacheWrapperClass(Class<?> clazz) {
if (cachedWrapperClasses == null) {
cachedWrapperClasses = new ConcurrentHashSet<>();
}
cachedWrapperClasses.add(clazz);
}
通过这个方法添加的,再看看谁调用了这个私有方法:
/**
* test if clazz is a wrapper class
* <p>
* which has Constructor with given class type as its only argument
*/
private boolean isWrapperClass(Class<?> clazz) {
try {
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
原来是通过isWrapperClass这个方法,判断有没有其他对象中的构造方法中持有本对象,如果有,dubbo就认为这是个装饰类,调用装饰者类的构造方法,并返回实例对象
然后通过实例化这个包装类代替需要加载的这个类。这样执行的方法就是包装类的方法。
Dubbo中的动态编译
我们知道在Dubbo 中,很多拓展都是通过SPI 机制 进行加载的,比如Protocol、Cluster、LoadBalance、ProxyFactory 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载,即根据参数动态加载实现类。
这种在运行时,根据方法参数才动态决定使用具体的拓展,在dubbo中就叫做扩展点自适应实例。其实是一个扩展点的代理,将扩展的选择从Dubbo启动时,延迟到RPC调用时。Dubbo中每一个扩展点都有一个自适应类,如果没有显式提供,Dubbo会自动为我们创建一个,默认使用Javaassist。
自适应拓展机制的实现逻辑是这样的
- 首先Dubbo 会为拓展接口生成具有代理功能的代码;
- 通过javassist 或jdk 编译这段代码,得到Class 类;
- 通过反射创建代理类;
- 在代理类中,通过URL对象的参数来确定到底调用哪个实现类;
javassist
Javassist是一个开源的分析、编辑和创建Java字节码的类库。是由东京工业大学的数学和计算机科学系的Shigeru Chiba (千叶滋)所创建的。它已加入了开放源代码JBoss 应用服务器项目,通过使用Javassist对字节码操作为JBoss实现动态AOP框架。javassist是jboss的一个子项目,其主要的优点,在于简单,而且快速。直接使用java编码的形式,而不需要了解虚拟机指令,就能动态改变类的结构,或者动态生成类。
/**
* Javassist是一个开源的分析、编辑和创建Java字节码的类库
* 能动态改变类的结构,或者动态生成类
*/
public class CompilerByJavassist {
public static void main(String[] args) throws Exception {
// ClassPool:class对象容器
ClassPool pool = ClassPool.getDefault();
// 通过ClassPool生成一个User类
CtClass ctClass = pool.makeClass("com.itheima.domain.User");
// 添加属性 -- private String username
CtField enameField = new CtField(pool.getCtClass("java.lang.String"),
"username", ctClass);
enameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enameField);
// 添加属性 -- private int age
CtField enoField = new CtField(pool.getCtClass("int"), "age", ctClass);
enoField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enoField);
//添加方法
ctClass.addMethod(CtNewMethod.getter("getUsername", enameField));
ctClass.addMethod(CtNewMethod.setter("setUsername", enameField));
ctClass.addMethod(CtNewMethod.getter("getAge", enoField));
ctClass.addMethod(CtNewMethod.setter("setAge", enoField));
// 无参构造器
CtConstructor constructor = new CtConstructor(null, ctClass);
constructor.setBody("{}");
ctClass.addConstructor(constructor);
// 添加构造函数
//ctClass.addConstructor(new CtConstructor(new CtClass[] {}, ctClass));
CtConstructor ctConstructor = new CtConstructor(new CtClass[]
{pool.get(String.class.getName()),CtClass.intType}, ctClass);
ctConstructor.setBody("{\n this.username=$1; \n this.age=$2;\n}");
ctClass.addConstructor(ctConstructor);
// 添加自定义方法
CtMethod ctMethod = new CtMethod(CtClass.voidType, "printUser",new
CtClass[] {}, ctClass);
// 为自定义方法设置修饰符
ctMethod.setModifiers(Modifier.PUBLIC);
// 为自定义方法设置函数体
StringBuffer buffer2 = new StringBuffer();
buffer2.append("{\nSystem.out.println(\"用户信息如下\");\n")
.append("System.out.println(\"用户名=\"+username);\n")
.append("System.out.println(\"年龄=\"+age);\n").append("}");
ctMethod.setBody(buffer2.toString());
ctClass.addMethod(ctMethod);
//生成一个class
Class<?> clazz = ctClass.toClass();
Constructor cons2 =
clazz.getDeclaredConstructor(String.class,Integer.TYPE);
Object obj = cons2.newInstance("itheima",20);
//反射 执行方法
obj.getClass().getMethod("printUser", new Class[] {})
.invoke(obj, new Object[] {});
// 把生成的class文件写入文件
byte[] byteArr = ctClass.toBytecode();
FileOutputStream fos = new FileOutputStream(new File("D://User.class"));
fos.write(byteArr);
fos.close();
}
}
源码分析
Adaptive注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
Adaptive 可注解在类或方法上。
标注在类上:Dubbo 不会为该类生成代理类。
标注在方法上:Dubbo 则会为该方法生成代理逻辑,表示当前方法需要根据 参数URL 调用对应的扩展点实现。
dubbo中每一个扩展点都有一个自适应类,如果没有显式提供,Dubbo会自动为我们创建一个,默认使用Javaassist。 先来看下创建自适应扩展类的代码
//1、看下extensionLoader的获取方法
ExtensionLoader<Robot>extensionLoader=ExtensionLoader.getExtensionLoader(Robot.class);
//2、最终调用的是ExtensionLoader的构造方法
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
//3、getAdaptiveExtension()看看干了什么事
public T getAdaptiveExtension() {
//获取自适应扩展类,如果没有就开始初始化一个
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
//这里创建了一个自适应扩展类
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
//看看createAdaptiveExtension()
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
//再进到getAdaptiveExtensionClass()
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
//继续追进去createAdaptiveExtensionClass()
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
//看看compiler
@SPI("javassist")
public interface Compiler {
/**
* Compile java source code.
*
* @param code Java source code
* @param classLoader classloader
* @return Compiled class
*/
Class<?> compile(String code, ClassLoader classLoader);
}
//其实到这里就知道了,通过生成一个类的字符串,再通过javassist生成一个对象
createAdaptiveExtensionClassCode()方法中使用一个StringBuilder来构建自适应类的Java源码。方法实现比较长,这里就不贴代码了。这种生成字节码的方式也挺有意思的,先生成Java源代码,然后编译,加载到jvm中。通过这种方式,可以更好的控制生成的Java类。而且这样也不用care各个字节码生成框架的api等。因为xxx.java文件是Java通用的,也是我们最熟悉的。只是代码的可读性不强,需要一点一点构建xx.java的内容。
服务暴露与发现
服务暴露
名词解释
在Dubbo 的核心领域模型中:
- Invoker 是实体域,它是Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用。
- Protocol 是服务域,它是Invoker 暴露和引用的主功能入口,它负责Invoker 的生命周期管理。
export:暴露远程服务
refer:引用远程服务 - proxyFactory:获取一个接口的代理类
getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象
getProxy:针对client端,创建接口的代理对象,例如DemoService的接口。 - Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等
整体流程
在详细探讨服务暴露细节之前 , 我们先看一下整体duubo的服务暴露原理
在整体上看,Dubbo 框架做服务暴露分为两大部分 , 第一步将持有的服务实例通过代理转换成Invoker, 第二步会把Invoker 通过具体的协议 ( 比如Dubbo ) 转换成Exporter, 框架做了这层抽象也大大方便了功能扩展 。
服务提供方暴露服务的蓝色初始化链,时序图如下:
源码分析
服务导出的入口方法是ServiceBean 的onApplicationEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到Spring 上下文刷新事件后执行服务导出操作。方法代码如下:
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
通过export最终找到doExportUrls()方法
private void doExportUrls() {
//加载配置文件中的所有注册中心,并且封装为dubbo内部的URL对象列表
List<URL> registryURLs = loadRegistries(true);
//循环所有协议配置,根据不同的协议,向注册中心中发起注册
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
//服务暴露方法
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol()方法代码老多了,我们只关系核心的地方
...
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
//本地暴露,将服务数据记录到本地JVM中
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//远程暴露,向注册中心发送数据
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
//不存在注册中心,仅导出服务
....
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
上面代码根据url 中的scope 参数决定服务导出方式,分别如下:
scope = none,不导出服务
scope != remote,导出到本地
scope != local,导出到远程
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建Invoker,这是一个很重要的步骤。因此下面先来分析Invoker 的创建过程。Invoker 是由ProxyFactory 创建而来,Dubbo 默认的ProxyFactory 实现类是JavassistProxyFactory。下面我们到JavassistProxyFactory 代码中,探索Invoker 的创建过程。如下:
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 为目标类创建warpper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//创建匿名才invoker对象,并实现doinvoke方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
Invoke创建成功之后,接下来我们来看本地导出
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL) // 设置协议头为 injvm
.setHost(LOCALHOST_VALUE)//本地ip:127.0.0.1
.setPort(0)
.build();
// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
exportLocal 方法比较简单,首先根据URL 协议头决定是否导出服务。若需导出,则创建一个新的URL并将协议头、主机名以及端口设置成新的值。然后创建Invoker,并调用InjvmProtocol 的export 方法导出服务。下面我们来看一下InjvmProtocol 的export 方法都做了哪些事情。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,InjvmProtocol 的export 方法仅创建了一个InjvmExporter,无其他逻辑。到此导出服务到本地就分析完了。
再看看导出服务到远程
接下来,我们继续分析导出服务到远程的过程。导出服务到远程包含了服务导出与服务注册两个过程。先来分析服务导出逻辑。我们把目光移动到RegistryProtocol 的export 方法上。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心 URL
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
//获取已注册的服务提供者 URL,
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// 创建并返回 DestroyableExporter
return new DestroyableExporter<>(exporter);
}
上面代码看起来比较复杂,主要做如下一些操作:
- 调用doLocalExport 导出服务
- 向注册中心注册服务
- 向注册中心进行订阅override 数据
- 创建并返回DestroyableExporter
看看doLocalExport 做了什么
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//protocol和配置的协议相关(dubbo:DubboProtocol)
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
接下来,我们把重点放在Protocol 的export 方法上。假设运行时协议为dubbo,此处的protocol 变量会在运行时加载DubboProtocol,并调用DubboProtocol 的export 方法。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
//创建DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter); //key:接口 (DemoService)
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//启动服务
openServer(url);
//优化序列器
optimizeSerialization(url);
return exporter;
}
如上,我们重点关注DubboExporter 的创建以及openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。下面分析openServer 方法。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
//访问缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//创建服务器实例
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
接下来分析服务器实例的创建过程。如下
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
如上,createServer 包含三个核心的逻辑。
第一是检测是否存在server 参数所代表的Transporter 拓展,不存在则抛出异常。
第二是创建服务器实例。
第三是检测是否支持client 参数所表示的Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}
上面代码比较简单,就不多说了。下面看一下HeaderExchanger 的bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new
HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
}
HeaderExchanger 的bind 方法包含的逻辑比较多,但目前我们仅需关心Transporters 的bind 方法逻
辑即可。该方法的代码如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
} else if (handlers != null && handlers.length != 0) {
Object handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,并调用实例方法
return getTransporter().bind(url, (ChannelHandler)handler);
} else {
throw new IllegalArgumentException("handlers == null");
}
}
如上,getTransporter() 方法获取的Transporter 是在运行时动态创建的,类名为TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的URL 参数决定加载什么类型的Transporter,默认为NettyTransporter。调用 NettyTransporter.bind(URL,ChannelHandler) 方法。创建一个 NettyServer 实例。调用 NettyServer.doOPen() 方法,服务器被开启,服务也被暴露出来了。
服务注册
本节内容以Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册
的入口方法开始分析,我们把目光再次移到RegistryProtocol 的export 方法上。如下:
进入到register()方法
public void register(URL registryUrl, URL registeredProviderUrl) {
//获得注册中心实例
Registry registry = registryFactory.getRegistry(registryUrl);
//进行注册
registry.register(registeredProviderUrl);
}
看看getRegistry()方法
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
进入createRegistry()方法
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//// 获取组名,默认为 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
在上面的代码代码中,我们重点关注ZookeeperTransporter 的connect 方法调用,这个方法用于创建
Zookeeper 客户端。创建好Zookeeper 客户端,意味着注册中心的创建过程就结束了。
搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。
public void doRegister(URL url) {
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如 /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建上一级路径
create(path.substring(0, i), false);
}
// 根据 ephemeral 的值创建临时或持久节点
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。
总结
- 在有注册中心,需要注册提供者地址的情况下,ServiceConfig 解析出的URL 格式为:registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{服务名}/{版本号}")
- 基于Dubbo SPI 的自适应机制,通过URL registry:// 协议头识别,就调用RegistryProtocol#export() 方法
- 将具体的服务类名,比如 DubboServiceRegistryImpl ,通过ProxyFactory 包装成Invoker 实例
- 调用doLocalExport 方法,使用DubboProtocol 将Invoker 转化为Exporter 实例,并打开Netty 服务端监听客户请求
- 创建Registry 实例,连接Zookeeper,并在服务节点下写入提供者的URL 地址,注册服务
- 向注册中心订阅override 数据,并返回一个Exporter 实例
- 根据URL 格式中的 "dubbo://service-host/{服务名}/{版本号}" 中协议头 dubbo:// 识别,调用
DubboProtocol#export()
方法,开发服务端口 - RegistryProtocol#export() 返回的Exporter 实例存放到ServiceConfig 的
List<Exporter>exporters
中