Java SPI 机制在 Flink 中的应用(源码分析)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程.什么是 SPI ?

我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程.


什么是 SPI ?


SPI 全称(Service Provide Interface),在 JAVA 中是一个比较重要的概念,在框架设计中被广泛使用。在框架设计中,要遵循的原则是对扩展开放,对修改关闭,保证框架实现对于使用者来说是黑盒。因为框架不可能做好所有的事情,只能把共性的部分抽离出来进行流程化,然后留下一些扩展点让使用者去实现,这样不同的扩展就不用修改源代码或者对框架进行定制。也就是我们经常说的面向接口编程。


在 JDK6 里面引进的一个新的特性 ServiceLoader,从官方的文档来说,它主要是用来装载一系列的 service provider。而且ServiceLoader 可以通过 service provider 的配置文件来装载指定的 service provider。当服务的提供者,提供了服务接口的一种实现之后,我们只需要在 jar 包的 META-INF/services/ 目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该 jar 包 META-INF/services/ 里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。综上所述,SPI 机制实际上就是 "基于接口的编程+策略模式+配置文件" 组合实现的一种动态加载机制,在 JDK 中提供了工具类:java.util.ServiceLoader 来实现服务查找。


实现 SPI  机制,要遵循下面的一些规范:


服务提供者提供了接口的具体实现后,需要在资源文件夹中创建 META-INF/services 文件夹,并且新建一个以全类名为名字的文本文件,文件内容为实现类的全名(如下面图中的红框);


接口实现类必须在工程的 classpath 下,也就是 maven 中需要加入依赖或者 jar 包引用到工程里.


其实 Java 里使用 SPI 还是比较多的,比如我们常用的 JDBC 连接 Mysql 就是用的 SPI 机制来实现的连接逻辑,下面来看一个简单的 Demo.


接口服务提供


public interface Person {
    void eat();
}


实现类

public class Flink implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}
public class JasonLee implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}
public class Spark implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}


META-INF/services 配置文件



image-20210506170559361

在 source 下面的  META-INF/services 文件夹下面创建 spi.Person 文件,文件的内容是上面实现类的全名(包名.类名)


测试类


public static void main(String[] args) {
    ServiceLoader<Person> load = ServiceLoader.load(Person.class);
    Iterator<Person> iterator = load.iterator();
    while (iterator.hasNext()) {
        Person next = iterator.next();
        next.eat();
    }
}


执行打印的结果是:


Flink 执行 eat 方法

JasonLee 执行 eat 方法

Spark 执行 eat 方法

可以看到 3 个实现类都被执行了.


然后来分析一下 ServiceLoader#load 方法的源码:


public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}


首先会获取当前线程的类加载器,然后调用另一个重载的 load 方法.


public static <S> ServiceLoader<S> load(Class<S> service,
                                        ClassLoader loader)
{
    return new ServiceLoader<>(service, loader);
}


这个 load 方法会调用 ServiceLoader 的构造方法进行变量初始化.


private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}


构造方法里面主要是完成了对 service,loader,acc 变量的赋值工作,然后调用 reload 方法


public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}


providers 其实是一个 LinkedHashMap 用来做缓存用,存储的是用读取到的 services 文件夹下面实现类的实例,所以上来先清空缓存中的数据.然后创建了 LazyIterator 实例,核心的逻辑在 hasNextService 和 nextService 这两个方法中.


private boolean hasNextService() {
    if (nextName != null) {
        return true;
    }
    if (configs == null) {
        try {
            // 获取实现类的全名称
            String fullName = PREFIX + service.getName();
            if (loader == null)
                configs = ClassLoader.getSystemResources(fullName);
            else
                configs = loader.getResources(fullName);
        } catch (IOException x) {
            fail(service, "Error locating configuration files", x);
        }
    }
    while ((pending == null) || !pending.hasNext()) {
        if (!configs.hasMoreElements()) {
            return false;
        }
        // 解析实现类
        pending = parse(service, configs.nextElement());
    }
    nextName = pending.next();
    return true;
}
private S nextService() {
            if (!hasNextService())
                throw new NoSuchElementException();
            String cn = nextName;
            nextName = null;
            Class<?> c = null;
            try {
                // 通过反射去创建对象
                c = Class.forName(cn, false, loader);
            } catch (ClassNotFoundException x) {
                fail(service,
                     "Provider " + cn + " not found");
            }
            if (!service.isAssignableFrom(c)) {
                fail(service,
                     "Provider " + cn  + " not a subtype");
            }
            try {
                // 对象的实例化
                S p = service.cast(c.newInstance());
                providers.put(cn, p);
                return p;
            } catch (Throwable x) {
                fail(service,
                     "Provider " + cn + " could not be instantiated",
                     x);
            }
            throw new Error();          // This cannot happen
        }


我们必须要在 source 下面创建 META-INF/services 文件夹吗 不放在这个位置难道就加载不到吗? 答案是肯定的,如果不创建确实加载不到,因为源码里面的 PREFIX = "META-INF/services/" 这个变量是写死的,所以我们必须创建这个文件夹.这里会遍历文件里面所有的实现类然后通过反射机制去创建对象.


你可能还会发现一个问题 load 方法一开始就获取了 Thread.currentThread().getContextClassLoader() 上下文的类加载器,然后一直往后面传递,最后在 forName 里面用到了,那如果不把 loader 传进来行不行? 答案是确实不行,因为 ServiceLoader 是一个基础类,它是在 java.util 这个包下面的,所以它是由 BootstrapClassLoader 来加载的.而我们自定义的实现类是由 AppClassLoader 去加载的,BootstrapClassLoader 这个类加载器是加载不到我们定义的类的,所以这里 getContextClassLoader 其实是打破了双亲委派模型的.


Flink 中 SPI 实现


在 Flink 源码中大量使用了 Java 的 SPI 机制,比如在 Flink-connector ,Flink-formats ,Flink-metrics 等模块都可以看到 SPI 的身影.比如 Json Format



image-20210506182553881

那么 Flink 是如何保证正确的 TableFactory 实现类被加载的呢?直接来看 TableFactoryService#findSingleInternal 方法的源码


private static <T extends TableFactory> T findSingleInternal(
        Class<T> factoryClass,
        Map<String, String> properties,
        Optional<ClassLoader> classLoader) {
    // 加载所有的实现类
    List<TableFactory> tableFactories = discoverFactories(classLoader);
    // 过滤出满足条件的
    List<T> filtered = filter(tableFactories, factoryClass, properties);
    if (filtered.size() > 1) {
        throw new AmbiguousTableFactoryException(
                filtered, factoryClass, tableFactories, properties);
    } else {
        return filtered.get(0);
    }
}


其中 discoverFactories 方法用来发现并加载 Table 的服务提供类,filter 方法则是用来筛选出满足条件的 TableFactory 的实现类。前者最终调用了 ServiceLoader 的相关方法,如下:


private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader.load(TableFactory.class, cl).iterator().forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}


可以看到 Java 的 SPI 机制就是在这里用的,查找并加载 TableFactory 所有的实现类,然后保存在 List 里面.然后在过滤中满足条件的一个.


总结:


SPI 机制的优缺点都非常明显,优点是实现解耦,使得接口的定义和具体业务实现分离,易于动态扩展,帮忙我们灵活的插件化开发.


缺点也很明显,不能按需加载,虽然 ServiceLoader 做了延迟加载,但是会把接口的实现类全部加载并实例化一遍,可能会造成浪费,获取某个实现类的方式比较单一,只能通过 iterator 形式获取,不能根据参数的形式获取.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2天前
|
Java 数据安全/隐私保护
java中异常处理机制
java中异常处理机制
8 1
|
2天前
|
算法 安全 Java
深入探索Java中的并发编程:CAS机制的原理与应用
总之,CAS机制是一种用于并发编程的原子操作,它通过比较内存中的值和预期值来实现多线程下的数据同步和互斥,从而提供了高效的并发控制。它在Java中被广泛应用于实现线程安全的数据结构和算法。
17 0
|
3天前
|
传感器 机器人 Java
使用Java构建机器人应用
使用Java构建机器人应用
6 0
|
3天前
|
分布式计算 负载均衡 Java
构建高可用性Java应用:介绍分布式系统设计与开发
构建高可用性Java应用:介绍分布式系统设计与开发
8 0
|
3天前
|
设计模式 算法 Java
设计模式在Java开发中的应用
设计模式在Java开发中的应用
14 0
|
3天前
|
Java API 开发者
解密Java反射机制与动态代理
解密Java反射机制与动态代理
8 0
|
4天前
|
分布式计算 Java 大数据
Java语言主要应用领域
【5月更文挑战第7天】Java在嵌入式系统中以低至130KB的占用展现可靠性,实现“一次编写,到处运行”。在大数据领域,Java通过Hadoop、Hbase、Accumulo和ElasticSearch等工具发挥关键作用。Java也是Eclipse、IntelliJ IDEA和NetBeans等开发工具的基础。广泛应用于电商网站和金融服务器系统,即便在J2ME式微后,仍能在部分低端手机中找到其踪影。
15 4
|
4天前
|
算法 Java 机器人
Java在嵌入式领域的应用
【5月更文挑战第7天】Java广泛应用于消费产品(如智能电视、机顶盒、数码相机)、工业控制(PLC、DCS、FCS)、通信(交换机、路由器、基站)、智能仪器、机器人、计算机外部设备、军事电子及太空科学,涵盖从家用到高科技领域的嵌入式系统开发。
15 4
|
1天前
|
安全 Java
【JAVA进阶篇教学】第十篇:Java中线程安全、锁讲解
【JAVA进阶篇教学】第十篇:Java中线程安全、锁讲解
|
1天前
|
安全 Java
【JAVA进阶篇教学】第六篇:Java线程中状态
【JAVA进阶篇教学】第六篇:Java线程中状态