全国首个政企采购云平台:政采云基于 Dubbo 的混合云跨网方案实践
作者:王晓彬:政采云资深开发工程师,负责基础服务相关工作 徐锡平:政采云资深开发工程师,负责基础服务相关工作对云岛业务结构的公司来说,云平台属于公司内部、完全可控的局域网,而岛端则是有自己安全网络策略的独立内部网络。需要云岛通信时,会基于需求,按客户要求走流程开通一些端口,这个过程需要一定的成本且不完全可控。业务上,如果这种跨网需求增多,则会逐渐变成痛点。如果可以搭建一个透明的跨网传输网络,配合良好的顶层设计,就可以在业务支撑、安全管控和运维成本中寻求较好的平衡。本文将介绍政采云基于 Dubbo 的跨网方案落地过程中面临的技术挑战、社区合作以及更深层次抽象的一些思考。在政采云这种政企业务场景中的数据跨网,与业界公有云、自建私有云的公司相比,既有共性又有自己的特点,希望能为大家提供新的思路或者启发。前言稳定、高效、可靠的基础设施是互联网企业应对业务高峰流量的底层基石。作为政采云的基础技术平台,基础平台部一直致力于通过业内前沿技术的落地,保障公司内部所有业务在线生产系统所依赖的基础技术平台能稳定、安全、低成本、可持续地运行与发展。由于公司对 Dubbo 框架的重度使用,跨网数据传输系统一般基于 Dubbo 特性开发,在政采云内部就有多个版本的实现。早在几年前,政采云就上线了基于 Dubbo Filter 转发的方案,它解决了岛到云的单向数据传输,安全认证等问题。另外,业务部门也有按照自己的需求,推出网状点对点的方案,实现了一定程度的透明传输。结合前两年的探索实践以及业界相关领域技术的成熟度,2022 年下半年,我们对各跨岛方案,进行了整合升级,也就是现在的高速公路方案,保障跨岛标准化同时,解决了之前方案实践过程中面临的很多业务痛点,包括:单向传输:因为架构原因,如需双向需要对等重新部署一套,成本较大。白名单开通成本高:点对点的网状架构,需要两两开通白名单,因为政企网络特殊性,开通流程复杂且慢。平台维护成本高:业务各自一套数据传输平台,重复建设且运维成本高。公共功能的缺失:核心功能,业务可以按需开发,但是数据审计、链路追踪、可观测性等公共特性,往往没有足够投入。跨网数据传输系统演进1.1 历史架构图一自左向右、自下而上进行模块介绍:业务 Web:业务 Web 作为数据发送方,调本地集群 Provider 时,携带跨岛信息过去(Dubbo 上下文)。岛业务 Center:本地虚拟 Provider,通过 Filter 拦截跨岛请求,通过 http 传送到云平台 Dubbo 网关,返回数据后反序列化返回岛业务 web。Dubbo 网关:接收 Http 请求,通过泛化调用云端 Provider,处理数据后返回业务 Center。云业务 Center:普通 Dubbo Provider。1.2 高速公路架构图二1.2.1 隧道机制隧道技术是一种通过使用互联网络的基础设施在网络之间传递数据的方式。使用隧道传递的数据 (或负载) 可以是不同协议的数据帧或包。高速公路架构中,使用了隧道这个概念。两端(业务层)是 Dubbo 私有协议,跨网传输过程中,则使用了 http 协议,http 协议可以更好的被中间设备、网关识别转发。这个机制的最大便利在于对业务的低侵入性。对于业务集群的应用完全不需要修改。图三除了路由标记,出口 / 入口 Dubbo 协议字节流没有任何业务外信息,所以可以路由任何 Dubbo 请求。图四1.2.2 主要节点客户端 Sdk:不改变用户使用 Dubbo 的方式,多种形式提供 Dubbo 的路由。Dubbo 出口网关 :代理 Dubbo 流量出口。Dubbo 入口网关 :代理 Dubbo 流量入口。统一网关 :基于 Apisix,代理跨网间所有流量,可以扩展鉴权、审计、限流等特性。挑战与应对之策如前言中所述,已有的几个方案设计上存在了一些问题,落地后也限制了使用了场景。在架构上,我们提出了高速公路方案,选择了全双工的对等网络传输框架。角色上,云平台定位一个特殊的岛端应用,遵循 P2P 实施原则。而对用户而言,高速公路是一个通往岛端的隧道,遵循对用户透明原则。我们可以先来看下在搭建平台的过程中面临的一些挑战以及解法。2.1 技术挑战结合当下跨网数据传输系统面临的处境,并对业界 Dubbo 跨网方案做过一番调研后,在平台搭建上确定了如下三期目标:一期目标:网络能力建设,简单来说是搭建基于 Dubbo 的传输通道,上层功能先维持不变。二期目标:业务上,找业务先行试点,基于反馈,小步快跑,快速迭代;技术上,寻求 Dubbo 社区协作,增强对 Dubbo 相关技术风险的把控,同时抽离通用特性,反馈社区。三期目标:抽象出更通用的网络框架,从而使语言层,传输协议层、及中间件层独立扩展,一键切换。在上述三期目标基本落地后,高速公路系统不仅可以跑起来,同时拥有非常强大的扩展性,更好的承接业务需求及共建。在这过程中,我们需要解决不少技术问题。2.1.1 客户端路由如前面历史方案所述,其场景被限制为岛到云的单向数据传输,特点如下:客户端无路由能力:Consumer 端只能指定是否路由到云平台,而不能指定其他岛端。基于 filter 的扩展:Dubbo 的 Filter 并不是为路由设计的,在此基础上较难扩展。需要本地 Provider 角色:Consumer 端发出的请求,必须由一个注册在 Zookeeper 下的 Provider 兜住,然后 Filter 根据上下文决定是否转发,这就限制了业务方必须部署一个本地 Provider 应用(哪怕是空应用),才能做到跨网访问。我们要解决的问题之一,就是打破单向传输瓶颈,客户端可以更自由的路由到目标云 / 岛。我们设计了以下几种路由方式:注解方式:使用 @DubboReference 提供的通用 parameters 参数,设置路由目标,可以达到方法粒度的路由。
@DubboReference(check = false, parameters = {"ENV_SHANGHAI", "ALL"}) //all表示所有方法,可以单独指定
private DemoService demoService;配置中心指定:把以上 parameters = {"ENV_SHANGHAI", "ALL"} 信息,在配置中心配置,达到同样的效果,这种方式对代码完全无侵入。线程指定:这种方式最灵活。
AddressZoneSpecify.setAddress(Enviroment.SHANGHAI);
demoService.play();无论哪种路由方式,基于“用户透明“的原则,都不改变用户使用 dubbo 的方式。2.1.2 Dubbo 请求地址切换客户端路由最小限度地侵入业务代码,达到了透明调用远程服务的目标。但是,用户仍旧需要部署一套虚拟 Provider 应用,接收请求后按规则进行路由。为了避免部署多余的应用,我们需要有一定的机制,直接把 dubbo 流量切换到远程。图五解决了切换问题后,本地的 APP2 不再需要,甚至 zk 也可以移除。当然,如果业务同时有本地和远程的调用需要,也可以继续存在。图四原先,我们准备通过 Dubbo 的 Route 自定义扩展,去实现动态切换地址的能力。查阅资料后,发现 Dubbo 已经提供了类似能力。https://cn.dubbo.apache.org/zh-cn/docs3-v2/java-sdk/advanced-features-and-usage/service/specify-ip/该特性放在 Dubbo 的子工程 dubbo-spi-extensions 中,同样以 Route 扩展形式实现。但在实际使用过程中,我们遇到如下问题:不支持 Dubbo2:使用 Dubbo2 时,直接以异常的形式提醒暂不支持。NPE 异常:某些场景下调用出现了 NPE 异常。丢失部分信息:Router 下构建新 Invocation 时,丢失了 version、group 等信息。重试异常:远程 Provider 如果发生了异常,客户端在重试的时候,选择了本地集群 Provider 调用,造成错误。作为一个尝鲜新特性,我们理解功能存在不稳定的情况。但这个功能作为我们跨网方案的技术要点,又必须解决。所以,我们通过 PR 的形式,把相应补丁提交到 Dubbo 社区。这个过程中,我们联系到了 Dubbo PMC 远云大佬,一起讨论和完善 PR,直到解决所有已知问题。2.1.3 出口网关的实现在图 4 中,通过切换地址,我们似乎可以直接访问远程应用,并且架构非常简单。但是遗憾的是,存在几个难以解决的问题:网关组件的限制:在云岛 / 岛岛间,存在一系列网关组件,来提供转发、负载均衡的功能,比如 SLB、NGINX、WAF。这些组件并不能识别私有的 Dubbo 流量并转发。ip 白名单开通成本高:类似 P2P 方案,需要点对点开通 IP 白名单,成本巨大。升级维护复杂:客户端通过集成 SDK 的形式转发,后续如需要劫持流量进行扩展,需要同时对每个接入应用进行升级。图六针对以上问题,我们的设计中,需要加入 Dubbo 网关的角色,来实现以下目标。① 两端 ip 收敛显著减少网关长连接数量弱化服务注册发现流程(每个环境只有一个 Dubbo 网关,直接配置即可互相发现)简化鉴权、认证流程。一条链路可以使用白名单,一群则只能配置较复杂的鉴权② 两端功能收敛客户端的 SDK 专注路由功能,基本不用升级扩展功能放在 Dubbo-Proxy,统一升级,业务端无感知Dubbo-Proxy 作为业务网关,可以减轻对业务端的侵入,起到类似分布式运行时(Dapr)作用。但是,在引入之前,需要解决一些现实的技术问题。其中,最重要的问题是如何接收陌生的 Dubbo 流量,然后进行转发。做了一些相关调研后,有两个方案可用:通用 Provider直接在 Dubbo-Proxy 注册一个普通的通用 Service,客户端的 SDK 利用 Filter,劫持流量,直接调用通用 Service 后处理数据返回。注册虚拟节点该方案来源于远云。客户端在本地 zk 订阅远程节点时,通知 Proxy,Proxy 获取订阅的信息后(预先订阅所有 zk 变更),主动注册相应虚拟 Service(对 zk 来说,注册一个节点的参数只是字符串)到 zk 上。这样,可以把客户端的远程流量“骗”到 Proxy ,Proxy 再使用服务端泛化,接收请求并转发。以上两种方案,都可以实现出口网关。但是,在设计上,角色间需要多次交互,才能达到目的。那么,是否有更简洁的方式,直接支持这种接收和转发呢?首先,我们对 Dubbo 源码进行了调研,看 Provider 接收到陌生流量(无相应 Service)后会如何处理,是否有扩展点可以拦截。发现在 Byte 流解析阶段,Dubbo 即对 Service 进行了检查,不存在直接抛异常返回。图七在 Provider 处理的生命周期中,Decode 出于非常早期的阶段,几乎没有什么扩展点可以拦截处理。因为快速失败的理念,早期的检测确实可以避免后面无谓的代码执行消耗。但是,对比 Spring ,Dubbo 在扩展性上是有不足的,即对于一个通用的异常,却没有相应的扩展机制。我们决定在 decode 的基础上,加上对这个异常的扩展。主要思路是,在 decode 被调用处,catch 住这块异常,通过 SPI 的形式,获取扩展实现,可以定制异常信息,也可以控制 decode 流程重试。这块修改难度并不大,私有版本上顺利通过测试,同时提交 PR 到社区。这个过程中,远云大佬帮忙发现了一个并发安全的 bug,并给了不少减少风险的建议。
//解码结束后,无论是否异常,都将进入这个方法
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
if (req.error != null) {
// Give ExceptionProcessors a chance to retry request handle or custom exception information.
String exPs = System.getProperty(EXCEPTION_PROCESSOR_KEY);
if (StringUtils.isNotBlank(exPs)) {
ExtensionLoader<ExceptionProcessor> extensionLoader = channel.getUrl().getOrDefaultFrameworkModel().getExtensionLoader(ExceptionProcessor.class);
ExceptionProcessor expProcessor = extensionLoader.getOrDefaultExtension(exPs);
boolean handleError = expProcessor.shouldHandleError(error);
if (handleError) {
//获取异常扩展,执行wrapAndHandleException操作,需要重试的场景可以抛出retry异常
msg = Optional.ofNullable(expProcessor.wrapAndHandleException(channel, req)).orElse(msg);
}
}
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
}
}
//handleRequest过程中的retry控制
public void received(Channel channel, Object message) throws RemotingException {
//解码
decode(message);
try {
handler.handleRequest(channel, message);
} catch (RetryHandleException e) {
if (message instanceof Request) {
ErrorData errorData = (ErrorData) ((Request) message).getData();
//有定制,进行重试
retry(errorData.getData());
} else {
// Retry only once, and only Request will throw an RetryHandleException
throw new RemotingException(channel, "Unknown error encountered when retry handle: " + e.getMessage());
}
handler.received(channel, message);
}
}
关于 ExceptionProcessor 扩展,我们在官方扩展包 Dubbo-Spi-Extensions 中,提供了一个默认实现,允许控制重试解码,并自定义异常处理。2.1.4 中心网关图 6 的架构,已经非常接近最终实现了,但是缺了一个中心网关角色。引入这个网关 (基于 Apisix ) 的原因:白名单问题:虽然 Dubbo 网关收敛了终端 IP,但是要实现岛岛互通,还是得两两互开白名单。引入中心网关(云平台)后,每个岛单独和云平台互开即可。白名单开通复杂度从 O(n*n) 变为 O(n)。统一网关的好处:作为公司级网关,可以统一对所有应用进行限流、鉴权、审计、可观测性等功能拓展。更多思考无论公司内外,能选择的跨网方案非常多,我们会去选择一个能解决痛点的,而不是完美的方案。落地方案一般比较保守,但是对于架构的思考,一定是需要更超前的。http 协议导致的性能损失前面说到,在 Dubbo 网关和中心网关间,我们使用了 Http 协议。对比 Dubbo 等精简协议,Http 协议显然更臃肿。但是,也许这是现阶段最合适的方案。除了避免私有协议在网络设备中的“艰难前行”,Http 协议开发成本更低,相应落地风险也更小。一些新技术,也许是我们后续发展的方向。比如 Higress,支持 Triple 协议(基于 Http2)交换信息,在获得更高性能的同时,也解决了设备识别问题。但是选择 Higress,需要面对学习认知成本、新开源 BUG 多等问题,同时它可能更适合内部网络(即使跨公网也能搭建 VPN),而不是我们各私有岛端(客户自定义安全策略)的网络互通。扩展性不足高速公路是一个基于 Dubbo 的跨网方案,在协议与框架层,与 Dubbo 的绑定比较深,但是它应该能做的更多。也许很快,会接入 Http、Mq 等应用协议的流量,或者 Python、Go 等语言的客户端,甚至是 Mysql 的数据互通。这个时候,要么对架构大改,要么各种兼容,这都不是我们想看到的。参考网络分层协议,我们也粗略地做了一个分层抽象规划。图八物理层打通:主要解决网络异构问题,即约定不同安全策略的子域如何通信。通讯协议层加速:前面讲到的应用层协议,需要做到允许独立扩展及切换。语言层编译加速:业务网关可能更适合使用 Golang,然后 Java 节点是否可以用 Native 优化性能?框架层功能升级:比如当前对 Dubbo 的定制开发,使用的 Apisix 中心网关是否可以扩展 dubbo 转 dubbo?任务编排:业务的跨网调度,不一定是 A->B->C->D,会不会是 A、B 同时完成后才能 ->C->D?更上层的控制面 / 治理面 / 运维面未来规划随着高速公路方案在政采云的逐渐落地,我们未来会从稳定性、功能增强、新技术探索三个方面去做深、做广:1. 稳定性:基础服务的稳定性是一切的基石,而这往往是不少研发同学容易忽视的一点,研发同学需“在晴天时修屋顶”。系统自身的健壮性:资源池化隔离、QoS 保障能力建设。节点实例的稳定性:加固发现能力,持续完善异常检测工具(除了常规的健康检测,会从观测指标的不同纬度综合决策),自动进行异常实例的替换;加强数据运营,提升反馈能力。2. 功能增强协议增强:当前只能对 Dubbo 流量转发,计划增加对 Http/Grpc 等协议等支持,从而支持更多的场景(已有业务提此类需求)。安全性增强:在中心网关 Apisix 开发鉴权、审计等插件,更好的控制跨网的调用与被调。易用性增强:开发自动工单系统,对需要配置的事项,由业务测提工单,相应人员审核后自动配置,解放劳动力同时减少出错概率。3. 新技术探索网关场景,通常有个两个比较明显的特点:并发量高:多个应用复用同一个网关行为轻量:一般只有转发、权限校验等轻量操作基于这两个特点,语言层性能开销在总性能开销中的占比,往往会业务应用更大,这个时候, Golang 等语言会比 Java 更有优势。当前也在对 Dubbo-Go 调研,未来替换基于 Java 版 Dubbo 的网关应用。另外,Higress 方案看起来不错,必定会有许多值得我们学习的东西。欢迎感兴趣的同学扫描下方二维码加入钉钉交流群,一起参与探讨交流。MSE 3 月采购季优惠【0元试用】服务治理试用版,开通后 30 天免费使用。【新老同享】服务治理资源包:专业版/(7200Agent*小时)67.5 元。注册配置中心资源包:开发版/规格 1c2g 76.7 元【首购专享】注册配置中心专业版,引擎类型 Nacos,包年包月 6 折。注册配置中心专业版,引擎类型 ZooKeeper,包年包月 5 折。云原生网关,包年包月 6 折。服务治理资源包(按量抵扣),包年包月 6 折。
Dubbo3 源码解读-宋小生-14:Dubbo配置加载全解析
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 14/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 2816500319414.1 回到启动器的初始化过程在应用程序启动的时候会调用发布器的启动方法 ,然后调用初始化方法,在发布器DefaultApplicationDeployer中的初始化方法initialize() 如下:@Override
public void initialize() {
if (initialized) {
return;
}
// Ensure that the initialization is completed when concurrent calls
synchronized (startLock) {
if (initialized) {
return;
}
// register shutdown hook
registerShutdownHook();
startConfigCenter();
loadApplicationConfigs();
initModuleDeployers();
// @since 2.7.8
startMetadataCenter();
initialized = true;
if (logger.isInfoEnabled()) {
logger.info(getIdentifier() + " has been initialized!");
}
}
}初始化过程中会先启动配置中心配置信息处理,然后 调用加载初始化应用程序配置方法loadApplicationConfigs();进行配置加载关于配置的官方文档链接为 配置概述Dubbo框架的配置项比较繁多,为了更好地管理各种配置,将其按照用途划分为不同的组件,最终所有配置项都会汇聚到URL中,传递给后续处理模块。常用配置组件如下:application: Dubbo应用配置registry: 注册中心protocol: 服务提供者RPC协议config-center: 配置中心metadata-report: 元数据中心service: 服务提供者配置reference: 远程服务引用配置provider: service的默认配置或分组配置consumer: reference的默认配置或分组配置module: 模块配置monitor: 监控配置metrics: 指标配置ssl: SSL/TLS配置配置还有几个比较重要的点:配置来源从Dubbo支持的配置来源说起,默认有6种配置来源:JVM System Properties,JVM -D 参数System environment,JVM进程的环境变量Externalized Configuration,外部化配置,从配置中心读取Application Configuration,应用的属性配置,从Spring应用的Environment中提取"dubbo"打头的属性集API / XML /注解等编程接口采集的配置可以被理解成配置来源的一种,是直接面向用户编程的配置采集方式从classpath读取配置文件 dubbo.properties覆盖关系下图展示了配置覆盖关系的优先级,从上到下优先级依次降低:配置方式Java API配置XML配置Annotation配置属性配置配置虽然非常多,但是我们掌握一下配置加载的原理,再了解下官网的文档说明路径应该基础的配置搞定是没问题的,更深入的配置很多参数还是需要了解下源码的.14.2 配置信息的初始化回顾前面我们在讲ModuleModel对象的创建的时候ModuleModel模型中包含了一个成员变量为ModuleEnvironment 代表当前的模块环境和ModuleConfigManager配置管理器而ModuleModel模型对象的父模型对象ApplicationModel中包含了一个成员变量Environment环境和ConfigManager配置管理器.在回顾调用过程之前我们先看下模型,配置管理器和环境与配置之间的关系如下图:在ModuleConfigManager对象初始化方法initialize()中创建了模块配置管理器:ModuleConfigManager如下代码所示: @Override
protected void initialize() {
super.initialize();
this.serviceRepository = new ModuleServiceRepository(this);
this.moduleConfigManager = new ModuleConfigManager(this);
this.moduleConfigManager.initialize();
ModuleEnvironment环境信息对象也会在配置管理器创建的时候被调用到:如下代码所示: @Override
public ModuleEnvironment getModelEnvironment() {
if (moduleEnvironment == null) {
moduleEnvironment = (ModuleEnvironment) this.getExtensionLoader(ModuleExt.class)
.getExtension(ModuleEnvironment.NAME);
}
return moduleEnvironment;
}在扩展对象ExtensionLoader进行对象ModuleEnvironment创建之后会对对象进行初始化调用 initExtension(instance)方法 初始化的时候调用如下代码:ExtensionLoader中的初始化方法如下: private void initExtension(T instance) {
if (instance instanceof Lifecycle) {
Lifecycle lifecycle = (Lifecycle) instance;
lifecycle.initialize();
}
}## 14.3 属性加载### 14.3.1 Environment中属性的初始化方法这个初始化方法对应ModuleEnvironment的父类型Environment中的初始化方法如下:initialize() @Override
public void initialize() throws IllegalStateException {
if (initialized.compareAndSet(false, true)) {
//加载在JVM或者环境变量指定的dubbo.properties配置文件 配置的key为dubbo.properties.file ,如果未指定则查找类路径下的dubbo.properties
this.propertiesConfiguration = new PropertiesConfiguration(scopeModel);
//系统JVM参数的配置无需我们来加载到内存,系统已经加载好了放到了System中,我们只需System.getProperty(key)来调用
this.systemConfiguration = new SystemConfiguration();
//系统环境变量的配置,无需我们来加载到内存,系统已经加载好了放到了System中,我们只需System.getenv(key)来获取就可以
this.environmentConfiguration = new EnvironmentConfiguration();
//从远程配置中心的全局配置获取对应配置
this.externalConfiguration = new InmemoryConfiguration("ExternalConfig");
//从远程配置中心的应用配置获取对应配置
this.appExternalConfiguration = new InmemoryConfiguration("AppExternalConfig");
//应用内的配置比如: Spring Environment/PropertySources/application.properties
this.appConfiguration = new InmemoryConfiguration("AppConfig");
//加载迁移配置,用户在JVM参数或者环境变量中指定的dubbo.migration.file,如果用户未指定测尝试加载类路径下的dubbo-migration.yaml
loadMigrationRule();
}
}### 14.4.2 属性变量说明 前面我们已经基本上介绍了各个属性的含义下面用一个表格列举一下方便查看:属性变量名属性类型说明propertiesConfigurationPropertiesConfigurationdubbo.properties文件中的属性systemConfigurationSystemConfigurationJVM参数 启动进程时指定的 (-D)配置environmentConfigurationEnvironmentConfiguration环境变量中的配置externalConfigurationInmemoryConfiguration外部配置全局配置 例如配置中心中 config-center global/default configappExternalConfigurationInmemoryConfiguration外部的应用配置 例如配置中心中执行的当前应用的配置 config-center app configappConfigurationInmemoryConfiguration来自应用中的配置例如:Spring Environment/PropertySources/application.properties globalConfigurationCompositeConfiguration前面6个配置属性放到一起就是这个globalConfigurationMapsList<Map<String, String>>最前面的6个属性转换为map放到一起就是这个可以理解为将全局配置globalConfiguration转换成了列表 这个列表顺序在这里是:SystemConfiguration -> EnvironmentConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AppConfiguration -> AbstractConfig -> PropertiesConfigurationdefaultDynamicGlobalConfigurationCompositeConfiguration这个也是一个组合配置将defaultDynamicConfiguration动态配置(来自配置中心的配置)和全局配置添加到了自己的配置列表中 列表顺序为defaultDynamicConfiguration -> globalConfigurationlocalMigrationRuleString,用户在JVM参数或者环境变量中指定的dubbo.migration.file,如果用户未指定测尝试加载类路径下的dubbo-migration.yaml关于每个配置信息这里还是来了解下细节,方便大家了解原理.14.3.3 dubbo.properties配置文件加载解析原理如前面所示://加载在JVM或者环境变量指定的dubbo.properties配置文件 配置的key为dubbo.properties.file ,如果未指定则查找类路径下的dubbo.properties
this.propertiesConfiguration = new PropertiesConfiguration(scopeModel);下面就直接提构造器的PropertiesConfiguration代码了:public PropertiesConfiguration(ScopeModel scopeModel) {
this.scopeModel = scopeModel;
refresh();
}
public void refresh() {
//配置获取的过程是借助工具类ConfigUtils来获取的
properties = ConfigUtils.getProperties(scopeModel.getClassLoaders());
}继续看ConfigUtils的getProperties方法:public static Properties getProperties(Set<ClassLoader> classLoaders) {
//这个配置的KEY是dubbo.properties.file System.getProperty是从JVM参数中获取配置的 一般情况下我们在启动Java进程的时候会指定Dubbo配置文件 如配置:
//-Ddubbo.properties.file=/dubbo.properties
String path = System.getProperty(CommonConstants.DUBBO_PROPERTIES_KEY);
if (StringUtils.isEmpty(path)) {
//优先级最高的JVM参数拿不到数据则从 环境变量中获取,这个配置key也是dubbo.properties.file System.getenv是从环境变量中获取数据
//例如我们在环境变量中配置 dubbo.properties.file=/dubbo.properties
path = System.getenv(CommonConstants.DUBBO_PROPERTIES_KEY);
if (StringUtils.isEmpty(path)) {
//如果在JVM参数和环境变量都拿不到这个配置文件的路径我们就用默认的吧
//默认的路径是类路径下的资源文件 这个路径是: dubbo.properties
path = CommonConstants.DEFAULT_DUBBO_PROPERTIES;
}
}
return ConfigUtils.loadProperties(classLoaders, path, false, true);
}路径获取之后加载详细的配置内容:ConfigUtils的loadProperties代码如下:ConfigUtils.loadProperties(classLoaders, path, false, true);代码如下:public static Properties loadProperties(Set<ClassLoader> classLoaders, String fileName, boolean allowMultiFile, boolean optional) {
Properties properties = new Properties();
// add scene judgement in windows environment Fix 2557
//检查文件是否存在 直接加载配置文件如果加载到了配置文件则直接返回
if (checkFileNameExist(fileName)) {
try {
FileInputStream input = new FileInputStream(fileName);
try {
properties.load(input);
} finally {
input.close();
}
} catch (Throwable e) {
logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
}
return properties;
}
//为什么会有下面的逻辑呢,如果仅仅使用上面的加载方式只能加载到本系统下的配置文件,无法加载封装在jar中的根路径的配置
Set<java.net.URL> set = null;
try {
List<ClassLoader> classLoadersToLoad = new LinkedList<>();
classLoadersToLoad.add(ClassUtils.getClassLoader());
classLoadersToLoad.addAll(classLoaders);
//这个方法loadResources在扩展加载的时候说过
set = ClassLoaderResourceLoader.loadResources(fileName, classLoadersToLoad).values().stream().reduce(new LinkedHashSet<>(), (a, i) -> {
a.addAll(i);
return a;
});
} catch (Throwable t) {
logger.warn("Fail to load " + fileName + " file: " + t.getMessage(), t);
}
if (CollectionUtils.isEmpty(set)) {
if (!optional) {
logger.warn("No " + fileName + " found on the class path.");
}
return properties;
}
if (!allowMultiFile) {
if (set.size() > 1) {
String errMsg = String.format("only 1 %s file is expected, but %d dubbo.properties files found on class path: %s",
fileName, set.size(), set);
logger.warn(errMsg);
}
// fall back to use method getResourceAsStream
try {
properties.load(ClassUtils.getClassLoader().getResourceAsStream(fileName));
} catch (Throwable e) {
logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
}
return properties;
}
logger.info("load " + fileName + " properties file from " + set);
for (java.net.URL url : set) {
try {
Properties p = new Properties();
InputStream input = url.openStream();
if (input != null) {
try {
p.load(input);
properties.putAll(p);
} finally {
try {
input.close();
} catch (Throwable t) {
}
}
}
} catch (Throwable e) {
logger.warn("Fail to load " + fileName + " file from " + url + "(ignore this file): " + e.getMessage(), e);
}
}
return properties;
}完整的配置加载流程这里用简单的话描述下:项目内配置查询路径查询从JVM参数中获取配置的 dubbo.properties.file配置文件路径如果前面未获取到路径则从环境变量参数中获取配置的dubbo.properties.file配置文件路径如果前面未获取到路径则使用默认路径dubbo.propertie配置加载将路径转为FileInputStream 然后使用Properties加载依赖中的配置扫描查询使用类加载器扫描所有资源URLurl转InputStream 如 url.openStream() 然后使用Properties加载14.3.4 加载JVM参数的配置这里我们继续看SystemConfiguration配置的加载这个直接看下代码就可以了:这个类型仅仅是使用System.getProperty来获取JVM配置即可 public class SystemConfiguration implements Configuration {
@Override
public Object getInternalProperty(String key) {
return System.getProperty(key);
}
public Map<String, String> getProperties() {
return (Map) System.getProperties();
}
}14.3.5 加载环境变量参数的配置这里我们来看EnvironmentConfiguration,这里我们直接来看代码:public class EnvironmentConfiguration implements Configuration {
@Override
public Object getInternalProperty(String key) {
String value = System.getenv(key);
if (StringUtils.isEmpty(value)) {
value = System.getenv(StringUtils.toOSStyleKey(key));
}
return value;
}
public Map<String, String> getProperties() {
return System.getenv();
}
}14.3.6 内存配置的封装:InmemoryConfiguration这里我们看下InmemoryConfiguration的设计,这个直接看代码吧内部使用了一个LinkedHashMap来存储配置public class InmemoryConfiguration implements Configuration {
private String name;
// stores the configuration key-value pairs
private Map<String, String> store = new LinkedHashMap<>();
public InmemoryConfiguration() {
}
public InmemoryConfiguration(String name) {
this.name = name;
}
public InmemoryConfiguration(Map<String, String> properties) {
this.setProperties(properties);
}
@Override
public Object getInternalProperty(String key) {
return store.get(key);
}
/**
* Add one property into the store, the previous value will be replaced if the key exists
*/
public void addProperty(String key, String value) {
store.put(key, value);
}
/**
* Add a set of properties into the store
*/
public void addProperties(Map<String, String> properties) {
if (properties != null) {
this.store.putAll(properties);
}
}
/**
* set store
*/
public void setProperties(Map<String, String> properties) {
if (properties != null) {
this.store = properties;
}
}
public Map<String, String> getProperties() {
return store;
}
}14.3.7 Dubbo迁移新版本的配置文件加载dubbo-migration.yaml关于配置迁移文件的用法可以看下这个Dubbo官方的地址迁移规则说明这个配置文件的文件名字为:dubbo-migration.yaml这个和14.3.4加载JVM参数配置的过程是相似的细节可以看14.3.4节 private void loadMigrationRule() {
//JVM参数的dubbo.migration.file配置
String path = System.getProperty(CommonConstants.DUBBO_MIGRATION_KEY);
if (StringUtils.isEmpty(path)) {
//环境变量的dubbo.migration.file配置
path = System.getenv(CommonConstants.DUBBO_MIGRATION_KEY);
if (StringUtils.isEmpty(path)) {
//默认的迁移配置文件 dubbo-migration.yaml
path = CommonConstants.DEFAULT_DUBBO_MIGRATION_FILE;
}
}
this.localMigrationRule = ConfigUtils.loadMigrationRule(scopeModel.getClassLoaders(), path);
}14.4 初始化加载应用配置加载配置涉及到了配置优先级的处理,下面来看加载配置代码 loadApplicationConfigs()方法private void loadApplicationConfigs() {
//发布器还是不处理配置加载的逻辑还是交给配置管理器
configManager.loadConfigs();
}配置管理器加载配置: @Override
public void loadConfigs() {
// application config has load before starting config center
// load dubbo.applications.xxx
//加载应用配置
loadConfigsOfTypeFromProps(ApplicationConfig.class);
// load dubbo.monitors.xxx
//加载监控配置
loadConfigsOfTypeFromProps(MonitorConfig.class);
// load dubbo.metrics.xxx
//加载指标监控配置
loadConfigsOfTypeFromProps(MetricsConfig.class);
// load multiple config types:
// load dubbo.protocols.xxx
//加载协议配置
loadConfigsOfTypeFromProps(ProtocolConfig.class);
// load dubbo.registries.xxx
loadConfigsOfTypeFromProps(RegistryConfig.class);
// load dubbo.metadata-report.xxx
//加载元数据配置
loadConfigsOfTypeFromProps(MetadataReportConfig.class);
// config centers has bean loaded before starting config center
//loadConfigsOfTypeFromProps(ConfigCenterConfig.class);
//刷新配置
refreshAll();
//检查配置
checkConfigs();
// set model name
if (StringUtils.isBlank(applicationModel.getModelName())) {
applicationModel.setModelName(applicationModel.getApplicationName());
}
}
Dubbo3 源码解读-宋小生-14:Dubbo配置加载全解析
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 14/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 2816500319414.1 回到启动器的初始化过程在应用程序启动的时候会调用发布器的启动方法 ,然后调用初始化方法,在发布器DefaultApplicationDeployer中的初始化方法initialize() 如下:@Override
public void initialize() {
if (initialized) {
return;
}
// Ensure that the initialization is completed when concurrent calls
synchronized (startLock) {
if (initialized) {
return;
}
// register shutdown hook
registerShutdownHook();
startConfigCenter();
loadApplicationConfigs();
initModuleDeployers();
// @since 2.7.8
startMetadataCenter();
initialized = true;
if (logger.isInfoEnabled()) {
logger.info(getIdentifier() + " has been initialized!");
}
}
}初始化过程中会先启动配置中心配置信息处理,然后 调用加载初始化应用程序配置方法loadApplicationConfigs();进行配置加载关于配置的官方文档链接为 配置概述Dubbo框架的配置项比较繁多,为了更好地管理各种配置,将其按照用途划分为不同的组件,最终所有配置项都会汇聚到URL中,传递给后续处理模块。常用配置组件如下:application: Dubbo应用配置registry: 注册中心protocol: 服务提供者RPC协议config-center: 配置中心metadata-report: 元数据中心service: 服务提供者配置reference: 远程服务引用配置provider: service的默认配置或分组配置consumer: reference的默认配置或分组配置module: 模块配置monitor: 监控配置metrics: 指标配置ssl: SSL/TLS配置配置还有几个比较重要的点:配置来源从Dubbo支持的配置来源说起,默认有6种配置来源:JVM System Properties,JVM -D 参数System environment,JVM进程的环境变量Externalized Configuration,外部化配置,从配置中心读取Application Configuration,应用的属性配置,从Spring应用的Environment中提取"dubbo"打头的属性集API / XML /注解等编程接口采集的配置可以被理解成配置来源的一种,是直接面向用户编程的配置采集方式从classpath读取配置文件 dubbo.properties覆盖关系下图展示了配置覆盖关系的优先级,从上到下优先级依次降低:配置方式Java API配置XML配置Annotation配置属性配置配置虽然非常多,但是我们掌握一下配置加载的原理,再了解下官网的文档说明路径应该基础的配置搞定是没问题的,更深入的配置很多参数还是需要了解下源码的.14.2 配置信息的初始化回顾前面我们在讲ModuleModel对象的创建的时候ModuleModel模型中包含了一个成员变量为ModuleEnvironment 代表当前的模块环境和ModuleConfigManager配置管理器而ModuleModel模型对象的父模型对象ApplicationModel中包含了一个成员变量Environment环境和ConfigManager配置管理器.在回顾调用过程之前我们先看下模型,配置管理器和环境与配置之间的关系如下图:在ModuleConfigManager对象初始化方法initialize()中创建了模块配置管理器:ModuleConfigManager如下代码所示: @Override
protected void initialize() {
super.initialize();
this.serviceRepository = new ModuleServiceRepository(this);
this.moduleConfigManager = new ModuleConfigManager(this);
this.moduleConfigManager.initialize();
ModuleEnvironment环境信息对象也会在配置管理器创建的时候被调用到:如下代码所示: @Override
public ModuleEnvironment getModelEnvironment() {
if (moduleEnvironment == null) {
moduleEnvironment = (ModuleEnvironment) this.getExtensionLoader(ModuleExt.class)
.getExtension(ModuleEnvironment.NAME);
}
return moduleEnvironment;
}在扩展对象ExtensionLoader进行对象ModuleEnvironment创建之后会对对象进行初始化调用 initExtension(instance)方法 初始化的时候调用如下代码:ExtensionLoader中的初始化方法如下: private void initExtension(T instance) {
if (instance instanceof Lifecycle) {
Lifecycle lifecycle = (Lifecycle) instance;
lifecycle.initialize();
}
}## 14.3 属性加载### 14.3.1 Environment中属性的初始化方法这个初始化方法对应ModuleEnvironment的父类型Environment中的初始化方法如下:initialize() @Override
public void initialize() throws IllegalStateException {
if (initialized.compareAndSet(false, true)) {
//加载在JVM或者环境变量指定的dubbo.properties配置文件 配置的key为dubbo.properties.file ,如果未指定则查找类路径下的dubbo.properties
this.propertiesConfiguration = new PropertiesConfiguration(scopeModel);
//系统JVM参数的配置无需我们来加载到内存,系统已经加载好了放到了System中,我们只需System.getProperty(key)来调用
this.systemConfiguration = new SystemConfiguration();
//系统环境变量的配置,无需我们来加载到内存,系统已经加载好了放到了System中,我们只需System.getenv(key)来获取就可以
this.environmentConfiguration = new EnvironmentConfiguration();
//从远程配置中心的全局配置获取对应配置
this.externalConfiguration = new InmemoryConfiguration("ExternalConfig");
//从远程配置中心的应用配置获取对应配置
this.appExternalConfiguration = new InmemoryConfiguration("AppExternalConfig");
//应用内的配置比如: Spring Environment/PropertySources/application.properties
this.appConfiguration = new InmemoryConfiguration("AppConfig");
//加载迁移配置,用户在JVM参数或者环境变量中指定的dubbo.migration.file,如果用户未指定测尝试加载类路径下的dubbo-migration.yaml
loadMigrationRule();
}
}### 14.4.2 属性变量说明 前面我们已经基本上介绍了各个属性的含义下面用一个表格列举一下方便查看:属性变量名属性类型说明propertiesConfigurationPropertiesConfigurationdubbo.properties文件中的属性systemConfigurationSystemConfigurationJVM参数 启动进程时指定的 (-D)配置environmentConfigurationEnvironmentConfiguration环境变量中的配置externalConfigurationInmemoryConfiguration外部配置全局配置 例如配置中心中 config-center global/default configappExternalConfigurationInmemoryConfiguration外部的应用配置 例如配置中心中执行的当前应用的配置 config-center app configappConfigurationInmemoryConfiguration来自应用中的配置例如:Spring Environment/PropertySources/application.properties globalConfigurationCompositeConfiguration前面6个配置属性放到一起就是这个globalConfigurationMapsList<Map<String, String>>最前面的6个属性转换为map放到一起就是这个可以理解为将全局配置globalConfiguration转换成了列表 这个列表顺序在这里是:SystemConfiguration -> EnvironmentConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AppConfiguration -> AbstractConfig -> PropertiesConfigurationdefaultDynamicGlobalConfigurationCompositeConfiguration这个也是一个组合配置将defaultDynamicConfiguration动态配置(来自配置中心的配置)和全局配置添加到了自己的配置列表中 列表顺序为defaultDynamicConfiguration -> globalConfigurationlocalMigrationRuleString,用户在JVM参数或者环境变量中指定的dubbo.migration.file,如果用户未指定测尝试加载类路径下的dubbo-migration.yaml关于每个配置信息这里还是来了解下细节,方便大家了解原理.14.3.3 dubbo.properties配置文件加载解析原理如前面所示://加载在JVM或者环境变量指定的dubbo.properties配置文件 配置的key为dubbo.properties.file ,如果未指定则查找类路径下的dubbo.properties
this.propertiesConfiguration = new PropertiesConfiguration(scopeModel);下面就直接提构造器的PropertiesConfiguration代码了:public PropertiesConfiguration(ScopeModel scopeModel) {
this.scopeModel = scopeModel;
refresh();
}
public void refresh() {
//配置获取的过程是借助工具类ConfigUtils来获取的
properties = ConfigUtils.getProperties(scopeModel.getClassLoaders());
}继续看ConfigUtils的getProperties方法:public static Properties getProperties(Set<ClassLoader> classLoaders) {
//这个配置的KEY是dubbo.properties.file System.getProperty是从JVM参数中获取配置的 一般情况下我们在启动Java进程的时候会指定Dubbo配置文件 如配置:
//-Ddubbo.properties.file=/dubbo.properties
String path = System.getProperty(CommonConstants.DUBBO_PROPERTIES_KEY);
if (StringUtils.isEmpty(path)) {
//优先级最高的JVM参数拿不到数据则从 环境变量中获取,这个配置key也是dubbo.properties.file System.getenv是从环境变量中获取数据
//例如我们在环境变量中配置 dubbo.properties.file=/dubbo.properties
path = System.getenv(CommonConstants.DUBBO_PROPERTIES_KEY);
if (StringUtils.isEmpty(path)) {
//如果在JVM参数和环境变量都拿不到这个配置文件的路径我们就用默认的吧
//默认的路径是类路径下的资源文件 这个路径是: dubbo.properties
path = CommonConstants.DEFAULT_DUBBO_PROPERTIES;
}
}
return ConfigUtils.loadProperties(classLoaders, path, false, true);
}路径获取之后加载详细的配置内容:ConfigUtils的loadProperties代码如下:ConfigUtils.loadProperties(classLoaders, path, false, true);代码如下:public static Properties loadProperties(Set<ClassLoader> classLoaders, String fileName, boolean allowMultiFile, boolean optional) {
Properties properties = new Properties();
// add scene judgement in windows environment Fix 2557
//检查文件是否存在 直接加载配置文件如果加载到了配置文件则直接返回
if (checkFileNameExist(fileName)) {
try {
FileInputStream input = new FileInputStream(fileName);
try {
properties.load(input);
} finally {
input.close();
}
} catch (Throwable e) {
logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
}
return properties;
}
//为什么会有下面的逻辑呢,如果仅仅使用上面的加载方式只能加载到本系统下的配置文件,无法加载封装在jar中的根路径的配置
Set<java.net.URL> set = null;
try {
List<ClassLoader> classLoadersToLoad = new LinkedList<>();
classLoadersToLoad.add(ClassUtils.getClassLoader());
classLoadersToLoad.addAll(classLoaders);
//这个方法loadResources在扩展加载的时候说过
set = ClassLoaderResourceLoader.loadResources(fileName, classLoadersToLoad).values().stream().reduce(new LinkedHashSet<>(), (a, i) -> {
a.addAll(i);
return a;
});
} catch (Throwable t) {
logger.warn("Fail to load " + fileName + " file: " + t.getMessage(), t);
}
if (CollectionUtils.isEmpty(set)) {
if (!optional) {
logger.warn("No " + fileName + " found on the class path.");
}
return properties;
}
if (!allowMultiFile) {
if (set.size() > 1) {
String errMsg = String.format("only 1 %s file is expected, but %d dubbo.properties files found on class path: %s",
fileName, set.size(), set);
logger.warn(errMsg);
}
// fall back to use method getResourceAsStream
try {
properties.load(ClassUtils.getClassLoader().getResourceAsStream(fileName));
} catch (Throwable e) {
logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
}
return properties;
}
logger.info("load " + fileName + " properties file from " + set);
for (java.net.URL url : set) {
try {
Properties p = new Properties();
InputStream input = url.openStream();
if (input != null) {
try {
p.load(input);
properties.putAll(p);
} finally {
try {
input.close();
} catch (Throwable t) {
}
}
}
} catch (Throwable e) {
logger.warn("Fail to load " + fileName + " file from " + url + "(ignore this file): " + e.getMessage(), e);
}
}
return properties;
}完整的配置加载流程这里用简单的话描述下:项目内配置查询路径查询从JVM参数中获取配置的 dubbo.properties.file配置文件路径如果前面未获取到路径则从环境变量参数中获取配置的dubbo.properties.file配置文件路径如果前面未获取到路径则使用默认路径dubbo.propertie配置加载将路径转为FileInputStream 然后使用Properties加载依赖中的配置扫描查询使用类加载器扫描所有资源URLurl转InputStream 如 url.openStream() 然后使用Properties加载14.3.4 加载JVM参数的配置这里我们继续看SystemConfiguration配置的加载这个直接看下代码就可以了:这个类型仅仅是使用System.getProperty来获取JVM配置即可 public class SystemConfiguration implements Configuration {
@Override
public Object getInternalProperty(String key) {
return System.getProperty(key);
}
public Map<String, String> getProperties() {
return (Map) System.getProperties();
}
}14.3.5 加载环境变量参数的配置这里我们来看EnvironmentConfiguration,这里我们直接来看代码:public class EnvironmentConfiguration implements Configuration {
@Override
public Object getInternalProperty(String key) {
String value = System.getenv(key);
if (StringUtils.isEmpty(value)) {
value = System.getenv(StringUtils.toOSStyleKey(key));
}
return value;
}
public Map<String, String> getProperties() {
return System.getenv();
}
}14.3.6 内存配置的封装:InmemoryConfiguration这里我们看下InmemoryConfiguration的设计,这个直接看代码吧内部使用了一个LinkedHashMap来存储配置public class InmemoryConfiguration implements Configuration {
private String name;
// stores the configuration key-value pairs
private Map<String, String> store = new LinkedHashMap<>();
public InmemoryConfiguration() {
}
public InmemoryConfiguration(String name) {
this.name = name;
}
public InmemoryConfiguration(Map<String, String> properties) {
this.setProperties(properties);
}
@Override
public Object getInternalProperty(String key) {
return store.get(key);
}
/**
* Add one property into the store, the previous value will be replaced if the key exists
*/
public void addProperty(String key, String value) {
store.put(key, value);
}
/**
* Add a set of properties into the store
*/
public void addProperties(Map<String, String> properties) {
if (properties != null) {
this.store.putAll(properties);
}
}
/**
* set store
*/
public void setProperties(Map<String, String> properties) {
if (properties != null) {
this.store = properties;
}
}
public Map<String, String> getProperties() {
return store;
}
}14.3.7 Dubbo迁移新版本的配置文件加载dubbo-migration.yaml关于配置迁移文件的用法可以看下这个Dubbo官方的地址迁移规则说明这个配置文件的文件名字为:dubbo-migration.yaml这个和14.3.4加载JVM参数配置的过程是相似的细节可以看14.3.4节 private void loadMigrationRule() {
//JVM参数的dubbo.migration.file配置
String path = System.getProperty(CommonConstants.DUBBO_MIGRATION_KEY);
if (StringUtils.isEmpty(path)) {
//环境变量的dubbo.migration.file配置
path = System.getenv(CommonConstants.DUBBO_MIGRATION_KEY);
if (StringUtils.isEmpty(path)) {
//默认的迁移配置文件 dubbo-migration.yaml
path = CommonConstants.DEFAULT_DUBBO_MIGRATION_FILE;
}
}
this.localMigrationRule = ConfigUtils.loadMigrationRule(scopeModel.getClassLoaders(), path);
}14.4 初始化加载应用配置加载配置涉及到了配置优先级的处理,下面来看加载配置代码 loadApplicationConfigs()方法private void loadApplicationConfigs() {
//发布器还是不处理配置加载的逻辑还是交给配置管理器
configManager.loadConfigs();
}配置管理器加载配置: @Override
public void loadConfigs() {
// application config has load before starting config center
// load dubbo.applications.xxx
//加载应用配置
loadConfigsOfTypeFromProps(ApplicationConfig.class);
// load dubbo.monitors.xxx
//加载监控配置
loadConfigsOfTypeFromProps(MonitorConfig.class);
// load dubbo.metrics.xxx
//加载指标监控配置
loadConfigsOfTypeFromProps(MetricsConfig.class);
// load multiple config types:
// load dubbo.protocols.xxx
//加载协议配置
loadConfigsOfTypeFromProps(ProtocolConfig.class);
// load dubbo.registries.xxx
loadConfigsOfTypeFromProps(RegistryConfig.class);
// load dubbo.metadata-report.xxx
//加载元数据配置
loadConfigsOfTypeFromProps(MetadataReportConfig.class);
// config centers has bean loaded before starting config center
//loadConfigsOfTypeFromProps(ConfigCenterConfig.class);
//刷新配置
refreshAll();
//检查配置
checkConfigs();
// set model name
if (StringUtils.isBlank(applicationModel.getModelName())) {
applicationModel.setModelName(applicationModel.getApplicationName());
}
}
Dubbo3 源码解读-宋小生-15:Dubbo的三大中心之元数据中心源码解析
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 15/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 2816500319415.1 简介关于元数据中心的概念对于大部分用户来说是比较陌生的,配置中心的话我们还好理解,对于元数据中心是什么,我们来看下我从官网拷贝过来的一段文字:元数据中心在2.7.x版本开始支持,随着应用级别的服务注册和服务发现在Dubbo中落地,元数据中心也变的越来越重要。在以下几种情况下会需要部署元数据中心:对于一个原先采用老版本Dubbo搭建的应用服务,在迁移到Dubbo 3时,Dubbo 3 会需要一个元数据中心来维护RPC服务与应用的映射关系(即接口与应用的映射关系),因为如果采用了应用级别的服务发现和服务注册,在注册中心中将采用“应用 —— 实例列表”结构的数据组织形式,不再是以往的“接口 —— 实例列表”结构的数据组织形式,而以往用接口级别的服务注册和服务发现的应用服务在迁移到应用级别时,得不到接口与应用之间的对应关系,从而无法从注册中心得到实例列表信息,所以Dubbo为了兼容这种场景,在Provider端启动时,会往元数据中心存储接口与应用的映射关系。为了让注册中心更加聚焦与地址的发现和推送能力,减轻注册中心的负担,元数据中心承载了所有的服务元数据、大量接口/方法级别配置信息等,无论是接口粒度还是应用粒度的服务发现和注册,元数据中心都起到了重要的作用。如果有以上两种需求,都可以选择部署元数据中心,并通过Dubbo的配置来集成该元数据中心。元数据中心并不依赖于注册中心和配置中心,用户可以自由选择是否集成和部署元数据中心,如下图所示:该图中不配备配置中心,意味着可以不需要全局管理配置的能力。该图中不配备注册中心,意味着可能采用了Dubbo mesh的方案,也可能不需要进行服务注册,仅仅接收直连模式的服务调用。官网参考文章地址:部署架构(注册中心 配置中心 元数据中心元数据参考手册综上所述可以用几句话概括下:元数据中心来维护RPC服务与应用的映射关系(即接口与应用的映射关系)来兼容接口与应用之间的对应关系让注册中心更加聚焦与地址的发现和推送能力注册中心的启动是在DefaultApplicationDeployer中的初始化方法 initialize() 中:如下所示这里只看下 startMetadataCenter();方法即可 @Override
public void initialize() {
if (initialized) {
return;
}
// Ensure that the initialization is completed when concurrent calls
synchronized (startLock) {
if (initialized) {
return;
}
// register shutdown hook
registerShutdownHook();
startConfigCenter();
loadApplicationConfigs();
initModuleDeployers();
// @since 2.7.8
startMetadataCenter();
initialized = true;
if (logger.isInfoEnabled()) {
logger.info(getIdentifier() + " has been initialized!");
}
}
}
15.2 深入探究元数据中心的启动过程### 15.2.1 启动元数据中心的代码全貌关于元数据中心我们看下 startMetadataCenter()方法来大致了解下整个流程private void startMetadataCenter() {
//如果未配置元数据中心的地址等配置则使用注册中心的地址等配置做为元数据中心的配置
useRegistryAsMetadataCenterIfNecessary();
//获取应用的配置信息
ApplicationConfig applicationConfig = getApplication();
//元数据配置类型 元数据类型, local 或 remote,,如果选择远程,则需要进一步指定元数据中心
String metadataType = applicationConfig.getMetadataType();
// FIXME, multiple metadata config support.
//查询元数据中心的地址等配置
Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs();
if (CollectionUtils.isEmpty(metadataReportConfigs)) {
//这个就是判断 如果选择远程,则需要进一步指定元数据中心 否则就抛出来异常
if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
throw new IllegalStateException("No MetadataConfig found, Metadata Center address is required when 'metadata=remote' is enabled.");
}
return;
}
//MetadataReport实例的存储库对象获取
MetadataReportInstance metadataReportInstance = applicationModel.getBeanFactory().getBean(MetadataReportInstance.class);
List<MetadataReportConfig> validMetadataReportConfigs = new ArrayList<>(metadataReportConfigs.size());
for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {
ConfigValidationUtils.validateMetadataConfig(metadataReportConfig);
validMetadataReportConfigs.add(metadataReportConfig);
}
//初始化元数据
metadataReportInstance.init(validMetadataReportConfigs);
//MetadataReport实例的存储库对象初始化失败则抛出异常
if (!metadataReportInstance.inited()) {
throw new IllegalStateException(String.format("%s MetadataConfigs found, but none of them is valid.", metadataReportConfigs.size()));
}
}15.2.2 元数据中心未配置则使用注册中心配置前面在说配置中心的时候有说过配置中心如果未配置会使用注册中心的地址等信息作为默认配置,这里元数据做了类似的操作:如代码:DefaultApplicationDeployer类型的 useRegistryAsMetadataCenterIfNecessary()方法private void useRegistryAsMetadataCenterIfNecessary() {
//配置缓存中查询元数据配置
Collection<MetadataReportConfig> metadataConfigs = configManager.getMetadataConfigs();
//配置存在则直接返回
if (CollectionUtils.isNotEmpty(metadataConfigs)) {
return;
}
////查询是否有注册中心设置了默认配置isDefault 设置为true的注册中心则为默认注册中心列表,如果没有注册中心设置为默认注册中心,则获取所有未设置默认配置的注册中心列表
List<RegistryConfig> defaultRegistries = configManager.getDefaultRegistries();
if (defaultRegistries.size() > 0) {
//多注册中心遍历
defaultRegistries
.stream()
//筛选符合条件的注册中心 (筛选逻辑就是查看是否有对应协议的扩展支持)
.filter(this::isUsedRegistryAsMetadataCenter)
//注册中心配置映射为元数据中心 映射就是获取需要的配置
.map(this::registryAsMetadataCenter)
//将元数据中心配置存储在配置缓存中方便后续使用
.forEach(metadataReportConfig -> {
if (metadataReportConfig.getId() == null) {
Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs();
if (CollectionUtils.isNotEmpty(metadataReportConfigs)) {
for (MetadataReportConfig existedConfig : metadataReportConfigs) {
if (existedConfig.getId() == null && existedConfig.getAddress().equals(metadataReportConfig.getAddress())) {
return;
}
}
}
configManager.addMetadataReport(metadataReportConfig);
} else {
Optional<MetadataReportConfig> configOptional = configManager.getConfig(MetadataReportConfig.class, metadataReportConfig.getId());
if (configOptional.isPresent()) {
return;
}
configManager.addMetadataReport(metadataReportConfig);
}
logger.info("use registry as metadata-center: " + metadataReportConfig);
});
}
}这个代码有些细节就不细说了 我们概括下顺序梳理下思路:配置缓存中查询元数据配置,配置存在则直接返回查询所有可用的默认注册中心列表多注册中心遍历选符合条件的注册中心 (筛选逻辑就是查看是否有对应协议的扩展支持)注册中心配置RegistryConfig映射转换为元数据中心配置类型MetadataReportConfig 映射就是获取需要的配置将元数据中心配置存储在配置缓存中方便后续使用元数据的配置可以参考官网:元数据参考手册这里主要看下可配置项有哪些 对应类型为MetadataReportConfig 在官网暂时未找到合适的文档,这里整理下属性列表方便后续配置说明查看:配置变量类型说明idString配置idprotocolString元数据协议addressString元数据中心地址portInteger元数据中心端口usernameString元数据中心认证用户名passwordString元数据中心认证密码timeoutInteger元数据中心的请求超时(毫秒)groupString该组将元数据保存在中。它与注册表相同parametersMap<String, String>自定义参数retryTimesInteger重试次数retryPeriodInteger重试间隔cycleReportBoolean默认情况下, 是否每天重复存储完整的元数据syncReportBooleanSync or Async report.clusterBoolean需要群集支持,默认为falseregistryString注册表配置idfileString元数据报告文件存储位置checkBoolean连接到元数据中心时要应用的失败策略15.2.3 元数据中心的初始化逻辑15.2.3.1 元数据中心的初始化调用逻辑主要看这一行比较重要的逻辑: //初始化元数据
metadataReportInstance.init(validMetadataReportConfigs);在了解这一行逻辑之前我们先来看下元数据相关联的类型:MetadataReportInstance中的初始化方法init public void init(List<MetadataReportConfig> metadataReportConfigs) {
//CAS判断是否有初始化过
if (!init.compareAndSet(false, true)) {
return;
}
//元数据类型配置如果未配置则默认为local
this.metadataType = applicationModel.getApplicationConfigManager().getApplicationOrElseThrow().getMetadataType();
if (metadataType == null) {
this.metadataType = DEFAULT_METADATA_STORAGE_TYPE;
}
//获取MetadataReportFactory 工厂类型
MetadataReportFactory metadataReportFactory = applicationModel.getExtensionLoader(MetadataReportFactory.class).getAdaptiveExtension();
//多元数据中心初始化
for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {
init(metadataReportConfig, metadataReportFactory);
}
}
private void init(MetadataReportConfig config, MetadataReportFactory metadataReportFactory) {
//配置转url
URL url = config.toUrl();
if (METADATA_REPORT_KEY.equals(url.getProtocol())) {
String protocol = url.getParameter(METADATA_REPORT_KEY, DEFAULT_DIRECTORY);
url = URLBuilder.from(url)
.setProtocol(protocol)
.setScopeModel(config.getScopeModel())
.removeParameter(METADATA_REPORT_KEY)
.build();
}
url = url.addParameterIfAbsent(APPLICATION_KEY, applicationModel.getCurrentConfig().getName());
String relatedRegistryId = isEmpty(config.getRegistry()) ? (isEmpty(config.getId()) ? DEFAULT_KEY : config.getId()) : config.getRegistry();
//从元数据工厂中获取元数据
MetadataReport metadataReport = metadataReportFactory.getMetadataReport(url);
//缓存元数据到内存
if (metadataReport != null) {
metadataReports.put(relatedRegistryId, metadataReport);
}
}关于元数据的初始化我们主要看两个位置:一个是元数据工厂对象的创建与初始化MetadataReportFactory一个是元数据对象的创建与初始化MetadataReport15.2.3.2 元数据工厂对象MetadataReportFactory关于元数据工厂类型MetadataReportFactory,元数据工厂 用于创建与管理元数据对象, 相关类型如下:我们这里主要以为Zookeeper扩展的元数据工厂ZookeeperMetadataReportFactory类型为例子:实现类型逻辑不复杂,这里就直接贴代码看看:public class ZookeeperMetadataReportFactory extends AbstractMetadataReportFactory {
//与Zookeeper交互的传输器
private ZookeeperTransporter zookeeperTransporter;
//应用程序模型
private ApplicationModel applicationModel;
public ZookeeperMetadataReportFactory(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
this.zookeeperTransporter = ZookeeperTransporter.getExtension(applicationModel);
}
@DisableInject
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public MetadataReport createMetadataReport(URL url) {
return new ZookeeperMetadataReport(url, zookeeperTransporter);
}
}元数据工厂的实现比较简单继承抽象的元数据工厂AbstractMetadataReportFactory实现工厂方法createMetadataReport来创建一个元数据操作类型如果我们想要实现一个元数据工厂扩展可以参考Zookeeper的这个方式15.2.3.3 元数据操作对象MetadataReport的创建与初始化前面的从元数据工厂中获取元数据操作对象的逻辑处理代码如下://从元数据工厂中获取元数据 ,url对象可以理解为配置
MetadataReport metadataReport = metadataReportFactory.getMetadataReport(url);关于元数据对象,用于元数据信息的增删改查等逻辑的操作与元数据信息的缓存我们这里还是以Zookeeper的实现ZookeeperMetadataReportFactory类型做为参考:我们先来看这个逻辑//从元数据工厂中获取元数据 ,url对象可以理解为配置
MetadataReport metadataReport = metadataReportFactory.getMetadataReport(url);ZookeeperMetadataReportFactory的父类型AbstractMetadataReportFactory中的getMetadataReport方法如下: @Override
public MetadataReport getMetadataReport(URL url) {
//url值参考例子zookeeper://127.0.0.1:2181?application=dubbo-demo-api-provider&client=&port=2181&protocol=zookeeper
//如果存在export则移除
url = url.setPath(MetadataReport.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY);
//生成元数据缓存key 元数据维度 地址+名字
//如: zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport
String key = url.toServiceString();
//缓存中查询 查到则直接返回
MetadataReport metadataReport = serviceStoreMap.get(key);
if (metadataReport != null) {
return metadataReport;
}
// Lock the metadata access process to ensure a single instance of the metadata instance
//存在写操作 加个锁
lock.lock();
try {
//双重校验锁在查一下
metadataReport = serviceStoreMap.get(key);
if (metadataReport != null) {
return metadataReport;
}
//check参数 查元数据报错是否抛出异常
boolean check = url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
try {
//关键模版方法 调用扩展实现的具体业务(创建元数据操作对象)
metadataReport = createMetadataReport(url);
} catch (Exception e) {
if (!check) {
logger.warn("The metadata reporter failed to initialize", e);
} else {
throw e;
}
}
//check逻辑检查
if (check && metadataReport == null) {
throw new IllegalStateException("Can not create metadata Report " + url);
}
//缓存对象
if (metadataReport != null) {
serviceStoreMap.put(key, metadataReport);
}
//返回
return metadataReport;
} finally {
// Release the lock
lock.unlock();
}
}
上面这个抽象类AbstractMetadataReportFactory中的获取元数据操作对象的模版方法getMetadataReport(URL url), 用了双重校验锁的逻辑来创建对象缓存对象,又用了模版方法设计模式,来让抽象类做通用的逻辑,让实现类型去做扩展, 虽然代码写的太长了些整体还是用了不少的设计思想.我们直接看这个代码:metadataReport = createMetadataReport(url);这个创建元数据操作对象的代码实际上走的是实现类型的逻辑:来自工厂Bean ZookeeperMetadataReportFactory的工厂方法如下所示:@Override
public MetadataReport createMetadataReport(URL url) {
return new ZookeeperMetadataReport(url, zookeeperTransporter);
}创建了元数据操作对象,这里我们继续看下元数据操作对象ZookeeperMetadataReport创建做了哪些逻辑:来自ZookeeperMetadataReport的构造器:public ZookeeperMetadataReport(URL url, ZookeeperTransporter zookeeperTransporter) {
//url即配置 配置传递给抽象类 做一些公共的逻辑
//url参考:zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport?application=dubbo-demo-api-provider&client=&port=2181&protocol=zookeeper
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getGroup(DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
//连接Zookeeper
zkClient = zookeeperTransporter.connect(url);
}核心的公共的操作逻辑封装在父类AbstractMetadataReport里面我们来看前面super调用的构造器逻辑:如下所示: public AbstractMetadataReport(URL reportServerURL) {
//设置url 如:zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport?application=dubbo-demo-api-provider&client=&port=2181&protocol=zookeeper
setUrl(reportServerURL);
// Start file save timer
//缓存的文件名字
//格式为: 用户目录+/.dubbo/dubbo-metadata- + 应用程序名字application + url地址(IP+端口) + 后缀.cache 如下所示
///Users/song/.dubbo/dubbo-metadata-dubbo-demo-api-provider-127.0.0.1-2181.cache
String defaultFilename = System.getProperty(USER_HOME) + DUBBO_METADATA +
reportServerURL.getApplication() + "-" +
replace(reportServerURL.getAddress(), ":", "-") + CACHE;
//如果用户配置了缓存文件名字则以用户配置为准file
String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename);
File file = null;
//文件名字不为空
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
//文件和父目录不存在则创建文件目录
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid service store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
// if this file exists, firstly delete it.
//还未初始化则已存在的历史文件删除掉
if (!initialized.getAndSet(true) && file.exists()) {
file.delete();
}
}
//赋值给成员变量后续继续可以用
this.file = file;
//文件存在则直接加载文件中的内容
loadProperties();
//sync-report配置的值为同步配置还异步配置,true是同步配置,默认为false为异步配置
syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false);
//重试属性与逻辑也封装了一个类型 创建对象
//retry-times重试次数配置 默认为100次
//retry-period 重试间隔配置 默认为3000
metadataReportRetry = new MetadataReportRetry(reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES),
reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD));
// cycle report the data switch
//是否定期从元数据中心同步配置
//cycle-report配置默认为true
if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
//开启重试定时器 24个小时间隔从元数据中心同步一次
reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true));
reportTimerScheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
}
this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false);
this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true);
}
15.2.3.4 内存中元数据自动同步到Zookeeper和本地文件这里来总结下元数据操作的初始化逻辑:首次初始化清理历史元数据文件如:Users/song/.dubbo/dubbo-metadata-dubbo-demo-api-provider-127.0.0.1-2181.cache如果非首次进来则直接加载缓存在本地的缓存文件,赋值给properties成员变量初始化同步配置是否异步(默认为false), sync-report配置的值为同步配置还异步配置,true是同步配置,默认为false为异步配置初始化重试属性是否定期从元数据中心同步配置初始化 默认为true 24小时自动同步一次关于元数据同步可以看AbstractMetadataReport类型的publishAll方法: reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true));
reportTimerScheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);这里有个方法叫做calculateStartTime 这个代码是随机时间的between 2:00 am to 6:00 am, the time is random. 2点到6点之间启动, 低峰期启动自动同步 返回值:AbstractMetadataReport类型的 void publishAll() {
logger.info("start to publish all metadata.");
this.doHandleMetadataCollection(allMetadataReports);
}AbstractMetadataReport类型的doHandleMetadataCollectionprivate boolean doHandleMetadataCollection(Map<MetadataIdentifier, Object> metadataMap) {
if (metadataMap.isEmpty()) {
return true;
}
Iterator<Map.Entry<MetadataIdentifier, Object>> iterable = metadataMap.entrySet().iterator();
while (iterable.hasNext()) {
Map.Entry<MetadataIdentifier, Object> item = iterable.next();
if (PROVIDER_SIDE.equals(item.getKey().getSide())) {
//提供端的元数据则存储提供端元数据
this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue());
} else if (CONSUMER_SIDE.equals(item.getKey().getSide())) {
//消费端的元数据则存储提供端元数据
this.storeConsumerMetadata(item.getKey(), (Map) item.getValue());
}
}
return false;
}提供端元数据的存储:AbstractMetadataReport类型的storeProviderMetadata @Override
public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
if (syncReport) {
storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition);
} else {
reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition));
}
}AbstractMetadataReport类型的storeProviderMetadataTask具体同步代码:storeProviderMetadataTaskprivate void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
try {
if (logger.isInfoEnabled()) {
logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + "; definition: " + serviceDefinition);
}
allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
failedReports.remove(providerMetadataIdentifier);
Gson gson = new Gson();
String data = gson.toJson(serviceDefinition);
//内存中的元数据同步到元数据中心
doStoreProviderMetadata(providerMetadataIdentifier, data);
//内存中的元数据同步到本地文件
saveProperties(providerMetadataIdentifier, data, true, !syncReport);
} catch (Exception e) {
// retry again. If failed again, throw exception.
failedReports.put(providerMetadataIdentifier, serviceDefinition);
metadataReportRetry.startRetryTask();
logger.error("Failed to put provider metadata " + providerMetadataIdentifier + " in " + serviceDefinition + ", cause: " + e.getMessage(), e);
}
}
上面代码我们主要看本地内存中的元数据同步到元数据中心和存本地的两个点://内存中的元数据同步到元数据中心
doStoreProviderMetadata(providerMetadataIdentifier, data);
//内存中的元数据同步到本地文件
saveProperties(providerMetadataIdentifier, data, true, //内存中的元数据同步到元数据中心这个方法会调用当前子类重写的具体存储逻辑:这里我们以ZookeeperMetadataReport的doStoreProviderMetadata举例: private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
//使用zkClient创建一个节点数据为参数V v是前面说的服务定义数据
zkClient.create(getNodePath(metadataIdentifier), v, false);
}这里参数我们举个例子: 提供者的元数据内容如下:节点路径为:/dubbo/metadata/link.elastic.dubbo.entity.DemoService/provider/dubbo-demo-api-provider格式:/dubbo/metadata前缀服务提供者接口提供者类型provider应用名具体的元数据内容如下:比较详细的记录了应用信息,服务接口信息和服务接口对应的方法信息{
"parameters": {
"side": "provider",
"interface": "link.elastic.dubbo.entity.DemoService",
"pid": "38680",
"application": "dubbo-demo-api-provider",
"dubbo": "2.0.2",
"release": "3.0.8",
"anyhost": "true",
"bind.ip": "192.168.1.9",
"methods": "sayHello,sayHelloAsync",
"background": "false",
"deprecated": "false",
"dynamic": "true",
"service-name-mapping": "true",
"generic": "false",
"bind.port": "20880",
"timestamp": "1653097653865"
},
"canonicalName": "link.elastic.dubbo.entity.DemoService",
"codeSource": "file:/Users/song/Desktop/Computer/A/code/gitee/weaving-a-net/weaving-test/dubbo-test/target/classes/",
"methods": [
{
"name": "sayHello",
"parameterTypes": [
"java.lang.String"
],
"returnType": "java.lang.String",
"annotations": [
]
},
{
"name": "sayHelloAsync",
"parameterTypes": [
"java.lang.String"
],
"returnType": "java.util.concurrent.CompletableFuture",
"annotations": [
]
}
],
"types": [
{
"type": "java.util.concurrent.CompletableFuture",
"properties": {
"result": "java.lang.Object",
"stack": "java.util.concurrent.CompletableFuture.Completion"
}
},
{
"type": "java.lang.Object"
},
{
"type": "java.lang.String"
},
{
"type": "java.util.concurrent.CompletableFuture.Completion",
"properties": {
"next": "java.util.concurrent.CompletableFuture.Completion",
"status": "int"
}
},
{
"type": "int"
}
],
"annotations": [
]
}本地缓存文件的写入 可以看下如下代码AbstractMetadataReport类型的saveProperties方法 private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) {
if (file == null) {
return;
}
try {
if (add) {
properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value);
} else {
properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
}
long version = lastCacheChanged.incrementAndGet();
if (sync) {
//获取最新修改版本持久化到磁盘
new SaveProperties(version).run();
} else {
reportCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}主要看如下代码: new SaveProperties(version).run();SaveProperties类型代码如下: private class SaveProperties implements Runnable {
private long version;
private SaveProperties(long version) {
this.version = version;
}
@Override
public void run() {
doSaveProperties(version);
}
}继续看doSaveProperties方法:private void doSaveProperties(long version) {
//不是最新的就不要持久化了
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
try {
//创建本地文件锁:
//路径为:
///Users/song/.dubbo/dubbo-metadata-dubbo-demo-api-provider-127.0.0.1-2181.cache.lock
File lockfile = new File(file.getAbsolutePath() + ".lock");
//锁文件不存在则创建锁文件
if (!lockfile.exists()) {
lockfile.createNewFile();
}
//随机访问文件工具类对象创建 读写权限
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
//文件文件Channel
//返回与此文件关联的唯一FileChannel对象。
FileChannel channel = raf.getChannel()) {
//FileChannel中的lock()与tryLock()方法都是尝试去获取在某一文件上的独有锁(以下简称独有锁),可以实现进程间操作的互斥。区别在于lock()会阻塞(blocking)方法的执行,tryLock()则不会。
FileLock lock = channel.tryLock();
//如果多个线程同时进来未获取锁的则抛出异常
if (lock == null) {
throw new IOException("Can not lock the metadataReport cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties");
}
// Save
try {
//文件不存在则创建本地元数据缓存文件
///Users/song/.dubbo/dubbo-metadata-dubbo-demo-api-provider-127.0.0.1-2181.cache
if (!file.exists()) {
file.createNewFile();
}
Properties tmpProperties;
if (!syncReport) {
// When syncReport = false, properties.setProperty and properties.store are called from the same
// thread(reportCacheExecutor), so deep copy is not required
tmpProperties = properties;
} else {
// Using store method and setProperty method of the this.properties will cause lock contention
// under multi-threading, so deep copy a new container
//异步存储会导致锁争用 使用此的store方法和setProperty方法。属性将导致多线程下的锁争用,因此深度复制新容器
tmpProperties = new Properties();
Set<Map.Entry<Object, Object>> entries = properties.entrySet();
for (Map.Entry<Object, Object> entry : entries) {
tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
}
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
//Properties类型自带的方法:
//将此属性表中的属性列表(键和元素对)以适合使用load(Reader)方法的格式写入输出字符流。
tmpProperties.store(outputFile, "Dubbo metadataReport Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
if (version < lastCacheChanged.get()) {
return;
} else {
reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
//这个代码太诡异了如果是lock失败也会打印异常给人非常疑惑的感觉 后续会修复
logger.warn("Failed to save service store file, cause: " + e.getMessage(), e);
}
}写入文件的内容大致如下link.elastic.dubbo.entity.DemoService:::provider:dubbo-demo-api-provider -> {
"parameters": {
"side": "provider",
"interface": "link.elastic.dubbo.entity.DemoService",
"pid": "41457",
"application": "dubbo-demo-api-provider",
"dubbo": "2.0.2",
"release": "3.0.8",
"anyhost": "true",
"bind.ip": "192.168.1.9",
"methods": "sayHello,sayHelloAsync",
"background": "false",
"deprecated": "false",
"dynamic": "true",
"service-name-mapping": "true",
"generic": "false",
"bind.port": "20880",
"timestamp": "1653100253548"
},
"canonicalName": "link.elastic.dubbo.entity.DemoService",
"codeSource": "file:/Users/song/Desktop/Computer/A/code/gitee/weaving-a-net/weaving-test/dubbo-test/target/classes/",
"methods": [
{
"name": "sayHelloAsync",
"parameterTypes": [
"java.lang.String"
],
"returnType": "java.util.concurrent.CompletableFuture",
"annotations": [
]
},
{
"name": "sayHello",
"parameterTypes": [
"java.lang.String"
],
"returnType": "java.lang.String",
"annotations": [
]
}
],
"types": [
{
"type": "java.util.concurrent.CompletableFuture",
"properties": {
"result": "java.lang.Object",
"stack": "java.util.concurrent.CompletableFuture.Completion"
}
},
{
"type": "java.lang.Object"
},
{
"type": "java.lang.String"
},
{
"type": "java.util.concurrent.CompletableFuture.Completion",
"properties": {
"next": "java.util.concurrent.CompletableFuture.Completion",
"status": "int"
}
},
{
"type": "int"
}
],
"annotations": [
]
}
Dubbo3 源码解读-宋小生-16:模块发布器发布服务全过程
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 16/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 2816500319416.1 简介Dubbo做为服务治理框架,比较核心的就是服务相关的概念,这里我先贴个找到的关于Dubbo工作原理的架构图:如果按完整服务启动与订阅的顺序我们可以归结为以下6点:导出服务(提供者)服务提供方通过指定端口对外暴露服务注册服务(提供者)提供方向注册中心注册自己的信息(服务发现)-订阅服务(消费者)服务调用方通过注册中心订阅自己感兴趣的服务(服务发现)-服务推送(消费者)注册中心向调用方推送地址列表调用服务(消费者调用提供者)调用方选择一个地址发起RPC调用监控服务服务提供方和调用方的统计数据由监控模块收集展示上面的完整的服务启动订阅与调用流程不仅仅适用于Dubbo 同样也适用于其他服务治理与发现的模型, 一般服务发现与服务调用的思路就是这样的,我们将以上内容扩展,暴漏服务可以使用http,tcp,udp等各种协议,注册服务可以注册到Redis,Dns,Etcd,Zookeeper等注册中心中,订阅服务可以主动去注册中心查询服务列表,服务发现可以让注册中心将服务数据动态推送给消费者.Dubbo其实就是基于这种简单的服务模型来扩展出各种功能的支持,来满足服务治理的各种场景,了解了这里可能各位同学就想着自行开发一个简单的微服务框架了。回到主题,从以上的服务完整发布调用流程可以看到,所有的功能都是由导出服务(提供者)开始的,只有提供者先提供了服务才可以有真正的服务让消费者调用。之前的博客内容 链接:<<12-全局视野来看Dubbo3.0.8的服务启动生命周期>> 我们了解了 DefaultModuleDeployer模块器启动的流程,其中在start代码的模版方法中开始了导出服务的功能,这里我们来详细看下服务发布的全过程:入口代码: DefaultModuleDeployer的发布服务方法 private void exportServices() {
//从配置管缓存中查询缓存的所有的服务配置然后逐个服务发布
for (ServiceConfigBase sc : configManager.getServices()) {
exportServiceInternal(sc);
}
}16.2 导出服务的入口入口代码: DefaultModuleDeployer的发布服务方法 private void exportServices() {
//从配置管缓存中查询缓存的所有的服务配置然后逐个服务发布
for (ServiceConfigBase sc : configManager.getServices()) {
exportServiceInternal(sc);
}
}主要流程为遍历初始化的服务配置列表然后逐个服务开始到处内部导出服务代码:exportServiceInternal方法: private void exportServiceInternal(ServiceConfigBase sc) {
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
//服务配置刷新 配置优先级覆盖
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
//服务已经导出过了就直接返回
if (sc.isExported()) {
return;
}
//是否异步方式导出 全局配置或者服务级其中一个配置了异步则异步处理
if (exportAsync || sc.shouldExportAsync()) {
//异步其实就是使用线程来导出服务serviceExportExecutor
ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
} catch (Throwable t) {
logger.error(getIdentifier() + " export async catch error : " + t.getMessage(), t);
}
}, executor);
asyncExportingFutures.add(future);
} else {
//同步导出服务
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
}
}这个逻辑里面做了一些基本的操作,可以直接看注释然后调用ServiceConfig的export的来导出服务,继续往后看服务配置的导出服务方法。16.3 服务配置导出服务模板方法核心的服务导出代码是在服务配置中来做的ServiceConfig的 export() 方法ServiceConfig的 export() 方法代码如下:
@Override
public void export() {
//已经导出过服务直接放那会
if (this.exported) {
return;
}
// ensure start module, compatible with old api usage
//确保模块启动了(基本的初始化操作执行了)
getScopeModel().getDeployer().start();
//悲观锁
synchronized (this) {
//双重校验
if (this.exported) {
return;
}
//配置是否刷新 前面初始化时候已经刷新过配置
if (!this.isRefreshed()) {
this.refresh();
}
//服务导出配置配置为false则不导出
if (this.shouldExport()) {
//服务发布前初始化一下元数据对象
this.init();
if (shouldDelay()) {
//配置了服务的延迟发布配置则走延迟发布逻辑
doDelayExport();
} else {
//导出服务
doExport();
}
}
}
}16.3.1 服务配置导出服务前的初始化方法ServiceConfig 导出服务之前的初始化方法initpublic void init() {
if (this.initialized.compareAndSet(false, true)) {
//加载服务监听器 这里暂时没有服务监听器扩展
// load ServiceListeners from extension
ExtensionLoader<ServiceListener> extensionLoader = this.getExtensionLoader(ServiceListener.class);
this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());
}
//服务提供者配置传递给元数据配置对象 一个服务提供者配置会有一个元数据配置,服务配置
initServiceMetadata(provider);
//元数据
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setTarget(getRef());
//元数据的key格式为 group/服务接口:版本号
serviceMetadata.generateServiceKey();
}16.4 服务配置导出服务模板方法2ServiceConfig 导出服务核心逻辑protected synchronized void doExport() {
//取消发布
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
//已经发布
if (exported) {
return;
}
//服务路径 为空则设置为接口名,本例子中为link.elastic.dubbo.entity.DemoService
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
//导出URL
doExportUrls();
//
exported();
}16.4.1 导出服务的URL配置逻辑ServiceConfig 导出URL核心逻辑 private void doExportUrls() {
//模块服务存储库
ModuleServiceRepository repository = getScopeModel().getServiceRepository();
ServiceDescriptor serviceDescriptor;
//ref为服务实现类型 这里对应我们例子的DemoServiceImpl
final boolean serverService = ref instanceof ServerService;
if(serverService){
serviceDescriptor=((ServerService) ref).getServiceDescriptor();
repository.registerService(serviceDescriptor);
}else{
//我们代码走这个逻辑 注册服务 这个注册不是向注册中心注册 这个是解析服务接口将服务方法等描述信息存放在了服务存储ModuleServiceRepository类型对象的成员变量services中
serviceDescriptor = repository.registerService(getInterfaceClass());
}
//提供者领域模型, 提供者领域模型 封装了一些提供者需要的就基本属性同时内部解析封装方法信息 ProviderMethodModel 列表 , 服务标识符 格式group/服务接:版本号
providerModel = new ProviderModel(getUniqueServiceName(),
//服务实现类DemoServiceImpl
ref,
//服务描述符 描述符里面包含了服务接口的方法信息,不过服务接口通过反射也可以拿到方法信息
serviceDescriptor,
//服务配置
this,
//当前所处模型
getScopeModel(),
//当前服务接口的元数据对象
serviceMetadata);
//模块服务存储库存储提供者模型对象ModuleServiceRepository
repository.registerProvider(providerModel);
//获取配置的注册中心列表 ,同时将注册中心配置转URL (在Dubbo中URL就是配置信息的一种形式)
//这里会获取到两个 由dubbo.application.register-mode 双注册配置决定
//注册中心 registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768
//service-discovery-registry://8.131.79.126:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=1653704425920
//参数dubbo是dubbo协议的版本不是Dubbo版本 Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
//这里后面详细说下 服务双注册 dubbo.application.register-mode
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// stub service will use generated service name
if(!serverService) {
// In case user specified path, register service one more time to map it to path.
//模块服务存储库ModuleServiceRepository存储服务接口信息
repository.registerService(pathKey, interfaceClass);
}
//导出根据协议导出配置到注册中心
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
16.4.2 应用级和接口级服务注册地址获取这里主要看下注册中心的获取,这里涉及到服务的双注册配置List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);关于loadRegistries方法的详情我们就不看了主要看loadRegistries方法中调用的genCompatibleRegistries添加服务发现注册中心 /**
* @param scopeModel 域模型
* @param registryList 配置的注册中心列表 例如:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768
*
* @param provider 是否为服务提供者 这里Demo为true
*/
private static List<URL> genCompatibleRegistries(ScopeModel scopeModel, List<URL> registryList, boolean provider) {
List<URL> result = new ArrayList<>(registryList.size());
//遍历所有的注册中心 为每个注册中心增加兼容的服务发现注册中心地址配置
registryList.forEach(registryURL -> {
//是否为提供者
if (provider) {
// for registries enabled service discovery, automatically register interface compatible addresses.
String registerMode;
//注册协议配置了service-discovery-registry 走这个逻辑
//前面这个逻辑是直接接给result结果中添加应用级注册,如果是all配置则增加接口级注册信息
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INSTANCE));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INSTANCE;
}
//这里配置的就是应用级配置 则先添加应用级地址,再根据条件判断是否添加接口级注册中心地址
result.add(registryURL);
if (DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)
&& registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
URL interfaceCompatibleRegistryURL = URLBuilder.from(registryURL)
.setProtocol(REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(interfaceCompatibleRegistryURL);
}
} else {
//正常情况下我们的配置会走这个逻辑
// 获取服务注册的注册模式 配置为dubbo.application.register-mode 默认值为all 既注册接口数据又注册应用级信息
registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_ALL));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
}
//根据逻辑条件判断是否添加应用级注册中心地址
if ((DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode))
&& registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)) {
URL serviceDiscoveryRegistryURL = URLBuilder.from(registryURL)
.setProtocol(SERVICE_REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(serviceDiscoveryRegistryURL);
}
//根据逻辑条件判断是否添加接口级注册中心地址
if (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)) {
result.add(registryURL);
}
}
FrameworkStatusReportService reportService = ScopeModelUtil.getApplicationModel(scopeModel).getBeanFactory().getBean(FrameworkStatusReportService.class);
reportService.reportRegistrationStatus(reportService.createRegistrationReport(registerMode));
} else {
result.add(registryURL);
}
});
return result;
}这个方法是根据服务注册模式来判断使用接口级注册地址还是应用级注册地址分别如下所示:配置信息:dubbo.application.register-mode配置值:interface接口级注册instance应用级注册all接口级别和应用级都注册接口级注册地址:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768应用级注册地址:service-discovery-registry://8.131.79.126:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=1653704425920## 16.5 导出服务配置到本地和注册中心 doExportUrlsFor1Protocol(protocolConfig, registryURLs);protocolConfig为:dubbo协议的配置<dubbo:protocol port="-1" name="dubbo" />registryURLs目前有两个 应用级注册地址和接口级注册地址:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768service-discovery-registry://8.131.79.126:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=165370442592016.5.1 导出服务配置的doExportUrlsFor1Protocol方法 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//生成协议配置具体可见下图中的元数据配置中的attachments
Map<String, String> map = buildAttributes(protocolConfig);
// remove null key and null value
//移除空值 简化配置
map.keySet().removeIf(key -> key == null || map.get(key) == null);
// init serviceMetadata attachments
//协议配置放到元数据对象中
serviceMetadata.getAttachments().putAll(map);
//协议配置 + 默认协议配置转URL类型的配置存储
URL url = buildUrl(protocolConfig, map);
//导出url
exportUrl(url, registryURLs);
}
16.5.2 导出服务配置模板方法继续看导出服务的模板方法,分为本地导出和注册中心导出//参数url为协议配置url可以参考:dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.1.9&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=10953&release=3.0.8&side=provider&timestamp=1653705630518private void exportUrl(URL url, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
//未明确指定远程导出 则开启本地导出
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//未明确指定本地导出 则开启远程导出
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
url = exportRemote(url, registryURLs);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
}
}
}
this.urls.add(url);
}16.6 导出服务到本地本地调用使用了 injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,但执行 Dubbo 的 Filter 链。直接通过代码来看吧 private void exportLocal(URL url) {
//协议转为injvm 代表本地导出 host为127.0.0.1
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
local = local.setScopeModel(getScopeModel())
.setServiceModel(providerModel);
doExportUrl(local, false);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}16.6.1 doExportUrl方法private void doExportUrl(URL url, boolean withMetaData) {
//这里是由adaptor扩展类型处理过的 我们直接看默认的类型javassist 对应JavassistProxyFactory代理工厂 获取调用对象 (
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.add(exporter);
}16.6.2 JavassistProxyFactory类型的getInvoker方法@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
try {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 创建实际服务提供者的代理类型,代理类型后缀为DubboWrap在这里类型为 link.elastic.dubbo.entity.DemoServiceImplDubboWrap0
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//创建一个匿名内部类对象 继承自AbstractProxyInvoker的Invoker对象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
} catch (Throwable fromJavassist) {
// try fall back to JDK proxy factory
...
}
}
}16.6.3 使用协议导出调用对象 export Exporter<?> exporter = protocolSPI.export(invoker);这个使用了Adaptor扩展和Wrapper机制Debug起来不太方便这里贴一下调用堆栈16.6.3.1 协议序列化机制ProtocolSerializationWrapper @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//这里主要逻辑是将服务提供者url添加到服务存储仓库中
getFrameworkModel(invoker.getUrl().getScopeModel()).getServiceRepository().registerProviderUrl(invoker.getUrl());
return protocol.export(invoker);
}16.6.3.2 协议过滤器Wrapper ProtocolFilterWrapper @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心的协议导出直接执行
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
//过滤器调用链FilterChainBuilder的扩展对象查询
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
//这里分为2步 生成过滤器调用链 然后使用链表中的节点调用 这里值查询provider类型的过滤器
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}过滤器调用链的生成 对用DefaultFilterChainBuilder类型的buildInvokerChain方法@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
//originalInvoker代表真正的服务调用器
Invoker<T> last = originalInvoker;
URL url = originalInvoker.getUrl();
List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);
List<Filter> filters;
if (moduleModels != null && moduleModels.size() == 1) {
//类型Filter key为service.filter 分组为provider 所有提供者过滤器拉取
filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
} else if (moduleModels != null && moduleModels.size() > 1) {
filters = new ArrayList<>();
List<ExtensionDirector> directors = new ArrayList<>();
for (ModuleModel moduleModel : moduleModels) {
List<Filter> tempFilters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModel).getActivateExtension(url, key, group);
filters.addAll(tempFilters);
directors.add(moduleModel.getExtensionDirector());
}
filters = sortingAndDeduplication(filters, directors);
} else {
filters = ScopeModelUtil.getExtensionLoader(Filter.class, null).getActivateExtension(url, key, group);
}
//倒序拼接,将过滤器的调用对象添加到链表中 最后倒序遍历之后 last节点指向了调用链路链表头节点的对象
if (!CollectionUtils.isEmpty(filters)) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
//每个invoker对象中都有originalInvoker对象
last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
return new CallbackRegistrationInvoker<>(last, filters);
}
return last;
}16.6.3.3 协议监听器Wrapper ProtocolListenerWrapper @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心地址则直接导出
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 先导出对象 再创建过滤器包装对象 执行监听器逻辑
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ScopeModelUtil.getExtensionLoader(ExporterListener.class, invoker.getUrl().getScopeModel())
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}16.6.3.4 QOS的协议Wrapper QosProtocolWrapper@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心导出的时候开启QOS 默认端口22222
if (UrlUtils.isRegistry(invoker.getUrl())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}16.6.3.5 InjvmProtocol 的导出方法 @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
16.7 导出服务到注册中心16.5.2 导出服务配置模板方法 中我们看到了服务导出会导出到本地和远程,接下来就看下导出到远程的方法exportRemote 参数url:dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.1.9&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=12865&release=3.0.8&side=provider&timestamp=1653708351378参数registryURLs目前有两个 应用级注册地址和接口级注册地址:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768service-discovery-registry://8.131.79.126:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=1653704425920private URL exportRemote(URL url, List<URL> registryURLs) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
//遍历所有注册地址与注册模式 逐个注册
for (URL registryURL : registryURLs) {
//为协议URL 添加应用级注册service-discovery-registry参数service-name-mapping为true
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
//为协议url 添加动态配置dynamic
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
//监控配置暂时为null
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.putAttribute(MONITOR_KEY, monitorUrl);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//开始注册服务了 打印个认知 提示下
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL.getAddress());
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
doExportUrl(url, true);
}
return url;
}16.7.1 doExportUrl方法与 16.6.1 doExportUrl方法 导出本地协议是一样的逻辑 ,我们来看看点不同地方private void doExportUrl(URL url, boolean withMetaData) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
//远程服务导出逐个值为true 元数据invoker包装一下
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.add(exporter);
}与本地导出ProtocolFilterWrapper的不同之处 服务发现service-discovery-registry的导出UrlUtils.isRegistry(invoker.getUrl() 判断结果为true会走这个逻辑 @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心的协议导出直接执行
// 服务发现service-discovery-registry的导出会走这个逻辑
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
//过滤器调用链FilterChainBuilder的扩展对象查询
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
//这里分为2步 生成过滤器调用链 然后使用链表中的节点调用 这里值查询provider类型的过滤器
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}与 协议监听器Wrapper ProtocolListenerWrapper 的不同之处服务发现service-discovery-registry的导出UrlUtils.isRegistry(invoker.getUrl() 判断结果为true会走这个逻辑 @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心地址则直接导出
// 服务发现service-discovery-registry的导出会走这个逻辑
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 先导出对象 再创建过滤器包装对象 执行监听器逻辑
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ScopeModelUtil.getExtensionLoader(ExporterListener.class, invoker.getUrl().getScopeModel())
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}与 16.6.3.4 QOS的协议Wrapper QosProtocolWrapper 不同之处服务发现service-discovery-registry的导出UrlUtils.isRegistry(invoker.getUrl() 判断结果为true会走这个逻辑@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心导出的时候开启QOS 默认端口22222
if (UrlUtils.isRegistry(invoker.getUrl())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}启动QOS服务startQosServerprivate void startQosServer(URL url) {
try {
if (!hasStarted.compareAndSet(false, true)) {
return;
}
boolean qosEnable = url.getParameter(QOS_ENABLE, true);
if (!qosEnable) {
logger.info("qos won't be started because it is disabled. " +
"Please check dubbo.application.qos.enable is configured either in system property, " +
"dubbo.properties or XML/spring-boot configuration.");
return;
}
String host = url.getParameter(QOS_HOST);
int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
Server server = frameworkModel.getBeanFactory().getBean(Server.class);
server.setHost(host);
server.setPort(port);
server.setAcceptForeignIp(acceptForeignIp);
server.start();
} catch (Throwable throwable) {
logger.warn("Fail to start qos server: ", throwable);
}QOS的Server的启动方法startpublic void start() throws Throwable {
if (!started.compareAndSet(false, true)) {
return;
}
//1个主线程
boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
//0个从线程
worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
//服务端启动器,和参数设置
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new QosProcessHandler(frameworkModel, welcome, acceptForeignIp));
}
});
try {
if (StringUtils.isBlank(host)) {
serverBootstrap.bind(port).sync();
} else {
serverBootstrap.bind(host, port).sync();
}
logger.info("qos-server bind localhost:" + port);
} catch (Throwable throwable) {
logger.error("qos-server can not bind localhost:" + port, throwable);
throw throwable;
}
}QOS处理器为QosProcessHandler关于QosProcessHandler的细节这里先不说最后一个不同的地方调用链路走的这个 RegistryProtocol16.7.2 通过注册协议导出服务与注册服务的流程RegistryProtocol的导出方法:这个方法非常重要也是服务注册的核心代码,先概括下包含了哪些步骤覆盖配置导出协议端口开启TCP服务注册到注册中心通知服务启动了 @Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//service-discovery-registry://8.131.79.126:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=14256&registry=zookeeper&release=3.0.8&timestamp=1653710477057
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
//dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.1.9&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=14256&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1653710479073
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
//provider://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.1.9&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=14256&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1653710479073
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
//override配置
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();
overrideListeners.put(registryUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
//通过URL获取 注册中心Registry操作对象
final Registry registry = getRegistry(registryUrl);
//需要向注册中心注册地址转换
//dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=14656&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1653711086189
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish (provider itself and registry should both need to register)
boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
//是否向注册中心注册
if (register) {
register(registry, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
if (!registry.isServiceDiscovery()) {
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
//内置监听器通知 这个不是通知消费者的
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}16.8 doLocalExport本地导出协议开启端口前面已经看过了本地协议JVM协议的服务导出和注册中心配置的导出,这里可以直接看一些关键代码: private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//代码中用的这个protoco对象是dubbo自动生成的适配器对象protocol$Adaptive 适配器对象会根据当前协议的参数来查询具体的协议扩展对象
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}上面这个protocol$Adaptive 协议的export导出方法与之前的一样也会经历下面几个过程,具体细节可以参考JVM协议的导出:ProtocolSerializationWrapperProtocolFilterWrapperProtocolListenerWrapperQosProtocolWrapper唯一不同的是我们这里对应的协议扩展类型为DubboProtocol、接下来来看下DubboProtocol的导出服务export方法实现: @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
checkDestroyed();
//服务提供者的url参考例子dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.1.9&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=6043&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1654224285437
URL url = invoker.getUrl();
// export service.
//生成服务的key参考:link.elastic.dubbo.entity.DemoService:20880
String key = serviceKey(url);
//创建导出服务用的导出器DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//export a stub service for dispatching event
//stub配置校验
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
//创建服务开启服务端口
openServer(url);
//
optimizeSerialization(url);
return exporter;
}开启服务端口这里就到了RPC协议的TCP通信模块了,对应DubboProtocol 的 openServer(url);方法 private void openServer(URL url) {
checkDestroyed();
// find server. 地址作为key这里是192.168.1.9:20880
String key = url.getAddress();
// client can export a service which only for server to invoke
//默认提供者开启服务,消费者是不能开启服务的
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
//协议服务器 下面一个双重校验锁检查,如果为空则创建服务
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}else {
server.reset(url);
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}为当前地址创建协议服务对应方法如下:DubboProtocol的createServer方法private ProtocolServer createServer(URL url) {
//下面将url增加了心跳参数最终如下dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.1.9&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=6700&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1654225251112
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
//这里服务端使用的网络库这里是默认值netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
//dubbo交换器层对象创建
ExchangeServer server;
try {
//这个方法会绑定端口,关于交换器与传输网络层到后面统一说
//这里通过绑定url和请求处理器来创建交换器对象
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (StringUtils.isNotEmpty(str)) {
Set<String> supportedTypes = url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
//关闭等待时长默认为10秒
loadServerProperties(protocolServer);
return protocolServer;
}16.9 向注册中心注册服务register这个细节在下个博客中说涉及到Dubbo3的双注册
Dubbo3 源码解读-宋小生-17:Dubbo服务提供者的双注册原理
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 17/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 2816500319417.1 简介上个博客《15-Dubbo的三大中心之元数据中心源码解析》导出服务端的时候多次提到了元数据中心,注册信息的注册。Dubbo3出来时间不太长,对于现在的用户来说大部分使用的仍旧是Dubbo2.x,Dubbo3 比较有特色也是会直接使用到的功能就是应用级服务发现:应用级服务发现从服务/接口粒度到应用粒度的升级,使得 Dubbo 在集群可伸缩性、连接异构微服务体系上更具优势。应用粒度能以更低的资源消耗支持超百万实例规模集群程; 实现与 Spring Cloud、Kubernetes Service 等异构微服务体系的互联互通。对于直接使用Dubbo3的用户还好,可以仅仅开启应用级注册,但是对于Dubbo2.x的用户升级到Dubbo3的用户来说前期都是要开启双注册来慢慢迁移的,既注册传统的接口信息到注册中心,又注册应用信息到注册中心,同时注册应用与接口关系的元数据信息。关于双注册与服务迁移的过程的使用可以参考官网:应用级地址发现迁移指南关于官网提供者双注册的图我这里贴一下,方便了解:17.2 双注册配置的读取17.2.1 注册中心地址作为元数据中心这个配置的解析过程在前面的博客介绍元数据中心的时候很详细的说了相关链接:15-Dubbo的三大中心之元数据中心源码解析对应代码位于:DefaultApplicationDeployer类型的startMetadataCenter()方法private void startMetadataCenter() {
//如果未配置元数据中心的地址等配置则使用注册中心的地址等配置做为元数据中心的配置
useRegistryAsMetadataCenterIfNecessary();
//...省略掉其他代码防止受到干扰
}
具体逻辑是这个方法:useRegistryAsMetadataCenterIfNecessaryprivate void useRegistryAsMetadataCenterIfNecessary() {
//配置缓存中查询元数据配置
Collection<MetadataReportConfig> metadataConfigs = configManager.getMetadataConfigs();
//...省略掉空判断
//查询是否有注册中心设置了默认配置isDefault 设置为true的注册中心则为默认注册中心列表,如果没有注册中心设置为默认注册中心,则获取所有未设置默认配置的注册中心列表
List<RegistryConfig> defaultRegistries = configManager.getDefaultRegistries();
if (defaultRegistries.size() > 0) {
//多注册中心遍历
defaultRegistries
.stream()
//筛选符合条件的注册中心 (筛选逻辑就是查看是否有对应协议的扩展支持)
.filter(this::isUsedRegistryAsMetadataCenter)
//注册中心配置映射为元数据中心 映射就是获取需要的配置
.map(this::registryAsMetadataCenter)
//将元数据中心配置存储在配置缓存中方便后续使用
.forEach(metadataReportConfig -> {
//...省略掉具体的逻辑
});
}
}关于元数据中心地址的获取,主要经过如下逻辑:查询: 所有可用的默认注册中心列表遍历: 多注册中心遍历筛选: 选符合条件的注册中心 (筛选逻辑就是查看是否有对应协议的扩展支持)转化: 注册中心配置RegistryConfig映射转换为元数据中心配置类型MetadataReportConfigMetadataReportConfig 映射就是获取需要的配置。最后会把查询到的元数据中心配置存储在配置缓存中方便后续使用。17.2.2 双注册模式配置双注册配置类型是这个dubbo.application.register-mode=all默认值为all代表应用级注册和接口级注册,当前在完全迁移到应用级注册之后可以将服务直接迁移到应用级配置上去。配置值解释:all 双注册instance 应用级注册interface 接口级注册后面的代码如果想要看更详细的代码可以看博客《16-模块发布器发布服务全过程》关于这个配置的使用我们详细来看下,在Dubbo服务注册时候会先通过此配置查询需要注册服务地址,具体代码位于ServiceConfig的doExportUrls()方法中:private void doExportUrls() {
//省略掉前面的代码...
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
//省略掉后面的代码...
}然后就是具体注册中心地址的获取过程我们看下:ConfigValidationUtils的加载注册中心地址方法loadRegistries public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
// check && override if necessary
//省略掉前面的代码...
//这里会获取到一个接口配置注册地址例如:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768
List<RegistryConfig> registries = interfaceConfig.getRegistries();
//省略掉中间的的代码...
return genCompatibleRegistries(interfaceConfig.getScopeModel(), registryList, provider);
}ConfigValidationUtils的双注册地址的获取genCompatibleRegistries方法.前面代码获取到了一个注册中心地址列表例如:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768下面可以看下如果根据配置来转换为应用级注册地址+接口级注册地址private static List<URL> genCompatibleRegistries(ScopeModel scopeModel, List<URL> registryList, boolean provider) {
List<URL> result = new ArrayList<>(registryList.size());
registryList.forEach(registryURL -> {
if (provider) {
// for registries enabled service discovery, automatically register interface compatible addresses.
String registerMode;
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
//为了更好理解这里简化掉服务发现注册地址配置的逻辑判断过程仅仅看当前例子提供的值走的逻辑
} else {
//双注册模式配置查询 对应参数为dubbo.application.register-mode 默认值为all
registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_ALL));
//如果用户配置了一个错误的注册模式配置则只走接口级配置 这里默认值为interface
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
}
//这个逻辑是满足应用级注册就添加一个应用级注册的地址
if ((DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode))
&& registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)) {
URL serviceDiscoveryRegistryURL = URLBuilder.from(registryURL)
.setProtocol(SERVICE_REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(serviceDiscoveryRegistryURL);
}
//这个逻辑是满足接口级注册配置就添加一个接口级注册地址
if (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)) {
result.add(registryURL);
}
}
//省略掉若干代码和括号
return result;
}可以看到这里简化的配置比较容易理解了双注册模式配置查询 对应参数为dubbo.application.register-mode ,默认值为all如果用户配置了一个错误的注册模式配置则只走接口级配置 这里默认值为interface满足应用级注册就添加一个应用级注册的地址满足接口级注册配置就添加一个接口级注册地址这个方法是根据服务注册模式来判断使用接口级注册地址还是应用级注册地址分别如下所示:配置信息:dubbo.application.register-mode配置值:interface接口级注册instance应用级注册all接口级别和应用级都注册最终的注册地址配置如下:接口级注册地址:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768应用级注册地址:service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=165370442592017.3 双注册服务数据的注册17.3.1 双注册代码逻辑调用简介前面说了这个注册服务的配置地址会由Dubbo内部进行判断如果判断是all的话会自动将一个配置的注册地址转变为两个一个是传统的接口级注册,一个是应用级注册使用的配置地址然后我们先看注册中心,注册服务数据的源码如果想要查看源码细节可以在RegistryProtocol类型的export(final Invoker originInvoker) 方法的如下代码位置打断点:RegistryProtocol的export方法的注册中心注册数据代码如下: // url to registry 注册服务对外的接口
//如果url为service-discovery-registry发现则这个实现类型为ServiceDiscoveryRegistry
final Registry registry = getRegistry(registryUrl);
//服务发现的提供者url: dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=19559&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1654938441023
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish (provider itself and registry should both need to register)
//register参数是否 注册数据到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
if (register) {
//这里有两种情况 接口级注册会将接口级服务提供者数据直接注册到Zookeper上面,服务发现(应用级注册)这里仅仅会将注册数据转换为服务元数据等后面来发布元数据
register(registry, registeredProviderUrl);
}
// register stated url on provider model
//向提供者模型注册提供者配置ProviderModel
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
if (!registry.isServiceDiscovery()) {
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}在上个博客中我们整体说了下服务注册时候的一个流程,关于数据向注册中心的注册细节这里可以详细看下17.3.2 注册中心领域对象的初始化前面的代码使用url来获取注册中心操作对象如下调用代码:// url to registry 注册服务对外的接口
final Registry registry = getRegistry(registryUrl);对应代码如下: protected Registry getRegistry(final URL registryUrl) {
//这里分为两步先获取注册中心工厂对象
RegistryFactory registryFactory = ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();
//使用注册中心工厂对象获取注册中心操作对象
return registryFactory.getRegistry(registryUrl);
}关于参数URL有两个在前面已经说过,url信息如下:接口级注册地址:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=9008&registry=zookeeper&release=3.0.8&timestamp=1653703292768应用级注册地址:service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=1653704425920注册中心工厂对象与注册中心操作对象的获取与执行我们通过Debug来看比较麻烦,这里涉及到很多扩展机制动态生成的代码我们无法看到,这里我直接来贴一下比较关键的一些类型,以Zookeeper注册中心来举例子: 先来看下注册工厂相关的类型:RegistryFactory 注册中心对象获取AbstractRegistryFactory 模板类型封装注册中心对象获取的基本逻辑,比如缓存和基本的逻辑判断ServiceDiscoveryRegistryFactory 用于创建服务发现注册中心工厂对象 用于创建ServiceDiscoveryRegistry对象ZookeeperRegistryFactory 用于创建ZookeeperRegistry类型对象NacosRegistryFactory Nacos注册中心工厂对象 用于创建NacosRegistry接下来看封装了注册中心操作逻辑的注册中心领域对象:Node 节点信息开放接口 比如节点 url的获取 ,销毁RegistryService 注册服务接口,比如注册,订阅,查询等操作Registry 注册中心接口,是否服务发现查询,注册,取消注册方法AbstractRegistry 注册中心逻辑抽象模板类型,封装了注册,订阅,通知的基本逻辑,和本地缓存注册中心信息的基本逻辑FailbackRegistry 封装了失败重试的逻辑NacosRegistry 封装了以nacos作为注册中心的基本逻辑ServiceDiscoveryRegistry 应用级服务发现注册中心逻辑,现在不需要这种网桥实现,协议可以直接与服务发现交互。ServiceDiscoveryRegistry是一种非常特殊的注册表实现,用于以兼容的方式将旧的接口级服务发现模型与3.0中引入的新服务发现模型连接起来。它完全符合注册表SPI的扩展规范,但与zookeeper和Nacos的具体实现不同,因为它不与任何真正的第三方注册表交互,而只与过程中ServiceDiscovery的相关组件交互。简而言之,它架起了旧接口模型和新服务发现模型之间的桥梁:register()方法主要通过与MetadataService交互,将接口级数据聚合到MetadataInfo中subscribe() 触发应用程序级服务发现模型的整个订阅过程。-根据ServiceNameMapping将接口映射到应用程序。-启动新的服务发现侦听器(InstanceListener),并使NotifierListener成为InstanceListener的一部分。CacheableFailbackRegistry 提供了一些本地内存缓存的逻辑 对注册中心有用,注册中心的sdk将原始字符串作为提供程序实例返回,例如zookeeper和etcdZookeeperRegistry Zookeeper作为注册中心的基本操作逻辑封装了解了这几个领域对象这里我们回到代码逻辑,这里直接看将会执行的一些核心逻辑: protected Registry getRegistry(final URL registryUrl) {
//这里分为两步先获取注册中心工厂对象
RegistryFactory registryFactory = ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();
//使用注册中心工厂对象获取注册中心操作对象
return registryFactory.getRegistry(registryUrl);
}前面注册中心工厂不论那种协议的地址信息获取到的都是一个RegistryFactory$Adaptive类型(由扩展机制的字节码工具自动生成的代码)如果getRegistry参数为应用级注册地址。如下所示将获取到的类型为ServiceDiscoveryRegistryFactory逻辑来获取注册中心:(这个逻辑是@Adaptive注解产生的了逻辑具体原理可以看扩展机制中@Adaptive的实现)service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=10275&registry=zookeeper&release=3.0.8&timestamp=1653704425920getRegistry方法优先走的逻辑是这里:AbstractRegistryFactory模板类型中的getRegistry方法@Override
public Registry getRegistry(URL url) {
if (registryManager == null) {
throw new IllegalStateException("Unable to fetch RegistryManager from ApplicationModel BeanFactory. " +
"Please check if `setApplicationModel` has been override.");
}
//销毁状态直接返回
Registry defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameter(TIMESTAMP_KEY)
.removeAttribute(EXPORT_KEY)
.removeAttribute(REFER_KEY)
.build();
//这个key为 service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
String key = createRegistryCacheKey(url);
Registry registry = null;
//check配置 是否检查注册中心连通 默认为true
boolean check = url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
// Lock the registry access process to ensure a single instance of the registry
//给写操作加锁方式并发写问题
registryManager.getRegistryLock().lock();
try {
//锁内检查是否销毁的逻辑
// double check
// fix https://github.com/apache/dubbo/issues/7265.
defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
//锁内检查是否缓存中存在存在则直接返回
registry = registryManager.getRegistry(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
//使用url创建注册中心操作的逻辑
registry = createRegistry(url);
} catch (Exception e) {
//check配置检查
if (check) {
throw new RuntimeException("Can not create registry " + url, e);
} else {
LOGGER.warn("Failed to obtain or create registry ", e);
}
} finally {
// Release the lock
registryManager.getRegistryLock().unlock();
}
if (check && registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
//缓存逻辑
if (registry != null) {
registryManager.putRegistry(key, registry);
}
return registry;
}逻辑其实吧比较简单,概括下上面的逻辑:销毁逻辑判断缓存获取,存在则直接返回根据注册中心url配置,创建注册中心操作对象注册中心连接失败的check配置逻辑处理将注册中心操作对象存入缓存上面比较重要的逻辑是createRegistry这个整个调用过程我给大家看下Debug的详情,这里很多逻辑由扩展机制产生的这里直接看下逻辑调用栈,有几个需要关注的地方我圈了起来:我们继续看服务发现的注册中心工厂对象的获取,代码如下:ServiceDiscoveryRegistryFactory类型的createRegistry方法 @Override
protected Registry createRegistry(URL url) {
//判断url是否是这个前缀:service-discovery-registry
if (UrlUtils.hasServiceDiscoveryRegistryProtocol(url)) {
//切换下协议:将服务发现协议切换为配置的注册中心协议这里是Zookeeper如下:
//zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=39884&release=3.0.8
String protocol = url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
url = url.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
//创建服务发现注册中心对象对象
return new ServiceDiscoveryRegistry(url, applicationModel);
}通过以上代码可以看到其实最终创建的是一个ServiceDiscoveryRegistry注册中心对象,这个url协议被转换为了对应注册中心的协议,也就是说双注册会有两个协议一个是原先的接口级注册注册中心对象(这个还未说到)和这里对应注册中心协议的服务发现注册中心对象ServiceDiscoveryRegistry17.3.3 ServiceDiscoveryRegistryServiceDiscoveryRegistry服务发现注册中心对象的初始化过程:17.3.3.1 ServiceDiscoveryRegistry的构造器: public ServiceDiscoveryRegistry(URL registryURL, ApplicationModel applicationModel) {
super(registryURL);
//根据url创建一个服务发现对象类型为ServiceDiscovery
this.serviceDiscovery = createServiceDiscovery(registryURL);
//这个类型为是serviceNameMapping类型是MetadataServiceNameMapping类型
this.serviceNameMapping = (AbstractServiceNameMapping) ServiceNameMapping.getDefaultExtension(registryURL.getScopeModel());
super.applicationModel = applicationModel;
}ServiceDiscoveryRegistry中创建服务发现对象createServiceDiscovery方法protected ServiceDiscovery createServiceDiscovery(URL registryURL) {
return getServiceDiscovery(registryURL.addParameter(INTERFACE_KEY, ServiceDiscovery.class.getName())
.removeParameter(REGISTRY_TYPE_KEY));
}ServiceDiscoveryRegistry中创建服务发现对象getServiceDiscovery方法private ServiceDiscovery getServiceDiscovery(URL registryURL) {
//服务发现工厂对象的获取这里是ServiceDiscoveryFactory类型,
ServiceDiscoveryFactory factory = getExtension(registryURL);
//服务发现工厂对象获取服务发现对象
return factory.getServiceDiscovery(registryURL);
}ServiceDiscoveryFactory和ServiceDiscovery类型可以往后看17.3.3.2 父类型FailbackRegistry的构造器 public FailbackRegistry(URL url) {
super(url);
//重试间隔配置retry.period ,默认为5秒
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
// since the retry task will not be very much. 128 ticks is enough.
//因为重试任务不会太多。128个刻度就足够了。Dubbo封装的时间轮用于高效率的重试,这个在Kafka也自定义实现了后续可以单独来看看
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}17.3.3.3 AbstractRegistry的构造器参数url如下所示:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=39884&release=3.0.8
public AbstractRegistry(URL url) {
setUrl(url);
registryManager = url.getOrDefaultApplicationModel().getBeanFactory().getBean(RegistryManager.class);
//是否本地缓存默认为true
localCacheEnabled = url.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true);
registryCacheExecutor = url.getOrDefaultFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getSharedExecutor();
if (localCacheEnabled) {
// Start file save timer 是否同步缓存默认为false
syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
//默认缓存的文件路径与文件名字为:/Users/song/.dubbo/dubbo-registry-dubbo-demo-api-provider-127.0.0.1-2181.cache
String defaultFilename = System.getProperty(USER_HOME) + DUBBO_REGISTRY +
url.getApplication() + "-" + url.getAddress().replaceAll(":", "-") + CACHE;
//未指定缓存的文件名字则用默认的文件名字
String filename = url.getParameter(FILE_KEY, defaultFilename);
File file = null;
//父目录创建,保证目录存在
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// When starting the subscription center,
// we need to read the local cache file for future Registry fault tolerance processing.
//加载本地磁盘文件
loadProperties();
//变更推送
notify(url.getBackupUrls());
}
}17.3.4 将服务提供者数据转换到本地内存的元数据信息中在前面我们看到了RegistryProtocol中调用register来注册服务提供者的数据到注册的中心,接下来详细看下实现原理:下面参数为ServiceDiscoveryRegistry为情况下举例子:ServiceDiscoveryRegistry类型的register方法与ZookeeperRegister注册不一样传统的接口级注册在这个方法里面就将服务数据注册到注册中心了,服务发现的数据注册分为了两步,这里仅仅将数据封装到内存中如下:url例子为:dubbo://192.168.1.9:20880/link.elastic.dubbo.entity.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=19559&release=3.0.8&service-name-mapping=true&side=provider&timestamp=1654938441023RegistryProtocol中的register方法: private void register(Registry registry, URL registeredProviderUrl) {
registry.register(registeredProviderUrl);
}上面这个代码会优先走ListenerRegistryWrapper的一些逻辑来执行register方法来触发一些监听器的逻辑,我们直接跳到ServiceDiscoveryRegistry中的register方法来看ServiceDiscoveryRegistry的register方法@Override
public final void register(URL url) {
//逻辑判断比如只有side为提供者时候才能注册
if (!shouldRegister(url)) { // Should Not Register
return;
}
doRegister(url);
}ServiceDiscoveryRegistry的doRegister方法: @Override
public void doRegister(URL url) {
// fixme, add registry-cluster is not necessary anymore
url = addRegistryClusterKey(url);
serviceDiscovery.register(url);
}
AbstractServiceDiscovery的register方法:@Override
public void register(URL url) {
//metadaInfo类型为MetadataInfo类型,用来操作元数据的
metadataInfo.addService(url);
}
MetadataInfo 类型的addService方法public synchronized void addService(URL url) {
// fixme, pass in application mode context during initialization of MetadataInfo.
//元数据参数过滤器扩展获取:MetadataParamsFilter
if (this.loader == null) {
this.loader = url.getOrDefaultApplicationModel().getExtensionLoader(MetadataParamsFilter.class);
}
//元数据参数过滤器获取
List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");
// generate service level metadata
//生成服务级别的元数据
ServiceInfo serviceInfo = new ServiceInfo(url, filters);
this.services.put(serviceInfo.getMatchKey(), serviceInfo);
// extract common instance level params
extractInstanceParams(url, filters);
if (exportedServiceURLs == null) {
exportedServiceURLs = new ConcurrentSkipListMap<>();
}
addURL(exportedServiceURLs, url);
updated = true;
}17.3.5 接口级服务提供者配置的注册前面我们通过服务发现的的url进行了举例子,其实在RegistryProtocol协议的export方法中还会注册接口级信息:例如如下关键代码:当registryUrl参数不是服务发现协议service-discovery-registry配置而是zookeeper如下时候获取到的扩展类型将是与Zookeeper相关的扩展对象zookeeper://8.131.79.126:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=29386&release=3.0.8&timestamp=1655023329438RegistryProtocol协议的export方法中接口级数据注册的核心代码如下:如下代码的操作类型可以看注释// url to registry 这里registry对象的类型为ZookeeperRegistry
final Registry registry = getRegistry(registryUrl);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish (provider itself and registry should both need to register)
boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
//这一个方法里面会将提供者的url配置写入Zookeeper的provider节点下面
if (register) {
register(registry, registeredProviderUrl);
}如上代码是获取Zookeeper操作对象和向Zookeeper中写入服务提供者信息的代码,关于与Zookeeper连接和注册数据本地缓存的代码可以看ZookeeperRegistry类型和它的几个父类型比如:CacheableFailbackRegistry类型,关于接口级数据的注册可以看register方法,这个就不详细说了,下面我贴一下接口级数据注册的Zookeeper信息可以了解下就行:接口信息如下,上面我们需要注意的是这个 url配置为临时节点,当与Zookeeper断开连接或者Session超时的时候这个信息会被移除:/dubbo/link.elastic.dubbo.entity.DemoService/providers/dubbo%3A%2F%2F192.168.1.9%3A20880%2Flink.elastic.dubbo.entity.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26background%3Dfalse%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dlink.elastic.dubbo.entity.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D29386%26release%3D3.0.8%26service-name-mapping%3Dtrue%26side%3Dprovider%26timestamp%3D165502332951417.4 应用级服务发现功能的实现ServiceDiscovery在说这个实现之前我们先看看相关类型,这个服务发现相关的类型与注册中心相关的类型有点类似:服务发现工厂类型:服务发现类型:刚刚在 ServiceDiscoveryRegistry中创建服务发现对象getServiceDiscovery方法看到了两个类型一个是服务发现工厂类型ServiceDiscoveryFactory,一个是服务发现类型ServiceDiscoveryprivate ServiceDiscovery getServiceDiscovery(URL registryURL) {
//服务发现工厂对象的获取这里是ServiceDiscoveryFactory类型,这里对应ZookeeperServiceDiscoveryFactory
ServiceDiscoveryFactory factory = getExtension(registryURL);
//服务发现工厂对象获取服务发现对象
return factory.getServiceDiscovery(registryURL);
}AbstractServiceDiscoveryFactory类型的getServiceDiscovery方法 @Override
public ServiceDiscovery getServiceDiscovery(URL registryURL) {
//这个key是 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.client.ServiceDiscovery
//一个地址需要创建一个服务发现对象
String key = registryURL.toServiceStringWithoutResolving();
return discoveries.computeIfAbsent(key, k -> createDiscovery(registryURL));
}createDiscovery方法对应ZookeeperServiceDiscoveryFactory类型中的createDiscovery方法如下代码所示:@Override
protected ServiceDiscovery createDiscovery(URL registryURL) {
return new ZookeeperServiceDiscovery(applicationModel, registryURL);
}
17.4.1 ZookeeperServiceDiscoveryZookeeperServiceDiscovery的构造器 public ZookeeperServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
//先调用父类AbstractServiceDiscovery 模板类构造器
super(applicationModel, registryURL);
try {
//创建 创建CuratorFramework 类型对象用于操作Zookeeper
this.curatorFramework = buildCuratorFramework(registryURL);
//获取应用级服务发现的根路径 值为/services 这个可以在Zookeeper上面看到
this.rootPath = ROOT_PATH.getParameterValue(registryURL);
//创建服务发现对象 实现类型为ServiceDiscoveryImpl 这个实现来源于Curator框架中的discovery模块
this.serviceDiscovery = buildServiceDiscovery(curatorFramework, rootPath);
//启动服务发现
this.serviceDiscovery.start();
} catch (Exception e) {
throw new IllegalStateException("Create zookeeper service discovery failed.", e);
}
}这个方法比较重要是应用级服务发现的实现,这里主要关注下serviceDiscovery类型的创建与启动,这个应用级服务发现的实现其实是Dubbo使用了Curator来做的,Dubbo只是在这里封装了一些方法来进行调用Curator的实现:关于Curator的官方文档可以看curator官网关于Zookeeper上面注册服务应用级服务注册信息可以看如下图所示(后面会具体讲到数据注册时的调用):我这个服务提供者注册的应用数据如下:{
"name" : "dubbo-demo-api-provider",
"id" : "192.168.1.9:20880",
"address" : "192.168.1.9",
"port" : 20880,
"sslPort" : null,
"payload" : {
"@class" : "org.apache.dubbo.registry.zookeeper.ZookeeperInstance",
"id" : "192.168.1.9:20880",
"name" : "dubbo-demo-api-provider",
"metadata" : {
"dubbo.endpoints" : "[{\"port\":20880,\"protocol\":\"dubbo\"}]",
"dubbo.metadata-service.url-params" : "{\"connections\":\"1\",\"version\":\"1.0.0\",\"dubbo\":\"2.0.2\",\"release\":\"3.0.8\",\"side\":\"provider\",\"port\":\"20880\",\"protocol\":\"dubbo\"}",
"dubbo.metadata.revision" : "a662fd2213a8a49dc6ff43a4c2ae7b9e",
"dubbo.metadata.storage-type" : "local",
"timestamp" : "1654916298616"
}
},
"registrationTimeUTC" : 1654917265499,
"serviceType" : "DYNAMIC",
"uriSpec" : null
}如果感兴趣的话可以看更详细的curator服务发现文档curator-x-discovery17.4.2 AbstractServiceDiscovery的构造器 public AbstractServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
//调用重载的构造器
this(applicationModel.getApplicationName(), registryURL);
this.applicationModel = applicationModel;
MetadataReportInstance metadataReportInstance = applicationModel.getBeanFactory().getBean(MetadataReportInstance.class);
metadataType = metadataReportInstance.getMetadataType();
this.metadataReport = metadataReportInstance.getMetadataReport(registryURL.getParameter(REGISTRY_CLUSTER_KEY));
// if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataReportInstance.getMetadataType())) {
// this.metadataReport = metadataReportInstance.getMetadataReport(registryURL.getParameter(REGISTRY_CLUSTER_KEY));
// } else {
// this.metadataReport = metadataReportInstance.getNopMetadataReport();
// }
}重载的构造器 public AbstractServiceDiscovery(String serviceName, URL registryURL) {
this.applicationModel = ApplicationModel.defaultModel();
//这个url参考:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.client.ServiceDiscovery&pid=4570&release=3.0.8
this.registryURL = registryURL;
//这个serviceName参考dubbo-demo-api-provider
this.serviceName = serviceName;
//MetadataInfo 用来封装元数据信息
this.metadataInfo = new MetadataInfo(serviceName);
//这个是元数据缓存信息管理的类型 缓存文件使用LRU策略 感兴趣的可以详细看看
//对应缓存路径为:/Users/song/.dubbo/.metadata.zookeeper127.0.0.1%003a2181.dubbo.cache
this.metaCacheManager = new MetaCacheManager(getCacheNameSuffix(),
applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
}17.5 服务映射类型AbstractServiceNameMapping服务映射主要是通过服务名字来反查应用信息的应用名字如下图所示这里我们来看下服务映射相关的类型主要通过如下代码来获取扩展对象:this.serviceNameMapping = (AbstractServiceNameMapping) ServiceNameMapping.getDefaultExtension(registryURL.getScopeModel());对应类型如下:最终获取的扩展实现类型为:MetadataServiceNameMapping构造器如下: public MetadataServiceNameMapping(ApplicationModel applicationModel) {
super(applicationModel);
metadataReportInstance = applicationModel.getBeanFactory().getBean(MetadataReportInstance.class);
}服务映射元数据父类型AbstractServiceNameMapping如下: public AbstractServiceNameMapping(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
//LRU缓存保存服务映射数据
this.mappingCacheManager = new MappingCacheManager("",
applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
}
17.4 双注册元数据信息发布到注册中心17.4.1 回顾简介前面注册数据的时候并没有把服务配置的元数据直接注册在注册中心而是需要在导出服务之后在ServiceConfig中来发布元数据,这个就需要我们回到ServiceConfig的exportUrl方法来看了如下所示:private void exportUrl(URL url, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
...省略到若干代码
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
url = exportRemote(url, registryURLs);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
}
}
}
this.urls.add(url);
}17.4.2 元数据服务定义数据的发布在exportRemote之后单独调用发布元数据的方法来发布,通过调用元数据工具类来发布元数据信息接下来我们详细看下:MetadataUtils类型的publishServiceDefinition方法:public static void publishServiceDefinition(URL url, ServiceDescriptor serviceDescriptor, ApplicationModel applicationModel) {
//查询是否存在元数据存储对象 对应接口MetadataReport 这里对应实现类 ZookeeperMetadataReport
if (getMetadataReports(applicationModel).size() == 0) {
String msg = "Remote Metadata Report Server is not provided or unavailable, will stop registering service definition to remote center!";
logger.warn(msg);
}
try {
String side = url.getSide();
//服务提供者走这个逻辑
if (PROVIDER_SIDE.equalsIgnoreCase(side)) {
String serviceKey = url.getServiceKey();
//获取当前服务元数据信息
FullServiceDefinition serviceDefinition = serviceDescriptor.getFullServiceDefinition(serviceKey);
if (StringUtils.isNotEmpty(serviceKey) && serviceDefinition != null) {
serviceDefinition.setParameters(url.getParameters());
for (Map.Entry<String, MetadataReport> entry : getMetadataReports(applicationModel).entrySet()) {
MetadataReport metadataReport = entry.getValue();
if (!metadataReport.shouldReportDefinition()) {
logger.info("Report of service definition is disabled for " + entry.getKey());
continue;
}
//存储服务提供者的元数据 metadataReport类型为ZookeeperMetadataReport 方法来源于父类模板方法: AbstractMetadataReport类型的storeProviderMetadata模板方法
metadataReport.storeProviderMetadata(
new MetadataIdentifier(
url.getServiceInterface(),
url.getVersion() == null ? "" : url.getVersion(),
url.getGroup() == null ? "" : url.getGroup(),
PROVIDER_SIDE,
applicationModel.getApplicationName())
, serviceDefinition);
}
}
} else {
//服务消费者走这个逻辑
for (Map.Entry<String, MetadataReport> entry : getMetadataReports(applicationModel).entrySet()) {
MetadataReport metadataReport = entry.getValue();
if (!metadataReport.shouldReportDefinition()) {
logger.info("Report of service definition is disabled for " + entry.getKey());
continue;
}
metadataReport.storeConsumerMetadata(
new MetadataIdentifier(
url.getServiceInterface(),
url.getVersion() == null ? "" : url.getVersion(),
url.getGroup() == null ? "" : url.getGroup(),
CONSUMER_SIDE,
applicationModel.getApplicationName()),
url.getParameters());
}
}
} catch (Exception e) {
//ignore error
logger.error("publish service definition metadata error.", e);
}
}AbstractMetadataReport的storeProviderMetadata方法如下所示: @Override
public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
//是否同步配置对应sync-report 默认为异步
if (syncReport) {
storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition);
} else {
reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition));
}
}AbstractMetadataReport的存储元数据方法storeProviderMetadataTaskprivate void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
try {
if (logger.isInfoEnabled()) {
logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + "; definition: " + serviceDefinition);
}
allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
failedReports.remove(providerMetadataIdentifier);
Gson gson = new Gson();
String data = gson.toJson(serviceDefinition);
doStoreProviderMetadata(providerMetadataIdentifier, data);
saveProperties(providerMetadataIdentifier, data, true, !syncReport);
} catch (Exception e) {
// retry again. If failed again, throw exception.
failedReports.put(providerMetadataIdentifier, serviceDefinition);
metadataReportRetry.startRetryTask();
logger.error("Failed to put provider metadata " + providerMetadataIdentifier + " in " + serviceDefinition + ", cause: " + e.getMessage(), e);
}
}元数据信息如下:可以分为两类 应用元数据,服务元数据{
"parameters": {
"side": "provider",
"interface": "link.elastic.dubbo.entity.DemoService",
"pid": "22099",
"application": "dubbo-demo-api-provider",
"dubbo": "2.0.2",
"release": "3.0.8",
"anyhost": "true",
"bind.ip": "192.168.1.9",
"methods": "sayHello,sayHelloAsync",
"background": "false",
"deprecated": "false",
"dynamic": "true",
"service-name-mapping": "true",
"generic": "false",
"bind.port": "20880",
"timestamp": "1654942353902"
},
"canonicalName": "link.elastic.dubbo.entity.DemoService",
"codeSource": "file:/Users/song/Desktop/dubbo-test/target/classes/",
"methods": [{
"name": "sayHelloAsync",
"parameterTypes": ["java.lang.String"],
"returnType": "java.util.concurrent.CompletableFuture",
"annotations": []
}, {
"name": "sayHello",
"parameterTypes": ["java.lang.String"],
"returnType": "java.lang.String",
"annotations": []
}],
"types": [{
"type": "java.util.concurrent.CompletableFuture",
"properties": {
"result": "java.lang.Object",
"stack": "java.util.concurrent.CompletableFuture.Completion"
}
}, {
"type": "java.lang.Object"
}, {
"type": "java.lang.String"
}, {
"type": "java.util.concurrent.CompletableFuture.Completion",
"properties": {
"next": "java.util.concurrent.CompletableFuture.Completion",
"status": "int"
}
}, {
"type": "int"
}],
"annotations": []
}Zookeeper扩展类型ZookeeperMetadataReport实现的存储方法如下所示doStoreProviderMetadata:如果我们自己实现一套元数据就可以重写这个方法来进行元数据的额存储ZookeeperMetadataReport的doStoreProviderMetadata @Override
protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) {
storeMetadata(providerMetadataIdentifier, serviceDefinitions);
}ZookeeperMetadataReport的storeMetadata private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
//参数false为非临时节点,这个元数据为持久节点,这个细节就暂时不看了就是将刚刚的json元数据存储到对应路径上面:路径为:/dubbo/metadata/link.elastic.dubbo.entity.DemoService/provider/dubbo-demo-api-provider
zkClient.create(getNodePath(metadataIdentifier), v, false);
}
Dubbo3 源码解读-宋小生-18:Dubbo3元数据服务MetadataService的导出
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 18/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 281650031918.1 简介MetadataService此服务用于公开Dubbo进程内的元数据信息。典型用途包括:使用者查询提供者的元数据信息,以列出接口和每个接口的配置控制台(dubbo admin)查询特定进程的元数据,或聚合所有进程的数据。在Dubbo2.x的时候,所有的服务数据都是以接口的形式注册在注册中心.Dubbo3将部分数据抽象为元数据的形式来将数据存放在元数据中心,然后元数据由服务提供者提供给消费者而不是再由注册中心进行推送,如下图所示:引入 MetadataService 元数据服务服务的好处• 由中心化推送转向点对点拉取(Consumer - Proroder)• 易于扩展更多的参数• 更多的数据量• 对外暴露更多的治理数据18.2 MetadataService的导出过程了解元数据的到处过程,这个就要继续前面博客往后的代码了前面博客说了一个服务发布之后的服务信息的双注册数据,这里继续看下导出服务之后的代码:先来简单回顾下模块发布的启动生命周期方法:DefaultModuleDeployer类型的start方法: @Override
public synchronized Future start() throws IllegalStateException {
...
try {
...
onModuleStarting();
// initialize
applicationDeployer.initialize();
initialize();
// export services
exportServices();
// prepare application instance
// exclude internal module to avoid wait itself
if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
applicationDeployer.prepareInternalModule();
}
// refer services
referServices();
// if no async export/refer services, just set started
if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
onModuleStarted();
} else {
....
return startFuture;
}前面的博客我们已经说了服务提供者导出服务的方法如下: // export services
exportServices();在导出服务之后如果代码中配置了引用服务的代码将会执行引用服务的功能,调用代码如下:referServices();不过我们样例代码并没有介绍引用服务的功能,这里先不说,等服务提供者完全启动成功之后我们再来看消费者的逻辑。接下来我们要看的是模块启动成功之后的方法 onModuleStarted();,在这个方法中会去发布服务元数据信息。18.3 模块启动成功时候的逻辑 onModuleStarted();这里我们直接先看代码再来分析下逻辑:DefaultModuleDeployer类型的onModuleStarted方法如下所示: private void onModuleStarted() {
try {
//状态判断是否为启动中如果是则将状态设置为STARTED
if (isStarting()) {
//先修改状态
setStarted();
logger.info(getIdentifier() + " has started.");
//状态修改成功之后开始通知应用程序发布器模块发布器启动成功了
applicationDeployer.notifyModuleChanged(moduleModel, DeployState.STARTED);
}
} finally {
// complete module start future after application state changed
completeStartFuture(true);
}
}应用程序发布器处理启动成功的逻辑:DefaultApplicationDeployer类型的notifyModuleChanged方法: @Override
public void notifyModuleChanged(ModuleModel moduleModel, DeployState state) {
//根据所有模块的状态来判断应用发布器的状态
checkState(moduleModel, state);
// notify module state changed or module changed
//通知所有模块状态更新
synchronized (stateLock) {
stateLock.notifyAll();
}
}应用发布器模型DefaultApplicationDeployer检查状态方法checkState代码如下: @Override
public void checkState(ModuleModel moduleModel, DeployState moduleState) {
//存在写操作 先加个锁
synchronized (stateLock) {
//非内部模块,并且模块的状态是发布成功了
if (!moduleModel.isInternal() && moduleState == DeployState.STARTED) {
prepareApplicationInstance();
}
//应用下所有模块状态进行汇总计算
DeployState newState = calculateState();
switch (newState) {
case STARTED:
onStarted();
break;
case STARTING:
onStarting();
break;
case STOPPING:
onStopping();
break;
case STOPPED:
onStopped();
break;
case FAILED:
Throwable error = null;
ModuleModel errorModule = null;
for (ModuleModel module : applicationModel.getModuleModels()) {
ModuleDeployer deployer = module.getDeployer();
if (deployer.isFailed() && deployer.getError() != null) {
error = deployer.getError();
errorModule = module;
break;
}
}
onFailed(getIdentifier() + " found failed module: " + errorModule.getDesc(), error);
break;
case PENDING:
// cannot change to pending from other state
// setPending();
break;
}
}
}18.4 准备发布元数据信息和应用实例信息前面有个代码调用比较重要:prepareApplicationInstance()DefaultApplicationDeployer类型的prepareApplicationInstance方法如下所示 @Override
public void prepareApplicationInstance() {
//已经注册过应用实例数据了 直接返回 (下面CAS逻辑判断了)
if (hasPreparedApplicationInstance.get()) {
return;
}
//注册开关控制默认为true
//通过将registerConsumer默认设置为“false”来关闭纯使用者进程实例的注册。
if (isRegisterConsumerInstance()) {
exportMetadataService();
if (hasPreparedApplicationInstance.compareAndSet(false, true)) {
// register the local ServiceInstance if required
registerServiceInstance();
}
}
}18.4.1 导出元数据服务方法exportMetadataService这里我们就先直接来贴一下代码:DefaultApplicationDeployer类型的exportMetadataService方法如下所示: private void exportMetadataService() {
if (!isStarting()) {
return;
}
//这里监听器我们主要关注的类型是ExporterDeployListener类型
for (DeployListener<ApplicationModel> listener : listeners) {
try {
if (listener instanceof ApplicationDeployListener) {
// 回调监听器的模块启动成功方法
((ApplicationDeployListener) listener).onModuleStarted(applicationModel);
}
} catch (Throwable e) {
logger.error(getIdentifier() + " an exception occurred when handle starting event", e);
}
}
}前面我们主要关注ExporterDeployListener类型的监听器的回调方法,这里我贴一下代码:ExporterDeployListener类型的onModuleStarted方法如下: @Override
public synchronized void onModuleStarted(ApplicationModel applicationModel) {
// start metadata service exporter
//MetadataServiceDelegation类型为实现提供远程RPC服务以方便元数据信息的查询功能的类型。
MetadataServiceDelegation metadataService = applicationModel.getBeanFactory().getOrRegisterBean(MetadataServiceDelegation.class);
if (metadataServiceExporter == null) {
metadataServiceExporter = new ConfigurableMetadataServiceExporter(applicationModel, metadataService);
// fixme, let's disable local metadata service export at this moment
//默认我们是没有配置这个元数据类型的这里元数据类型默认为local 条件是不是remote则开始导出,在前面的博客<<Dubbo启动器DubboBootstrap添加应用程序的配置信息ApplicationConfig>> 中有提到这个配置下面我再说下
if (!REMOTE_METADATA_STORAGE_TYPE.equals(getMetadataType(applicationModel))) {
metadataServiceExporter.export();
}
}
}在前面的博客<<Dubbo启动器DubboBootstrap添加应用程序的配置信息ApplicationConfig>> 中有提到这个配置下面我再说下metadata-typemetadata 传递方式,是以 Provider 视角而言的,Consumer 侧配置无效,可选值有:remote - Provider 把 metadata 放到远端注册中心,Consumer 从注册中心获取。local - Provider 把 metadata 放在本地,Consumer 从 Provider 处直接获取 。可以看到默认的local配置元数据信息的获取是由消费者从提供者拉的,那提供者怎么拉取对应服务的元数据信息那就要要用到这个博客说到的MetadataService服务,传递方式为remote的方式其实就要依赖注册中心了相对来说增加了注册中心的压力。18.4.2 可配置元数据服务的导出ConfigurableMetadataServiceExporter的export前面了解了导出服务的调用链路,这里详细看下ConfigurableMetadataServiceExporter的export过程源码如下所示:public synchronized ConfigurableMetadataServiceExporter export() {
//元数据服务配置已经存在或者已经导出或者不可导出情况下是无需导出的
if (serviceConfig == null || !isExported()) {
//创建服务配置
this.serviceConfig = buildServiceConfig();
// export
//导出服务 ,导出服务的具体过程这里就不再说了可以看上一个博客,这个导出服务的过程会绑定端口
serviceConfig.export();
metadataService.setMetadataURL(serviceConfig.getExportedUrls().get(0));
if (logger.isInfoEnabled()) {
logger.info("The MetadataService exports urls : " + serviceConfig.getExportedUrls());
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("The MetadataService has been exported : " + serviceConfig.getExportedUrls());
}
}
return this;
}18.4.3 元数据服务配置对象的创建前面我们看到了构建元数据服务对象的代码调用ServiceConfig,接下来我们详细看下构建源码如下所示:ConfigurableMetadataServiceExporter类型的buildServiceConfig构建元数据服务配置对象方法如下: private ServiceConfig<MetadataService> buildServiceConfig() {
//1 获取当前的应用配置 然后初始化应用配置
ApplicationConfig applicationConfig = getApplicationConfig();
//创建服务配置对象
ServiceConfig<MetadataService> serviceConfig = new ServiceConfig<>();
//设置域模型
serviceConfig.setScopeModel(applicationModel.getInternalModule());
serviceConfig.setApplication(applicationConfig);
//2 创建注册中心配置对象 然后并初始化
RegistryConfig registryConfig = new RegistryConfig("N/A");
registryConfig.setId("internal-metadata-registry");
//3 创建服务配置对象,并初始化
serviceConfig.setRegistry(registryConfig);
serviceConfig.setRegister(false);
//4 生成协议配置 ,这里会配置一下元数据使用的服务端口号默认使用其他服务的端口20880
serviceConfig.setProtocol(generateMetadataProtocol());
serviceConfig.setInterface(MetadataService.class);
serviceConfig.setDelay(0);
//这里也是需要注意的地方服务引用的类型为MetadataServiceDelegation
serviceConfig.setRef(metadataService);
serviceConfig.setGroup(applicationConfig.getName());
serviceConfig.setVersion(MetadataService.VERSION);
//5 生成方法配置 这里目前提供的服务方法为getAndListenInstanceMetadata方法 后续可以看下这个方法的视线
serviceConfig.setMethods(generateMethodConfig());
serviceConfig.setConnections(1); // separate connection
serviceConfig.setExecutes(100); // max tasks running at the same time
return serviceConfig;
}
这个服务配置对象的创建非常像我们第一个博客提到的服务配置过程,不过这个元数据服务对象有几个比较特殊的配置注册中心的配置register设置为了false 则为不向注册中心注册具体的服务配置信息对每个提供者的最大连接数connections为1服务提供者每服务每方法最大可并行执行请求数executes为100在使用过程中可以知道上面这几个配置值18.5 应用级数据注册 registerServiceInstance()在前面导出元数据服务之后也会调用一行代码来注册应用级数据来保证应用上线主要涉及到的代码为DefaultApplicationDeployer类型中的registerServiceInstance方法如下所示 private void registerServiceInstance() {
try {
//标记变量设置为true
registered = true;
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
} catch (Exception e) {
logger.error("Register instance error", e);
}
if (registered) {
// scheduled task for updating Metadata and ServiceInstance
asyncMetadataFuture = frameworkExecutorRepository.getSharedScheduledExecutor().scheduleWithFixedDelay(() -> {
// ignore refresh metadata on stopping
if (applicationModel.isDestroyed()) {
return;
}
try {
if (!applicationModel.isDestroyed() && registered) {
ServiceInstanceMetadataUtils.refreshMetadataAndInstance(applicationModel);
}
} catch (Exception e) {
if (!applicationModel.isDestroyed()) {
logger.error("Refresh instance and metadata error", e);
}
}
}, 0, ConfigurationUtils.get(applicationModel, METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS);
}
}
这个方法先将应用元数据注册到注册中心,然后开始开启定时器每隔30秒同步一次元数据向注册中心。18.5.1 服务实例元数据工具类注册服务发现的元数据信息前面通过调用类型ServiceInstanceMetadataUtils工具类的registerMetadataAndInstance方法来进行服务实例数据和元数据的注册这里我们详细看下代码如下所示: public static void registerMetadataAndInstance(ApplicationModel applicationModel) {
LOGGER.info("Start registering instance address to registry.");
RegistryManager registryManager = applicationModel.getBeanFactory().getBean(RegistryManager.class);
// register service instance
//注意这里服务发现的类型只有ServiceDiscoveryRegistry类型的注册协议才满足 registryManager.getServiceDiscoveries().forEach(ServiceDiscovery::register);
}18.5.2 AbstractServiceDiscovery中的服务发现数据注册的模版方法AbstractServiceDiscovery类型的注册方法register()方法这个是一个模版方法,真正执行的注册逻辑封装在了doRegister方法中由扩展的服务发现子类来完成 @Override
public synchronized void register() throws RuntimeException {
//第一步创建应用的实例信息等待下面注册到注册中心
this.serviceInstance = createServiceInstance(this.metadataInfo);
if (!isValidInstance(this.serviceInstance)) {
logger.warn("No valid instance found, stop registering instance address to registry.");
return;
}
//是否需要更新
boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
if (revisionUpdated) {
reportMetadata(this.metadataInfo);
//应用的实例信息注册到注册中心之上 ,这个
doRegister(this.serviceInstance);
}
}18.5.3 应用级实例对象创建可以看到在AbstractServiceDiscovery服务发现的第一步创建应用的实例信息等待下面注册到注册中心this.serviceInstance = createServiceInstance(this.metadataInfo);最终创建的serviceInstance类型为ServiceInstance 这个是Dubbo封装的一个接口,具体实现类型为DefaultServiceInstance,我们可以看下应用级的元数据有哪些 protected ServiceInstance createServiceInstance(MetadataInfo metadataInfo) {
//这里的服务名字为:dubbo-demo-api-provider
DefaultServiceInstance instance = new DefaultServiceInstance(serviceName, applicationModel);
//应用服务的元数据 ,可以看下面debug的数据信息
instance.setServiceMetadata(metadataInfo);
//metadataType的值为local 这个方法是将元数据类型存储到英勇的元数据对象中 对应内容为dubbo.metadata.storage-type:local
setMetadataStorageType(instance, metadataType);
// 这个是自定义元数据数据 我们也可以通过实现扩展ServiceInstanceCustomizer来自定义一些元数据
ServiceInstanceMetadataUtils.customizeInstance(instance, applicationModel);
return instance;
}
这个方法的主要目的就是将应用的元数据信息都封装到ServiceInstance类型中,不过额外提供了一个扩展性比较好的方法可以自定义元数据信息前面的metadataInfo对象的信息如下图所示:自定义元数据类型Dubbo官方提供了一个默认的实现类型为:ServiceInstanceMetadataCustomizer最终封装好的元数据信息如下所示:DefaultServiceInstance{
serviceName='dubbo-demo-api-provider',
host='192.168.1.169',
port=20880,
enabled=true,
healthy=true,
metadata={
dubbo.metadata-service.url-params={"connections":"1",
"version":"1.0.0",
"dubbo":"2.0.2",
"release":"3.0.9",
"side":"provider",
"port":"20880",
"protocol":"dubbo"
},
dubbo.endpoints=[
{"port":20880,"protocol":"dubbo"}],
dubbo.metadata.storage-type=local,
timestamp=1656227493387}}18.5.4 应用级实例数据配置变更的的版本号获取前面创建元应用的实例信息后开始创建版本号来判断是否需要更新,对应AbstractServiceDiscovery类型的calOrUpdateInstanceRevision protected boolean calOrUpdateInstanceRevision(ServiceInstance instance) {
//获取元数据版本号对应字段dubbo.metadata.revision
String existingInstanceRevision = getExportedServicesRevision(instance);
//获取实例的服务元数据信息:metadata{app='dubbo-demo-api-provider',revision='null',size=1,services=[link.elastic.dubbo.entity.DemoService:dubbo]}
MetadataInfo metadataInfo = instance.getServiceMetadata();
//必须在不同线程之间同步计算此实例的状态,如同一实例的修订和修改。此方法的使用仅限于某些点,例如在注册期间。始终尝试使用此选项。改为getRevision()。
String newRevision = metadataInfo.calAndGetRevision();
//版本号发生了变更(元数据发生了变更)版本号是md5元数据信息计算出来HASH验证
if (!newRevision.equals(existingInstanceRevision)) {
//版本号添加到dubbo.metadata.revision字段中
instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision());
return true;
}
return false;
}18.5.4.1 元数据版本号的计算与HASH校验 calAndGetRevision这个方法其实比较重要,决定了什么时候会更新元数据,Dubbo使用了一种Hash验证的方式将元数据转MD5值与之前的存在的版本号(也是元数据转MD5得到的) 如果数据发生了变更则MD5值会发生变化 以此来更新元数据,不过发生了MD5冲突的话就会导致配置不更新这个冲突的概率非常小。好了直接来看代码吧:MetadataInfo类型的calAndGetRevision方法:public synchronized String calAndGetRevision() {
if (revision != null && !updated) {
return revision;
}
updated = false;
//应用下没有服务则使用一个空的版本号
if (CollectionUtils.isEmptyMap(services)) {
this.revision = EMPTY_REVISION;
} else {
StringBuilder sb = new StringBuilder();
//app是应用名
sb.append(app);
for (Map.Entry<String, ServiceInfo> entry : new TreeMap<>(services).entrySet()) {
sb.append(entry.getValue().toDescString());
}
String tempRevision = RevisionResolver.calRevision(sb.toString());
if (!StringUtils.isEquals(this.revision, tempRevision)) {
//元数据重新注册的话我们可以看看这个日志metadata revision change
if (logger.isInfoEnabled()) {
logger.info(String.format("metadata revision changed: %s -> %s, app: %s, services: %d", this.revision, tempRevision, this.app, this.services.size()));
}
this.revision = tempRevision;
this.rawMetadataInfo = JsonUtils.getJson().toJson(this);
}
}
return revision;
}RevisionResolver类型的Md5运算计算版本号md5Utils.getMd5(metadata);18.5.5 reportMetadata回到18.5.2 AbstractServiceDiscovery中的模版方法register,这里我们来看下reportMetadata方法,不过这个方法目前并不会走到,因为我们默认的配置元数据是local不会直接把应用的元数据注册在元数据中心 protected void reportMetadata(MetadataInfo metadataInfo) {
if (metadataReport != null) {
//订阅元数据的标识符
SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(serviceName, metadataInfo.getRevision());
//是否远程发布元数据,这里我们是本地注册这个就不会在元数据中心发布这个元数据信息
if ((DEFAULT_METADATA_STORAGE_TYPE.equals(metadataType) && metadataReport.shouldReportMetadata()) || REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
metadataReport.publishAppMetadata(identifier, metadataInfo);
}
}
}18.5.6 扩展的注册中心来注册应用级服务发现数据doRegister方法前面我们说了AbstractServiceDiscovery中的模版方法register,在register会调用一个doRegister方法来注册应用级数据,这个方法是需要扩展注册中心的服务发现来自行实现的,我们这里以官方实现的Zookeeper服务发现模型为例:ZookeeperServiceDiscovery中的doRegister方法 @Override
public void doRegister(ServiceInstance serviceInstance) {
try {
//Dubbo实现的ServiceInstance类型对象转 Curator的ServiceInstance
serviceDiscovery.registerService(build(serviceInstance));
} catch (Exception e) {
throw new RpcException(REGISTRY_EXCEPTION, "Failed register instance " + serviceInstance.toString(), e);
}
}前面我们介绍了ZookeeperServiceDiscovery发现的构造器连接注册中心,这里来看下服务注册,应用级实例数据注册一共分为两步第一步是:Dubbo实现的ServiceInstance类型对象转 Curator的ServiceInstance第二步是:执行registerService方法将数据注册到注册中心先来看第一步:Dubbo实现的ServiceInstance类型对象转 Curator的ServiceInstance关于Curator的服务发现原理可以参考官网的文章博客curator-x-discovery什么是发现服务?在 SOA/分布式系统中,服务需要找到彼此。即,Web 服务可能需要找到缓存服务等。DNS 可以用于此,但对于不断变化的服务来说,它远不够灵活。服务发现系统提供了一种机制:注册其可用性的服务定位特定服务的单个实例在服务实例更改时通知服务实例由类表示:ServiceInstance。ServiceInstances 具有名称、id、地址、端口和/或 ssl 端口,以及可选的有效负载(用户定义)。ServiceInstances 通过以下方式序列化并存储在 ZooKeeper 中:base path
|_______ service A name
|__________ instance 1 id --> (serialized ServiceInstance)
|__________ instance 2 id --> (serialized ServiceInstance)
|__________ ...
|_______ service B name
|__________ instance 1 id --> (serialized ServiceInstance)
|__________ instance 2 id --> (serialized ServiceInstance)
|__________ ...
|_______ ...这个应用最终注册应用级服务数据如下:这里需要注意的是这个 应用的IP+端口的服务元数据信息是临时节点build方法内容对应着上图的JSON数据 可以看菜build方法封装的过程:public static org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> build(ServiceInstance serviceInstance) {
ServiceInstanceBuilder builder;
String serviceName = serviceInstance.getServiceName();
String host = serviceInstance.getHost();
int port = serviceInstance.getPort();
Map<String, String> metadata = serviceInstance.getSortedMetadata();
String id = generateId(host, port);
//ZookeeperInstance是Dubbo封装的用于存放payload数据 包含服务id,服务名字和元数据
ZookeeperInstance zookeeperInstance = new ZookeeperInstance(id, serviceName, metadata);
try {
builder = builder()
.id(id)
.name(serviceName)
.address(host)
.port(port)
.payload(zookeeperInstance);
} catch (Exception e) {
throw new RuntimeException(e);
}
return builder.build();
}
在《18.5 应用级数据注册 registerServiceInstance() 》 小节中介绍了应用元数据信息的注册调用代码,其实后面还有个update的逻辑定期30秒同步元数据到元数据中心,这里就不详细介绍了。
Dubbo3 源码解读-宋小生-19:重新来过从一个服务消费者的Demo说起
完整电子书下载地址: https://developer.aliyun.com/ebook/7894Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。本文是 Dubbo 社区贡献者宋小生基于 Dubbo3 3.0.8 版本撰写的源码解析博客,在 Dubbo3 开源&内部技术栈统一的情况下,期望能对集团内的开发者了解 Dubbo3 背后的实现原理有所帮助。可点此查看 博客原文 。本篇是宋小生系列 19/30 篇。同时,由 Dubbo3 团队领导的源码解读系列也正在进行中,感兴趣的同学可加入钉钉群了解详情: 2816500319为了更方便了解原理,我们先来编写一个Demo,从例子中来看源码实现:,前面说了提供者现在已经有服务注册上去了,那接下来我们编写一个消费者的例子来进行服务发现与服务RPC调用。19.1 启动Zookeeper为了Demo可以正常启动,需要我们先在本地启动一个Zookeeper如下图所示:19.2 服务消费者接下来给大家贴一下示例源码,这个源码来源于Dubbo源码目录的 dubbo-demo/dubbo-demo-api 目录下面的dubbo-demo-api-consumer子项目,这里我做了删减,方便看核心代码:首先我们定义一个服务接口如下所示:import java.util.concurrent.CompletableFuture;
public interface DemoService {
/**
* 同步处理的服务方法
* @param name
* @return
*/
String sayHello(String name);
/**
* 用于异步处理的服务方法
* @param name
* @return
*/
default CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.completedFuture(sayHello(name));
}
}
服务实现类如下:
import org.apache.dubbo.rpc.RpcContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getServiceContext().getRemoteAddress());
return "Hello " + name + ", response from provider: " + RpcContext.getServiceContext().getLocalAddress();
}
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return null;
}
}19.3 启用服务消费者有了服务接口之后我们来启用服务,启用服务的源码如下:这里如果要启动消费者,主要要修改QOS端口这里我已经配置可以直接复用
package link.elastic.dubbo.consumer;
import link.elastic.dubbo.entity.DemoService;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.MetadataReportConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.rpc.service.GenericService;
public class ConsumerApplication {
public static void main(String[] args) {
runWithBootstrap();
}
private static void runWithBootstrap() {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setInterface(DemoService.class);
reference.setGeneric("true");
reference.setProtocol("");
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
ApplicationConfig applicationConfig = new ApplicationConfig("dubbo-demo-api-consumer");
applicationConfig.setQosEnable(false);
applicationConfig.setQosPort(-1);
bootstrap.application(applicationConfig)
.registry(new RegistryConfig("zookeeper://8.131.79.126:2181"))
.protocol(new ProtocolConfig(CommonConstants.DUBBO, -1))
.reference(reference)
.start();
DemoService demoService = bootstrap.getCache().get(reference);
String message = demoService.sayHello("dubbo");
System.out.println(message);
// generic invoke
GenericService genericService = (GenericService) demoService;
Object genericInvokeResult = genericService.$invoke("sayHello", new String[]{String.class.getName()},
new Object[]{"dubbo generic invoke"});
System.out.println(genericInvokeResult);
}
}
1.4 启用服务后写入Zookeeper的节点数据启动服务,这个时候我们打开Zookeeper图形化客户端来看看这个服务在Zookeeper上面写入来哪些数据,如下图:在这里插入图片描述写入Zookeper上的节点用于服务在分布式场景下的协调,这些节点是比较重要的。如果了解过Dubbo的同学,应该会知道Dubbo在低版本的时候会向注册中心中写入服务接口,具体路径在上面的 dubbo目录下 ,然后在 /dubbo/服务接口/ 路径下写入如下信息:服务提供者配置信息URL形式服务消费者的配置信息URL形式服务路由信息配置信息上面这个图就是Dubbo3的注册信息了,后面我们也会围绕细节来说明下,这里可以看下新增了:/dubbo/metadata 元数据信息/dubbo/mapping 服务和应用的映射信息/dubbo/config 注册中心配置/services目录应用信息在这里可以大致了解下,在后面会有更详细的源码解析这个示例代码.通过透析代码来看透Dubbo3服务注册原理,服务提供原理。
54-微服务技术栈(高级):微服务网关Soul(Soul数据库模型设计)
插件采用数据库设计,来存储插件,选择器,规则配置数据,以及对应关系。数据库表UML类图:设计详解:一个插件对应多个选择器,一个选择器对应多个规则。一个选择器对应多个匹配条件,一个规则对应多个匹配条件。每个规则在对应插件下,不同的处理表现为handle字段,这个一个不同处理的json字符串。具体的可以在admin使用过程中进行查看。说明:meta_data:对dubbo泛化调用使用,每条记录对应一个dubbo接口的方法,http协议不会保存,而springcloud协议,只会存储一条数据, path为 :/contextPath/**plugin:存储当前支持插件,我们对应配置的插件相关参数,就会更新这样表rule:插件管理中,我们配置的具体规则。实际在这里我们也可以看出Soul的三大核心:plugin,rule,selectorrule_condition:rule表中配置的,对应的具体匹配规则selector:规则表selector_condition:规则条件表实际上,上面这张图,就对应上述四个表中的selector-选择器列表,选择器里面的配置对应-selector_conditionrule-选择器规则列表,规则列表中的配置对应-rule_condition
53-微服务技术栈(高级):微服务网关Soul(Soul网关接入与验证)
此章节将基于上一章节基础之上,引入Soul网关,至于Soul网关是干什么的,怎么做的,我们会在后续章节讲解,1-3章节侧重于搭建应用。本章节的Soul网关接入,如果你1,2章节都是和我保持一致,那么只需要直接启动Soul网关即可,但是对应的provider,consumer应用是需要额外的代码接入的。开发环境和第二章保持一致。1 提供者接入Soul1.1 pom<dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-client-alibaba-dubbo</artifactId> <version>2.2.0</version></dependency><dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-client-springmvc</artifactId> <version>2.2.0</version></dependency><dependency> <groupId>org.dromara</groupId> <artifactId>soul-client-springmvc</artifactId> <version>2.2.0</version></dependency>1.2 Controller与之前不同的是,这里我们会在Controller增加一个注解,@SoulSpringMvcClient,标注其注册成为一个SoulSpringMvcClient对象。这里有两种方式,一种是全局,一种是下面示例文件的局部,我会在 3.1.3 配置文件具体讲解二者实现上的差异性。其中@SoulSpringMvcClient(path = "/consumer/** "), ** 标识允许访问:consumer路径下全部,如果在当前Controller中,你只想部分暴露,那么更正为:删除Controller上的:SoulSpringMvcClient(path = "/consumer/** ")在对应需要暴露的接口上,加上全路径,如: @SoulSpringMvcClien(path = "/consumer/getUserById")package com.youzha.dubbo.Controller;import com.youzha.dubbo.dto.ResultDTO;import com.youzha.dubbo.entity.User;import com.youzha.dubbo.service.RemoteUserService;import lombok.extern.slf4j.Slf4j;import org.dromara.soul.client.springmvc.annotation.SoulSpringMvcClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.MediaType;import org.springframework.web.bind.annotation.*;/** * @author youzhaxiaobo * @version 1.0 * @date 2020/7/1 19:30 * @Desc */@Slf4j@RestController@RequestMapping("/consumer")@SoulSpringMvcClient(path = "/consumer/**")public class ConsumerController { @Autowired private RemoteUserService userService; @GetMapping(value = "/getUserById", produces = MediaType.APPLICATION_JSON_VALUE) public ResultDTO getUserById(@RequestParam("id") int id) { log.info("id=" + id); User user = userService.findById(id); log.info("消费者获取用户,信息为:{}", user); if(null == user) { return new ResultDTO(-1, "获取失败"); } return new ResultDTO(200, "获取成功", user); } @GetMapping(value = "/helloWorld", produces = MediaType.APPLICATION_JSON_VALUE) public ResultDTO helloWorld() { log.info("id=" + 1); User user = userService.findById(1); log.info("消费者获取用户,信息为:{}", user); if(null == user) { return new ResultDTO(-1, "获取失败"); } return new ResultDTO(200, "获取成功", user); }}1.3 配置文件soul: # 网关http配置 http: adminUrl: http://127.0.0.1:9093 contextPath: /consumer appName: consumer full: false port: 9092说明:http:标注这里协议是Http,同样还有(dubbo等)adminUrl:对应soul-admin启动的应用地址,端口contextPath:对应注册进Soul的路由前缀(即我们后续通过网关访问的限制名,多个应用应不同)appName:对应的应用名称,不配置的话,会默认取 dubbo配置中application 中的名称full:true则表示代理全部,全局允许访问,权限很大;false表示非全局,只访问有注解的地方(推荐)port:当前应用的启动端口,并非soul-admin或网关,需保持一直另外,如果你在 application.yaml(properties) 中配置了context-path,请删除,这个配置对于Soul而言是无感知的,即你的配置文件中不应出现下面的servlet配置:server: port: 9092 servlet: context-path: **如果你在这里配置的:full是false,那么对应3.1.2章节就无需调整,如果是true,直接3.1.2关于SoulPringMvcClient相关注解即可。2 消费者接入Soul消费者相关配置,和提供者是一致的,这里不再赘述,直接贴出相关代码。2.1 pom<dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-client-alibaba-dubbo</artifactId> <version>2.2.0</version></dependency><dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-client-springmvc</artifactId> <version>2.2.0</version></dependency><dependency> <groupId>org.dromara</groupId> <artifactId>soul-client-springmvc</artifactId> <version>2.2.0</version></dependency>2.2 Controllerpackage com.youzha.dubbo.controller;import com.youzha.dubbo.dto.ResultDTO;import lombok.extern.slf4j.Slf4j;import org.dromara.soul.client.springmvc.annotation.SoulSpringMvcClient;import org.springframework.http.MediaType;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;/** * @author youzhaxiaobo * @version 1.0 * @date 2020/7/4 0004 13:52 * @Desc */@Slf4j@RestController@RequestMapping("/provider")@SoulSpringMvcClient(path = "/provider/**")public class ProviderController { @GetMapping(value = "/hello", produces = MediaType.APPLICATION_JSON_VALUE) public ResultDTO getUserById(@RequestParam("id") int id) { log.info("id=" + id); if (id > 0) { return new ResultDTO(-1, "获取失败"); } return new ResultDTO(200, "获取成功"); }}2.3 配置文件soul: # 网关dubbo配置 dubbo: adminUrl: http://127.0.0.1:9093 contextPath: /provider-youzha appName: provider-youzha # 网关http配置 http: adminUrl: http://127.0.0.1:9093 contextPath: /provider appName: provider full: false port: 9091说明:这里同时暴露出去了dubbo,http两套协议,dubbo对应的就是我们提供在Service层的代码,其变更很小,如下,只是增加了一个注解:@SoulDubboClientdubbo,http提供的contextPath的路由前缀,需要保持不同,同时全局唯一(即不能和其他应用一致)3 验证前提:启动了zookeeper,启动了MySQL。3.1 启动soul-admin直接启动相关Application即可,登陆控制台查看:localhost:9093,用户名/密码:admin/123456。成功之后,查看插件,注意这里确保二者开启,且配置项和我吻合:其中:dividezk:{"register":"zookeeper://127.0.0.1:2181"}3.2 启动soul-boostrap直接启动相关Application即可。3.3 启动soul-provider直接启动相关Application即可,出现下面的日志则表示注册成功:3.4 启动soul-consumer直接启动相关Application即可,出现下面的日志则表示注册成功:注意:3.3.3,3.3.4启动完成之后,查看soul-admin的网页管理端,应该出现下图:3.5 验证此时直接通过自身IP访问消费者,地址:http://localhost:9092/consumer/getUserById?id=1,查看日志:消费者:提供者:通过网关访问消费者,地址:http://localhost:9094/consumer/consumer/getUserById?id=1,查看日志:消费者:提供者:直接通过自身IP访问提供者,地址:http://localhost:9091/provider/hello?id=-1通过网关访问提供者,地址:http://localhost:9094/provider/provider/hello?id=-1Chrome POST验证provider的dubbo接口fetch(new Request('http://localhost:9094/provider/provider-youzha/findById',{ method:'POST', headers: {'Content-Type': 'application/x-www-form-urlencoded'}, body:"id=1"})).then((resp)=>{console.log(resp)})注意:通过网关访问,其地址为boostrap对应的:ip+端口访问地址需要加上路由前缀,即配置文件中 soul 模块中的contextPath如果你的项目和soul不部署在同一服务器,请在你的配置文件中追加上该配置(以你实际部署机器IP为准),保证Soul能管理你的服务soul.http.host=168.10.54.1154 完整代码📎Dubbo_Soul.rar📎zookeeper.rar📎redis.rar📎Soul.rar