DataX教程(10)- DataX插件热插拔原理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: DataX教程(10)- DataX插件热插拔原理

01 引言

通过前面的博文,我们对DataX有了一定的深入的解了:

本文主要讲的是DataX的插件加载原理,在了解DataX之前,我们需要了解关于“双亲委派机制”。

02 双亲委派机制

之前也写过相关的文章,可以参考:《深入理解JVM系列教程(11) - 类加载器》

2.1 类加载器关系

Java编译后的class字节码是通过类加载器去加载的,而在JVM里面,系统自带的类加载器有几种,先看看以下有关类加载器的关系图:

关于上图的几个类加载器,他们之间的关系:

  • 启动类加载器(BootStrap ClassLoader:由C++实现,没有父类。它负责将<JAVA_HOME>/lib路径下的核心类库或-Xbootclasspath参数指定的路径下的jar包加载到内存中,注意必由于虚拟机是按照文件名识别加载jar包的,如rt.jar
  • 扩展类加载器(Extension ClassLoader):由Java语言实现,没有父类加载器。它负责加载<JAVA_HOME>/lib/ext目录下或者由系统变量-Djava.ext.dir指定位路径中的类库
  • 系统类加载器(Application ClassLoader):由Java语言实现,父类加载器为Extension ClassLoader它负责加载系统类路径java -classpath-D java.class.path指定路径下的类库,也就是我们经常用到的classpath路径,开发者可以直接使用系统类加载器,一般情况下该类加载是程序中默认的类加载器,通过ClassLoader#getSystemClassLoader()方法可以获取到该类加载器
  • 自定义类加载器(Custom ClassLoader:父类加载器为Application ClassLoader

概念知道了,主要讲讲什么是双亲委派机制

2.2 双亲委派机制流程

流程描述:

  • 如果一个类加载器收到了类加载请求,它并不会自己先去加载,而是把这个请求委托给父类的加载器去执行;
  • 如果父类加载器还存在其父类加载器,则进一步向上委托,依次递归,请求最终将到达顶层的启动类加载器;
  • 如果父类加载器可以完成类加载任务,就成功返回,倘若父类加载器无法完成此加载任务,子加载器才会尝试自己去加载,这就是双亲委派模式;

简单的说就是:每个儿子都很懒,每次有活就丢给父亲去干,直到父亲说这件事我也干不了时,儿子自己想办法去完成

那么为何要这样做呢?

  • 好处是Java类随着它的类加载器一起具备了一种带有优先级的层次关系,通过这种层级关可以避免类的重复加载,当父亲已经加载了该类时,就没有必要子ClassLoader再加载一次。
  • 其次是考虑到安全因素,java核心api中定义类型不会被随意替换,假设通过网络传递一个名为java.lang.Integer的类,通过双亲委托模式传递到启动类加载器,而启动类加载器在核心Java API发现这个名字的类,发现该类已被加载,并不会重新加载网络传递的过来的java.lang.Integer,而直接返回已加载过的Integer.class,这样便可以防止核心API库被随意篡改。

双亲委派机制的缺点

  • 受到加载范围的限制,父类加载器无法加载到需要的文件,以Driver接口为例,由于Driver接口定义在jdk当中的,而其实现由各个数据库的服务商来提供,比如mysql的就写了MYSQL CONNECTOR,那么问题就来了,DriverManager(也由jdk提供)要加载各个实现了Driver接口的实现类,然后进行管理,但是DriverManager由启动类加载器加载,只能加载JAVA_HOMElib下文件,而其实现是由服务商提供的,由系统类加载器加载。

这个时候就需要破坏了双亲委派, 启动类加载器来委托子类加载器来加载Driver实现,这就是著名的SPI(SERVICE PROVIDER INTERFACE)机制。

2.3 基于SPI机制破坏双亲委派

原理:基于“接口的编程+策略模式+配置文件”组合实现的动态加载机制

没有SPI时:

  • 你可以现在classpath里加一个mysql-connector-java.jar
  • 然后这样写Class clz = Class.forName("com.mysql.jdbc.Driver"); Driver d = (Driver) clz.newInstance();这就没问题了;
  • 再用Application Classloader加载了mysql-connector-java.jarcom.mysql.jdbc.Driver

问题:硬编码了,一定要加载"com.mysql.jdbc.Driver",不是很优雅,不能实现“用接口编程,自动实例化真的实现“的这种编码形式。

使用SPI后:

  • 代码大致会这样:Connection connection = DriverManager.getConnection("jdbc:mysql://xxxxxx/xxx", "xxxx", "xxxxx");
  • DriverManager就根据"jdbc:mysql"这个提示去找具体实现去了。

ok,说到这里就要回归本文的主题了,关于DataX是如何实现插件加载的?

很遗憾的说,DataX并没能有是使用SPI去破坏双亲委派,而是使用了另外一种方式(插件热拔插原理加载类 =》通过配置文件获取插件类名和路径 =》实例化该插件UrlClassLoader =>将线程上下文加载器切换为UrlClassLoader并保存原来的线程上下文加载器 =》加载插件实现类 =》完成基于实现类的操作 =》恢复原来的线程上下文加载器),接下来讲讲。

03 DataX插件热插拔

JobContainer看看reader插件是怎么加载的,下面来看看加载reader插件的的代码方法:

private Reader.Job initJobReader(
        JobPluginCollector jobPluginCollector) {
    this.readerPluginName = this.configuration.getString(
            CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
            PluginType.READER, this.readerPluginName);
    // 设置reader的jobConfig
    jobReader.setPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
    // 设置reader的readerConfig
    jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
    jobReader.setJobPluginCollector(jobPluginCollector);
    jobReader.init();
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return jobReader;
}

其实它的流程很简单,

  1. 读取job.json配置文件插件的名字;
  2. 使用LoadUtil根据插件类型+插件名获取自定义类加载器JarLoaderJarLoader继承自jdk里的URLClassLoader);
  3. ClassLoaderSwapper线程类加载器切换类会把上一步生成的JarLoader类加载器设置进当前的上下文类加载器(注意:保存前会保存原来的线程上下文加载器);
  4. 然后使用LoadUtil加载插件,然后插件进行一些初始化的操作;
  5. 最后使用ClassLoaderSwapper恢复原来的线程上下文加载器。

可以看到有两个核心的类,分别为:

  • ClassLoaderSwapper(线程类加载器管理类)
  • LoadUtil(插件加载工具类)

继续讲解这两个类。

3.1 ClassLoaderSwapper线程类加载器管理

先看看源码:

public final class ClassLoaderSwapper {
   private ClassLoader storeClassLoader = null;
   private ClassLoaderSwapper() {
   }
   public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {
       return new ClassLoaderSwapper();
   }
   /**
    * 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader
    *
    * @param
    * @return
    */
   public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
       this.storeClassLoader = Thread.currentThread().getContextClassLoader();
       Thread.currentThread().setContextClassLoader(classLoader);
       return this.storeClassLoader;
   }
   /**
    * 将当前线程的类加载器设置为保存的类加载
    * @return
    */
   public ClassLoader restoreCurrentThreadClassLoader() {
       ClassLoader classLoader = Thread.currentThread()
               .getContextClassLoader();
       Thread.currentThread().setContextClassLoader(this.storeClassLoader);
       return classLoader;
   }
}

根据源码,可以看到其功能主要是针对系统自带的类加载器自定义的类加载器的,主要做了 对这两种类加载器在当前线程进行“切换”与“保存” 的操作。

3.2 LoadUtil插件加载工具

LoadUtil加载的插件,根据插件的类型分为:

  • reader
  • writer
  • transformer(未实现)

LoadUtil加载的插件,根据运行的类型分为:

  • Job
  • Task

3.2.1 获取类加载器

先看看获取类加载器的方法:

public static synchronized JarLoader getJarLoader(PluginType pluginType,
                                                  String pluginName) {
    Configuration pluginConf = getPluginConf(pluginType, pluginName);
    JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
            pluginName));
    if (null == jarLoader) {
        String pluginPath = pluginConf.getString("path");
        if (StringUtils.isBlank(pluginPath)) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    String.format(
                            "%s插件[%s]路径非法!",
                            pluginType, pluginName));
        }
        jarLoader = new JarLoader(new String[]{pluginPath});
        jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
                jarLoader);
    }
    return jarLoader;
}

getJarLoader()方法主要就是根据插件的路径直接new了一个JarLoader,再进一步看看JarLoader的方法视图:

JarLoader其实就是基于jdk里面的URLClassLoader进行二次实现。

3.2.2 加载插件

贴上LoadUtil插件加载的代码:

public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
                                              String pluginName) {
    Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
            pluginType, pluginName, ContainerType.Job);
    try {
        AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
                .newInstance();
        jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
        return jobPlugin;
    } catch (Exception e) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR,
                String.format("DataX找到plugin[%s]的Job配置.",
                        pluginName), e);
    }
}

看代码就很清晰了,就是通过clazz.newInstance()方式生成实例(策略者模式)。

到这里LoadUtil的代码基本讲解完了。

04 文末

本文是对DataX插件加载的原理分析,如有疑问的童鞋欢迎留言,谢谢大家的阅读,本文完!

目录
相关文章
|
6月前
|
存储 NoSQL 关系型数据库
阿里DataX极简教程
【5月更文挑战第1天】DataX是一个高效的数据同步工具,用于在各种数据源之间迁移数据,如MySQL到另一个MySQL或MongoDB。它的工作流程包括read、write和setting步骤,通过Framework协调多线程处理。其核心架构包括Job、Task和TaskGroup,支持并发执行。DataX支持多种数据源,如RDBMS、阿里云数仓、NoSQL和无结构化数据存储。例如,从MySQL读取数据并同步到ClickHouse的实践操作包括下载DataX、配置任务文件和执行同步任务。
720 1
阿里DataX极简教程
|
关系型数据库 MySQL 调度
DataX教程(05)- DataX Web项目实践
DataX教程(05)- DataX Web项目实践
1327 0
|
4月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
69 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
6月前
|
消息中间件 SQL 分布式计算
DataX插件开发-KafkaWriter
DataX插件开发-KafkaWriter
207 0
|
调度 DataX 容器
DataX教程(07)- 图解DataX任务分配及执行流程
DataX教程(07)- 图解DataX任务分配及执行流程
564 0
DataX教程(07)- 图解DataX任务分配及执行流程
|
数据采集 分布式计算 调度
DataX教程(03)- 源码解读(超详细版)
DataX教程(03)- 源码解读(超详细版)
831 0
|
监控 DataX
DataX教程(09)- DataX是如何做到限速的?
DataX教程(09)- DataX是如何做到限速的?
355 0
|
监控 调度 DataX
DataX教程(08)- 监控与汇报
DataX教程(08)- 监控与汇报
505 0
|
JSON Java DataX
DataX教程(04)- 配置完整解读
DataX教程(04)- 配置完整解读
2365 0

热门文章

最新文章