探索 Java ServiceLoader、Dubbo ExtensionLoader:构建灵活可扩展的应用程序的利器(下)

简介: 探索 Java ServiceLoader、Dubbo ExtensionLoader:构建灵活可扩展的应用程序的利器(下)

LazyIterator#hasNextService 方法

private boolean hasNextService() {
    if (nextName != null) {
        return true;
    }
    if (configs == null) {
        try {
          // META-INF/services/ + 接口全限定类名,即文件服务类的文件
            String fullName = PREFIX + service.getName();
            if (loader == null)
                configs = ClassLoader.getSystemResources(fullName);
            else
              // 将文件路径转成 URL 对象
                configs = loader.getResources(fullName);
        } catch (IOException x) {
            fail(service, "Error locating configuration files", x);
        }
    }
    while ((pending == null) || !pending.hasNext()) {
        if (!configs.hasMoreElements()) {
            return false;
        }
        // 解析 URL 文件对象,读取内容,最后返回
        pending = parse(service, configs.nextElement());
    }
    nextName = pending.next();
    return true;
}

LazyIterator#nextService 方法

通过反射的方式创建实现类的实例对象并返回

private S nextService() {
    if (!hasNextService()) throw new NoSuchElementException();
    String cn = nextName;
    nextName = null;
    Class<?> c = null;
    try {
      // 创建实现类的 Class
        c = Class.forName(cn, false, loader);
    } catch (ClassNotFoundException x) {
        fail(service, "Provider " + cn + " not found");
    }
    if (!service.isAssignableFrom(c)) {
        fail(service, "Provider " + cn  + " not a subtype");
    }
    try {
      // 通过 newInstance 方法实例化
        S p = service.cast(c.newInstance());
        providers.put(cn, p);
        return p;
    } catch (Throwable x) {
        fail(service, "Provider " + cn + " could not be instantiated", x);
    }
    throw new Error();          // This cannot happen
}

Dubbo SPI

Dubbo 是一个基于Java的开源分布式服务框架,它提供了一种扩展机制,称为 Dubbo SPI(Service Provider Interface)。Dubbo SPI 机制是在 Java SPI 机制的基础上进行了扩展和优化

Dubbo SPI 机制的实现主要依赖于以下几个关键组件:

  1. 接口定义:Dubbo 服务接口定义了一组可供扩展的接口方法,例如 Protocol、Registry、Filter 等
  2. 扩展点注解:Dubbo 通过使用 @SPI 注解标记接口,并在注解中指定默认的扩展实现。该注解使得 Dubbo 能够根据配置文件中的扩展实现名称加载具体的实现类
  3. 配置文件:Dubbo 使用配置文件来定义每个扩展点的具体实现类。配置文件通常位于类路径下的 META-INF/dubbo/ 目录中,文件名与扩展接口的全限定名相对应
  4. 扩展加载器:Dubbo 提供了 ExtensionLoader 类作为扩展加载器,负责加载和管理扩展实现类。ExtensionLoader 基于 Java SPI 机制,通过读取配置文件、实例化扩展实现类并进行缓存,实现了对扩展的动态加载
  5. 扩展适配器:对于某些扩展点,Dubbo 还提供了适配器来简化使用。适配器类实现了扩展接口,并在内部封装了实际的扩展实现,提供了一些默认的实现逻辑

通过上述组件的协作,Dubbo SPI 机制实现了对扩展的自动加载和管理。应用程序可以通过配置文件指定要使用的扩展实现,Dubbo 会根据配置加载相应的实现类。如果没有指定配置,则会使用接口上 @SPI 注解中指定的默认实现

总结来说,Dubbo SPI 机制基于 Java SPI 机制进行了扩展和优化,通过配置文件和扩展加载器实现了对扩展的自动加载和管理。这使得 Dubbo 可以非常方便地扩展和替换各种核心组件,以满足不同的业务需求

加载策略

其次,在 Dubbo 提供了三种策略方便我们去自定义加载扩展接口的方式,如下三种:

  1. DubboInternalLoadingStrategy:支持 Dubbo 内部接口实现类加载的方式
  2. DubboLoadingStrategy:提供给客户端自定义一些实现类加载的方式,例如一些过滤器等
  3. ServicesLoadingStrategy:Java 内自带的提供的一些接口实现类,如上面的 Java SPI 篇章所描述的内容
  4. 自定义策略类,用于加载我们指定目录下的文件,提供了这样的入口给到了我们

优先级: DubboInternalLoadingStrategy > DubboLoadingStrategy > ServicesLoadingStrategy

Filter

Dubbo 服务接口定义了一组可供扩展的接口方法,例如 Protocol、Registry、Filter 等,在这里我们以 Filter 为例作为入口开始分析,在 Dubbo 底层是如何通过 ExtensionLoader 扩展加载器一步步实现的

在应用程序调用时, Filter 过滤器肯定是不至一个的,包括 Dubbo 内置的、开发人员所定义的,所以需要组装过滤器链

Dubbo 内置过滤器,主要有:

1、GenericFilter:来拦截并实现泛化调用的功能

2、EchoFilter:在服务提供者接收到请求后,直接将请求内容作为响应返回给服务消费者,用于测试网络连接和服务可用性;一般用于诊断、测试,不适用于实际生产环境中。在实际部署中,建议将其从配置中移除或禁用,以避免不必要的性能开销

3、TokenFilter:服务之间调用的令牌验证,以 token 作为 name 拼接在 URL 后的方式

4、TpsLimitFilter:在服务提供者接收到请求时,通过配置的阈值限制每秒可以处理的请求数量。当请求数量超过设定的阈值时,该过滤器会拒绝处理额外的请求,以保护服务提供者免受过载的影响;要是没有配置这个阈值,该过滤器就不会进行处理了

接着继续来分析 Dubbo 是如何组装拦截器链的,入口:ProtocolFilterWrapper#buildInvokerChain

ExtensionLoader

ExtensionLoader 扩展类加载器, Dubbo SPI 提供扩展方式最重要的类,在它里面帮我们完成了很多公共处理工作

ExtensionLoader#getExtensionLoader

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type (" + type +
                ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }
    // 获取本地缓存是否已存在【扩展加载器】
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
      // 实例化扩展类加载器
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

首次从本地缓存中取,肯定是没有的,所以每次都会先创建好一个 ExtensionLoader 对象实例,如下:

private ExtensionLoader(Class<?> type) {
    this.type = type;
    // ExtensionFactory 默认实现 AdaptiveExtensionFactory
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

提到了创建 SPI 接口扩展的实现类,那么工厂就必然而然不可少了,在 Dubbo 中 ExtensionFactory 接口的默认实现类是 AdaptiveExtensionFactory > 创建和管理扩展实例

AdaptiveExtensionFactory 它是一个自适应扩展工厂,根据当前的上下文环境和配置动态地选择适合的扩展实例。AdaptiveExtensionFactory 通过 Dubbo 框架编译时生成代码的方式,在运行时实现了扩展实例的动态适配

ExtensionLoader#getAdaptiveExtension

创建好自适应工厂以后,接下来就是创建具体的扩展实例对象了,也就是调用 getAdaptiveExtension 方法,该方法源码如下:

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError != null) {
            throw new IllegalStateException("Failed to create adaptive instance: " +
                    createAdaptiveInstanceError.toString(),
                    createAdaptiveInstanceError);
        }
        // 双重检查锁,先两次从本地缓存中获取是否可以拿到
        synchronized (cachedAdaptiveInstance) {
            instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                try {
                    // 创建扩展类实例
                    instance = createAdaptiveExtension();
                    cachedAdaptiveInstance.set(instance);
                } catch (Throwable t) {
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                }
            }
        }
    }
    return (T) instance;
}

前期先从缓存读取扩展实例对象,若缓存中不存在,那么就调用 createAdaptiveExtension 方法创建新的实例,源码如下:

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}
private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

由于 injectExtension 方法是外部的,它依赖于 getAdaptiveExtensionClass 方法返回的结果,所以先分析此方法

ExtensionLoader#loadExtensionClasses

private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

又是一个双重检查锁,优先从缓存中读取,获取实例对象,核心方法执行在 loadExtensionClasses

private Map<String, Class<?>> loadExtensionClasses() {
   // 观察此类型是否被 @SPI 注解所修饰,若有的话,只能给 name 属性指定一个值,多于一个会抛出中端异常,不会再往下执行
   cacheDefaultExtensionName();
   Map<String, Class<?>> extensionClasses = new HashMap<>();
   // 通过 Java SPI 获取 LoadingStrategy 接口下所有的实现类
   for (LoadingStrategy strategy : strategies) {
       loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
       loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
   }
   return extensionClasses;
}

通过 Java SPI 获取 LoadingStrategy 接口下所有的实现类,也就是在上面分析到的加载策略,分别为 DubboInternalLoadingStrategy、DubboLoadingStrategy、ServicesLoadingStrategy

DubboInternalLoadingStrategy > 处理的目录:META-INF/dubbo/internal/

DubboLoadingStrategy > 处理的目录:META-INF/dubbo/

ServicesLoadingStrategy > 处理的目录:META-INF/services/

它们对应的扩展实现类统一在 loadDirectory 方法中进行加载,由于该方法源码流程过于长,画图如下:

主要的核心处理方法有这几个 loadDirectory > loadResource > loadClass

loadDirectory:通过不同的扩展类加载策略来拼接文件名称以及资源路径

loadResource:通过资源路径来调用 Class.forName 实例化扩展实现类,同时设置好对应的扩展类名称;若是 Dubbo 内置或自定义的,都直接取用 = 截断字段的左侧全称即可;若不是,那么就取用扩展实现类的类名作为名称

loadClass:通过扩展实现类的不同功能,来对当前所在 ExtensionLoader 对象填充属性,在这里最重要的是这个 cachedActivates 激活缓存集合,只有标注了 @Activate 注解的扩展实现类才会被添加进,在实际开发中也是通过这种方式来引入自定义扩展类的~

Dubbo SPI 实战篇

当我们在实际工作中会用到 Dubbo SPI 机制,去作自定义的一些扩展工作,比如:服务之间相互调用时,我想让上层服务已经认证好的用户信息 > Token,通过无代码侵入的方式传入到下层服务,通过 Header 方式传递,而在 Dubbo 内置的 TokenFilter 它是以 URL 拼接方式传入的,这对系统来说是不安全的

创建好我们自定义的过滤器类,如下:

@Slf4j
@Activate(group = CommonConstants.CONSUMER)
public class ConsumerAuthTokenFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        log.info("methodName: {},  arguments: {}, attachments: {}", invocation.getMethodName(),  invocation.getArguments(), invocation.getAttachments());
        // 从线程本地持有中获取 > ThreadLocal
        String token = ThreadContextHolder.getToken();
        if (StringUtils.isNotEmpty(token)) {
            log.info("dubbo consumer filter");
            // Rpc 上下文传递 AutoToken 参数
            RpcContext.getContext().setAttachment(Constant.AUTH_TOKEN, TokenContextHolder.getToken());
        }
        return invoker.invoke(invocation);
    }
}

自定义扩展类准备好了,接着作以下事情

1、首先要在 resource 目录下创建好 META-INF/dubbo 目录

2、在该目录下新建文件,上述问题,我们通过过滤器来解决,所以文件名称为 org.apache.dubbo.rpc.Filter

3、文件内容:authTokenFilter=org.vnjohn.filter.ConsumerAuthTokenFilter

那么,Dubbo 是通过什么方式去找到这些自定义扩展类的呢?

在 Dubbo SPI > Filter 处,已经组装好过滤器链了,当触发 RPC 服务调用时,会先经过这些过滤器,从而就会调用 ConsumerAuthTokenFilter#invoke 方法,将 AuthToken 进行服务上下文传递!

总结

该篇博文介绍 Java SPI 服务提供接口,是如何基于 ServiceLoader 核心类实现的,以经典的 SQL 驱动扩展类作为案例,揭开底层源码的加载逻辑,从这方面我们可以来做一些自己的扩展工作;同时,还仔细阐述了在 Dubbo 是如何基于原有的 SPI,作一些自己的扩展和优化的,它还支持我们自定义扩展实现其他的加载策略,只要你需要!以过滤器链为序幕,揭开 Dubbo 底层是如何基于 ExtensionLoader 作扩展实现的;最后,以 Dubbo Rpc 服务之间传递 Token 的实战案例,作为这篇博文最后的结尾,其他不足之处尽请见谅,有问题可以留言或私信喔~

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!


目录
相关文章
|
1月前
|
存储 监控 安全
单位网络监控软件:Java 技术驱动的高效网络监管体系构建
在数字化办公时代,构建基于Java技术的单位网络监控软件至关重要。该软件能精准监管单位网络活动,保障信息安全,提升工作效率。通过网络流量监测、访问控制及连接状态监控等模块,实现高效网络监管,确保网络稳定、安全、高效运行。
62 11
|
2月前
|
人工智能 安全 Java
Java和Python在企业中的应用情况
Java和Python在企业中的应用情况
83 7
|
26天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
62 2
|
2月前
|
XML Java 测试技术
从零开始学 Maven:简化 Java 项目的构建与管理
Maven 是一个由 Apache 软件基金会开发的项目管理和构建自动化工具。它主要用在 Java 项目中,但也可以用于其他类型的项目。
77 1
从零开始学 Maven:简化 Java 项目的构建与管理
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
231 6
|
1月前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
45 2
|
2月前
|
关系型数据库 MySQL Java
MySQL索引优化与Java应用实践
【11月更文挑战第25天】在大数据量和高并发的业务场景下,MySQL数据库的索引优化是提升查询性能的关键。本文将深入探讨MySQL索引的多种类型、优化策略及其在Java应用中的实践,通过历史背景、业务场景、底层原理的介绍,并结合Java示例代码,帮助Java架构师更好地理解并应用这些技术。
78 2
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
Java 测试技术 API
Java 反射机制:深入解析与应用实践
《Java反射机制:深入解析与应用实践》全面解析Java反射API,探讨其内部运作原理、应用场景及最佳实践,帮助开发者掌握利用反射增强程序灵活性与可扩展性的技巧。
153 4
|
13天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
65 17