01 引言
通过前面的博文,我们对DataX
有了一定的深入的解了:
- 《DataX教程(01)- 入门》
- 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
- 《DataX教程(03)- 源码解读(超详细版)
- 《DataX教程(04)- 配置完整解读》
- 《DataX教程(05)- DataX Web项目实践》
- 《DataX教程(06)- DataX调优》
- 《DataX教程(07)- 图解DataX任务分配及执行流程》
- 《DataX教程(08)- 监控与汇报》
- 《DataX教程(09)- DataX是如何做到限速的?》
本文主要讲的是DataX
的插件加载原理,在了解DataX之前,我们需要了解关于“双亲委派机制”。
02 双亲委派机制
之前也写过相关的文章,可以参考:《深入理解JVM系列教程(11) - 类加载器》
2.1 类加载器关系
Java
编译后的class
字节码是通过类加载器去加载的,而在JVM
里面,系统自带的类加载器有几种,先看看以下有关类加载器的关系图:
关于上图的几个类加载器,他们之间的关系:
- 启动类加载器(
BootStrap ClassLoader
):由C++实现,没有父类。它负责将<JAVA_HOME>/lib
路径下的核心类库或-Xbootclasspath
参数指定的路径下的jar
包加载到内存中,注意必由于虚拟机是按照文件名识别加载jar
包的,如rt.jar
; - 扩展类加载器(
Extension ClassLoader
):由Java语言实现,没有父类加载器。它负责加载<JAVA_HOME>/lib/ext
目录下或者由系统变量-Djava.ext.dir
指定位路径中的类库; - 系统类加载器(
Application ClassLoader
):由Java
语言实现,父类加载器为Extension ClassLoader
。它负责加载系统类路径java -classpath
或-D java.class.path
指定路径下的类库,也就是我们经常用到的classpath
路径,开发者可以直接使用系统类加载器,一般情况下该类加载是程序中默认的类加载器,通过ClassLoader#getSystemClassLoader()
方法可以获取到该类加载器。 - 自定义类加载器(
Custom ClassLoader
):父类加载器为Application ClassLoader
。
概念知道了,主要讲讲什么是双亲委派机制?
2.2 双亲委派机制流程
流程描述:
- 如果一个类加载器收到了类加载请求,它并不会自己先去加载,而是把这个请求委托给父类的加载器去执行;
- 如果父类加载器还存在其父类加载器,则进一步向上委托,依次递归,请求最终将到达顶层的启动类加载器;
- 如果父类加载器可以完成类加载任务,就成功返回,倘若父类加载器无法完成此加载任务,子加载器才会尝试自己去加载,这就是双亲委派模式;
简单的说就是:每个儿子都很懒,每次有活就丢给父亲去干,直到父亲说这件事我也干不了时,儿子自己想办法去完成。
那么为何要这样做呢?
- 好处是
Java
类随着它的类加载器一起具备了一种带有优先级的层次关系,通过这种层级关可以避免类的重复加载,当父亲已经加载了该类时,就没有必要子ClassLoader
再加载一次。 - 其次是考虑到安全因素,
java
核心api
中定义类型不会被随意替换,假设通过网络传递一个名为java.lang.Integer
的类,通过双亲委托模式传递到启动类加载器,而启动类加载器在核心Java API
发现这个名字的类,发现该类已被加载,并不会重新加载网络传递的过来的java.lang.Integer
,而直接返回已加载过的Integer.class
,这样便可以防止核心API
库被随意篡改。
双亲委派机制的缺点:
- 受到加载范围的限制,父类加载器无法加载到需要的文件,以
Driver
接口为例,由于Driver
接口定义在jdk
当中的,而其实现由各个数据库的服务商来提供,比如mysql
的就写了MYSQL CONNECTOR
,那么问题就来了,DriverManager
(也由jdk
提供)要加载各个实现了Driver
接口的实现类,然后进行管理,但是DriverManager
由启动类加载器加载,只能加载JAVA_HOME
的lib
下文件,而其实现是由服务商提供的,由系统类加载器加载。
这个时候就需要破坏了双亲委派, 启动类加载器来委托子类加载器来加载Driver
实现,这就是著名的SPI(SERVICE PROVIDER INTERFACE)
机制。
2.3 基于SPI机制破坏双亲委派
原理:基于“接口的编程+策略模式+配置文件”组合实现的动态加载机制。
没有SPI
时:
- 你可以现在
classpath
里加一个mysql-connector-java.jar
- 然后这样写
Class clz = Class.forName("com.mysql.jdbc.Driver"); Driver d = (Driver) clz.newInstance();
这就没问题了; - 再用
Application Classloader
加载了mysql-connector-java.jar
的com.mysql.jdbc.Driver
。
问题:硬编码了,一定要加载"
com.mysql.jdbc.Driver
",不是很优雅,不能实现“用接口编程,自动实例化真的实现“的这种编码形式。
使用SPI后:
- 代码大致会这样:
Connection connection = DriverManager.getConnection("jdbc:mysql://xxxxxx/xxx", "xxxx", "xxxxx")
; DriverManager
就根据"jdbc:mysql
"这个提示去找具体实现去了。
ok,说到这里就要回归本文的主题了,关于DataX
是如何实现插件加载的?
很遗憾的说,DataX
并没能有是使用SPI
去破坏双亲委派,而是使用了另外一种方式(插件热拔插原理:加载类 =》通过配置文件获取插件类名和路径 =》实例化该插件UrlClassLoader
=>将线程上下文加载器切换为UrlClassLoader
并保存原来的线程上下文加载器 =》加载插件实现类 =》完成基于实现类的操作 =》恢复原来的线程上下文加载器),接下来讲讲。
03 DataX插件热插拔
在JobContainer
看看reader
插件是怎么加载的,下面来看看加载reader
插件的的代码方法:
private Reader.Job initJobReader( JobPluginCollector jobPluginCollector) { this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.readerPluginName)); Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( PluginType.READER, this.readerPluginName); // 设置reader的jobConfig jobReader.setPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); // 设置reader的readerConfig jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); jobReader.setJobPluginCollector(jobPluginCollector); jobReader.init(); classLoaderSwapper.restoreCurrentThreadClassLoader(); return jobReader; }
其实它的流程很简单,
- 读取
job.json
配置文件插件的名字; - 使用
LoadUtil
根据插件类型+插件名获取自定义类加载器JarLoader
(JarLoader
继承自jdk
里的URLClassLoader
); ClassLoaderSwapper
线程类加载器切换类会把上一步生成的JarLoader
类加载器设置进当前的上下文类加载器(注意:保存前会保存原来的线程上下文加载器);- 然后使用
LoadUtil
加载插件,然后插件进行一些初始化的操作; - 最后使用
ClassLoaderSwapper
恢复原来的线程上下文加载器。
可以看到有两个核心的类,分别为:
ClassLoaderSwapper
(线程类加载器管理类)LoadUtil
(插件加载工具类)
继续讲解这两个类。
3.1 ClassLoaderSwapper线程类加载器管理
先看看源码:
public final class ClassLoaderSwapper { private ClassLoader storeClassLoader = null; private ClassLoaderSwapper() { } public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() { return new ClassLoaderSwapper(); } /** * 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader * * @param * @return */ public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) { this.storeClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(classLoader); return this.storeClassLoader; } /** * 将当前线程的类加载器设置为保存的类加载 * @return */ public ClassLoader restoreCurrentThreadClassLoader() { ClassLoader classLoader = Thread.currentThread() .getContextClassLoader(); Thread.currentThread().setContextClassLoader(this.storeClassLoader); return classLoader; } }
根据源码,可以看到其功能主要是针对系统自带的类加载器和自定义的类加载器的,主要做了 对这两种类加载器在当前线程进行“切换”与“保存” 的操作。
3.2 LoadUtil插件加载工具
LoadUtil
加载的插件,根据插件的类型分为:
- reader
- writer
- transformer(未实现)
LoadUtil
加载的插件,根据运行的类型分为:
- Job
- Task
3.2.1 获取类加载器
先看看获取类加载器的方法:
public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName) { Configuration pluginConf = getPluginConf(pluginType, pluginName); JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, pluginName)); if (null == jarLoader) { String pluginPath = pluginConf.getString("path"); if (StringUtils.isBlank(pluginPath)) { throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, String.format( "%s插件[%s]路径非法!", pluginType, pluginName)); } jarLoader = new JarLoader(new String[]{pluginPath}); jarLoaderCenter.put(generatePluginKey(pluginType, pluginName), jarLoader); } return jarLoader; }
getJarLoader()
方法主要就是根据插件的路径直接new
了一个JarLoader
,再进一步看看JarLoader
的方法视图:
JarLoader
其实就是基于jdk里面的URLClassLoader
进行二次实现。
3.2.2 加载插件
贴上LoadUtil
插件加载的代码:
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, String pluginName) { Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass( pluginType, pluginName, ContainerType.Job); try { AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz .newInstance(); jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName)); return jobPlugin; } catch (Exception e) { throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, String.format("DataX找到plugin[%s]的Job配置.", pluginName), e); } }
看代码就很清晰了,就是通过clazz.newInstance()
方式生成实例(策略者模式)。
到这里LoadUtil
的代码基本讲解完了。
04 文末
本文是对DataX插件加载的原理分析,如有疑问的童鞋欢迎留言,谢谢大家的阅读,本文完!