Java SPI 机制在 Flink 中的应用(源码分析)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程.什么是 SPI ?

我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程.


什么是 SPI ?


SPI 全称(Service Provide Interface),在 JAVA 中是一个比较重要的概念,在框架设计中被广泛使用。在框架设计中,要遵循的原则是对扩展开放,对修改关闭,保证框架实现对于使用者来说是黑盒。因为框架不可能做好所有的事情,只能把共性的部分抽离出来进行流程化,然后留下一些扩展点让使用者去实现,这样不同的扩展就不用修改源代码或者对框架进行定制。也就是我们经常说的面向接口编程。


在 JDK6 里面引进的一个新的特性 ServiceLoader,从官方的文档来说,它主要是用来装载一系列的 service provider。而且ServiceLoader 可以通过 service provider 的配置文件来装载指定的 service provider。当服务的提供者,提供了服务接口的一种实现之后,我们只需要在 jar 包的 META-INF/services/ 目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该 jar 包 META-INF/services/ 里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。综上所述,SPI 机制实际上就是 "基于接口的编程+策略模式+配置文件" 组合实现的一种动态加载机制,在 JDK 中提供了工具类:java.util.ServiceLoader 来实现服务查找。


实现 SPI  机制,要遵循下面的一些规范:


服务提供者提供了接口的具体实现后,需要在资源文件夹中创建 META-INF/services 文件夹,并且新建一个以全类名为名字的文本文件,文件内容为实现类的全名(如下面图中的红框);


接口实现类必须在工程的 classpath 下,也就是 maven 中需要加入依赖或者 jar 包引用到工程里.


其实 Java 里使用 SPI 还是比较多的,比如我们常用的 JDBC 连接 Mysql 就是用的 SPI 机制来实现的连接逻辑,下面来看一个简单的 Demo.


接口服务提供


public interface Person {
    void eat();
}


实现类

public class Flink implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}
public class JasonLee implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}
public class Spark implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}


META-INF/services 配置文件



image-20210506170559361

在 source 下面的  META-INF/services 文件夹下面创建 spi.Person 文件,文件的内容是上面实现类的全名(包名.类名)


测试类


public static void main(String[] args) {
    ServiceLoader<Person> load = ServiceLoader.load(Person.class);
    Iterator<Person> iterator = load.iterator();
    while (iterator.hasNext()) {
        Person next = iterator.next();
        next.eat();
    }
}


执行打印的结果是:


Flink 执行 eat 方法

JasonLee 执行 eat 方法

Spark 执行 eat 方法

可以看到 3 个实现类都被执行了.


然后来分析一下 ServiceLoader#load 方法的源码:


public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}


首先会获取当前线程的类加载器,然后调用另一个重载的 load 方法.


public static <S> ServiceLoader<S> load(Class<S> service,
                                        ClassLoader loader)
{
    return new ServiceLoader<>(service, loader);
}


这个 load 方法会调用 ServiceLoader 的构造方法进行变量初始化.


private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}


构造方法里面主要是完成了对 service,loader,acc 变量的赋值工作,然后调用 reload 方法


public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}


providers 其实是一个 LinkedHashMap 用来做缓存用,存储的是用读取到的 services 文件夹下面实现类的实例,所以上来先清空缓存中的数据.然后创建了 LazyIterator 实例,核心的逻辑在 hasNextService 和 nextService 这两个方法中.


private boolean hasNextService() {
    if (nextName != null) {
        return true;
    }
    if (configs == null) {
        try {
            // 获取实现类的全名称
            String fullName = PREFIX + service.getName();
            if (loader == null)
                configs = ClassLoader.getSystemResources(fullName);
            else
                configs = loader.getResources(fullName);
        } catch (IOException x) {
            fail(service, "Error locating configuration files", x);
        }
    }
    while ((pending == null) || !pending.hasNext()) {
        if (!configs.hasMoreElements()) {
            return false;
        }
        // 解析实现类
        pending = parse(service, configs.nextElement());
    }
    nextName = pending.next();
    return true;
}
private S nextService() {
            if (!hasNextService())
                throw new NoSuchElementException();
            String cn = nextName;
            nextName = null;
            Class<?> c = null;
            try {
                // 通过反射去创建对象
                c = Class.forName(cn, false, loader);
            } catch (ClassNotFoundException x) {
                fail(service,
                     "Provider " + cn + " not found");
            }
            if (!service.isAssignableFrom(c)) {
                fail(service,
                     "Provider " + cn  + " not a subtype");
            }
            try {
                // 对象的实例化
                S p = service.cast(c.newInstance());
                providers.put(cn, p);
                return p;
            } catch (Throwable x) {
                fail(service,
                     "Provider " + cn + " could not be instantiated",
                     x);
            }
            throw new Error();          // This cannot happen
        }


我们必须要在 source 下面创建 META-INF/services 文件夹吗 不放在这个位置难道就加载不到吗? 答案是肯定的,如果不创建确实加载不到,因为源码里面的 PREFIX = "META-INF/services/" 这个变量是写死的,所以我们必须创建这个文件夹.这里会遍历文件里面所有的实现类然后通过反射机制去创建对象.


你可能还会发现一个问题 load 方法一开始就获取了 Thread.currentThread().getContextClassLoader() 上下文的类加载器,然后一直往后面传递,最后在 forName 里面用到了,那如果不把 loader 传进来行不行? 答案是确实不行,因为 ServiceLoader 是一个基础类,它是在 java.util 这个包下面的,所以它是由 BootstrapClassLoader 来加载的.而我们自定义的实现类是由 AppClassLoader 去加载的,BootstrapClassLoader 这个类加载器是加载不到我们定义的类的,所以这里 getContextClassLoader 其实是打破了双亲委派模型的.


Flink 中 SPI 实现


在 Flink 源码中大量使用了 Java 的 SPI 机制,比如在 Flink-connector ,Flink-formats ,Flink-metrics 等模块都可以看到 SPI 的身影.比如 Json Format



image-20210506182553881

那么 Flink 是如何保证正确的 TableFactory 实现类被加载的呢?直接来看 TableFactoryService#findSingleInternal 方法的源码


private static <T extends TableFactory> T findSingleInternal(
        Class<T> factoryClass,
        Map<String, String> properties,
        Optional<ClassLoader> classLoader) {
    // 加载所有的实现类
    List<TableFactory> tableFactories = discoverFactories(classLoader);
    // 过滤出满足条件的
    List<T> filtered = filter(tableFactories, factoryClass, properties);
    if (filtered.size() > 1) {
        throw new AmbiguousTableFactoryException(
                filtered, factoryClass, tableFactories, properties);
    } else {
        return filtered.get(0);
    }
}


其中 discoverFactories 方法用来发现并加载 Table 的服务提供类,filter 方法则是用来筛选出满足条件的 TableFactory 的实现类。前者最终调用了 ServiceLoader 的相关方法,如下:


private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader.load(TableFactory.class, cl).iterator().forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}


可以看到 Java 的 SPI 机制就是在这里用的,查找并加载 TableFactory 所有的实现类,然后保存在 List 里面.然后在过滤中满足条件的一个.


总结:


SPI 机制的优缺点都非常明显,优点是实现解耦,使得接口的定义和具体业务实现分离,易于动态扩展,帮忙我们灵活的插件化开发.


缺点也很明显,不能按需加载,虽然 ServiceLoader 做了延迟加载,但是会把接口的实现类全部加载并实例化一遍,可能会造成浪费,获取某个实现类的方式比较单一,只能通过 iterator 形式获取,不能根据参数的形式获取.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
人工智能 安全 Java
Java和Python在企业中的应用情况
Java和Python在企业中的应用情况
52 7
|
21小时前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
108 2
探索Flink动态CEP:杭州银行的实战案例
|
2天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
18 2
|
25天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
100 6
|
23天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
29 2
|
29天前
|
关系型数据库 MySQL Java
MySQL索引优化与Java应用实践
【11月更文挑战第25天】在大数据量和高并发的业务场景下,MySQL数据库的索引优化是提升查询性能的关键。本文将深入探讨MySQL索引的多种类型、优化策略及其在Java应用中的实践,通过历史背景、业务场景、底层原理的介绍,并结合Java示例代码,帮助Java架构师更好地理解并应用这些技术。
31 2
|
SQL 存储 Java
Java 应用与数据库的关系| 学习笔记
快速学习 Java 应用与数据库的关系。
207 0
Java 应用与数据库的关系| 学习笔记
|
SQL 存储 Java
Java 应用与数据库的关系| 学习笔记
快速学习 Java 应用与数据库的关系。
198 0
Java 应用与数据库的关系| 学习笔记
|
SQL 存储 关系型数据库
Java应用与数据库的关系|学习笔记
快速学习Java应用与数据库的关系
Java应用与数据库的关系|学习笔记
|
1天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。