01 引言
博主在上一篇文章《数据集成框架FlinkX(纯钧)入门》,大致讲解了FlinkX
的一些概念,以及举了相关FlinkX
的使用案例。
本文我们继续探索一下FlinkX
的源码以及整个执行流程。阅读前,复制上一篇文章有关FlinkX
的项目目录:
- bin # 存放执行脚本的目录 ├── chunjun-docker.sh # Docker 启动脚本 ├── chunjun-kubernetes-application.sh # Kubernetes 应用模式启动脚本 ├── chunjun-kubernetes-session.sh # Kubernetes 会话模式启动脚本 ├── chunjun-local.sh # 本地启动脚本 ├── chunjun-standalone.sh # 单机模式启动脚本 ├── chunjun-yarn-perjob.sh # YARN 每作业模式启动脚本 ├── chunjun-yarn-session.sh # YARN 会话模式启动脚本 ├── start-chunjun # 通用启动脚本 └── submit.sh # 提交任务脚本 - build # 构建脚本目录 └── build.sh # 构建脚本 - chunjun-assembly # 汇总装配模块目录 - chunjun-clients # 客户端模块目录 - chunjun-connectors # 连接器模块目录 ├── (多个子目录) # 不同的数据连接器子模块 - chunjun-core # 核心模块目录 - chunjun-ddl # 数据定义语言模块目录 ├── chunjun-ddl-base # DDL 基础模块 ├── chunjun-ddl-mysql # MySQL DDL 模块 ├── chunjun-ddl-oracle # Oracle DDL 模块 - chunjun-dev # 开发工具模块目录 ├── (多个子目录) # 包含开发用的各种工具和资源 - chunjun-dirty # 脏数据处理模块目录 ├── (多个子目录) # 不同的脏数据处理子模块 - chunjun-docker # Docker 相关模块目录 ├── (多个子目录) # Docker 相关资源和配置 - chunjun-e2e # 端到端测试模块目录 - chunjun-examples # 示例模块目录 ├── json # JSON 示例 └── sql # SQL 示例 - chunjun-local-test # 本地测试模块目录 - chunjun-metrics # 指标监控模块目录 ├── (多个子目录) # 包含不同的监控模块 - chunjun-restore # 数据恢复模块目录 ├── chunjun-restore-common # 通用数据恢复模块 └── chunjun-restore-mysql # MySQL 数据恢复模块
02 源码剖析
通过阅读官网,可以知道如果要跑一个FlinkX
的任务,一般的都会使用如下命令:
## local模式 sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json ## standalone模式 sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json ## yarn session模式 sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"} ## 其它模式。。。。。
命令主要有以下核心的参数:
参数 | 描述 |
mode | 任务提交的类型,非必填项,类型有:local(默认值),standalone,yarn-session, yarn-per-job,kubernetes-session,kubernetes-application,对应源码中枚举类 ClusterMode; |
jobType | 纯钧任务类型,必填项,同步任务为:sync,SQL计算任务为:sql |
job | 纯钧任务脚本地址,必填项 |
chunjunDistDir | 纯钧插件包地址 |
confProp | 纯钧任务配置参数,Flink相关配置也是在这里配置 |
flinkConfDir | flink-conf.yaml 地址,在非local模式时,需要配置 |
那么,源码的入口就从启动flink作业的命令开始。
2.1 作业提交脚本
提交FlinkX
作业的脚本都放在了项目的bin
目录下,不同提交模式(local、standlone、session等)里面的脚本,最终执行的都是submit.sh
:
因此,我们看submit.sh命令里面的内容,注释后如下:
#!/usr/bin/env bash set -e # 查找 Java 可执行文件的路径 if [ -n "${JAVA_HOME}" ]; then JAVA_RUN="${JAVA_HOME}/bin/java" else if [ `command -v java` ]; then JAVA_RUN="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi # 注释: 以上代码用于查找 Java 可执行文件的路径,首先检查 JAVA_HOME 环境变量,然后检查系统中是否已经安装了 Java。 # 设置 CHUNJUN_HOME 变量,以确定部署模式 # 1:使用装配分发包文件进行部署 # 2:使用项目包进行部署 CHUNJUN_DEPLOY_MODE=1 if [[ $CHUNJUN_HOME && -z $CHUNJUN_HOME ]];then export CHUNJUN_HOME=$CHUNJUN_HOME else CHUNJUN_HOME="$(cd "`dirname "$0"`"/..; pwd)" if [ -d "$CHUNJUN_HOME/chunjun-dist" ]; then CHUNJUN_HOME="$CHUNJUN_HOME/chunjun-dist" CHUNJUN_DEPLOY_MODE=2 fi fi # 注释: 以上代码用于设置 CHUNJUN_HOME 变量,用于确定部署模式。首先检查是否已经设置 CHUNJUN_HOME 变量,如果没有设置,则根据脚本的路径来确定 CHUNJUN_HOME 的值。 # 根据部署模式设置 JAR_DIR 变量 # 1. 在 yarn-session 情况下,无法找到 JAR_DIR # 2. 在其他情况下,可以找到 JAR_DIR if [ $CHUNJUN_DEPLOY_MODE -eq 1 ]; then JAR_DIR=$CHUNJUN_HOME/lib/chunjun-clients.jar:$CHUNJUN_HOME/lib/* else JAR_DIR=$CHUNJUN_HOME/../lib/chunjun-clients.jar:$CHUNJUN_HOME/../lib/* fi # 注释: 以上代码根据不同的部署模式设置 JAR_DIR 变量,用于指定需要加载的 Java 类库路径。 # 入口类全路径 CLASS_NAME=com.dtstack.chunjun.client.Launcher # 检查参数中是否包含 ".sql",以确定作业类型 JOBTYPE="sync" ARGS=$@ if [[ $ARGS == *.sql* ]]; then JOBTYPE="sql" fi; echo " # # # # # ##### ###### # # # #### #### # # # #### # # # # # ## # # # # ## # # # # # # # # # # # # # # # # # ## # # # # ## # # ##### # # #### # # # # #### # # # # #### Reference site: https://dtstack.github.io/chunjun chunjun is starting ... CHUNJUN_HOME is auto set $CHUNJUN_HOME" # 设置基本参数,用于所有作业 PARAMS="$ARGS -mode $MODE -jobType $JOBTYPE -chunjunDistDir $CHUNJUN_HOME" # 如果 FLINK_HOME 未设置或不是目录,则忽略 flinkConfDir 参数 if [ ! -z $FLINK_HOME ] && [ -d $FLINK_HOME ]; then echo "FLINK_HOME is $FLINK_HOME" PARAMS="$PARAMS -flinkConfDir $FLINK_HOME/conf -flinkLibDir $FLINK_HOME/lib" else echo "FLINK_HOME is empty!" fi # 如果 HADOOP_HOME 未设置或不是目录,则忽略 hadoopConfDir 参数 if [ ! -z $HADOOP_HOME ] && [ -d $HADOOP_HOME ]; then echo "HADOOP_HOME is $HADOOP_HOME" PARAMS="$PARAMS -hadoopConfDir $HADOOP_HOME/etc/hadoop" else echo "HADOOP_HOME is empty!" fi # 添加一个空行,用于与日志分隔 echo "" echo "start command: $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS" echo "" # 执行Java命令 $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS
总结:其实submit.sh命令主要的作用就是,执行提交作业的java程序,首先封装好入参(如:flink_home、hadoop_home、依赖jar路径等参数),然后指定程序的入口类(com.dtstack.chunjun.client.Launcher
),最后使用java命令来执行java程序。
ok,接下来看看com.dtstack.chunjun.client.Launcher
这个类做了什么事情。
2.2 作业提交入口类
作业提交入口里在项目chunjun-clients
目录下的Launcher类(com.dtstack.chunjun.client.Launcher
)。
我们来看看它的main
入口方法,这里添加了相关的注释:
/** * 作业提交命令(sbumit.sh)执行之后,会进入该方法 * * @author : YangLinWei * @createTime: 2023/11/8 15:31 * @version: 1.0.0 */ public static void main(String[] args) throws Exception { // 1. 解析入参,并设置进Options类 OptionParser optionParser = new OptionParser(args); Options launcherOptions = optionParser.getOptions(); // 2. 查询并设置flink_home以及hadoop_home的配置进入Options类 findDefaultConfigDir(launcherOptions); // 3. 解析程序执行参数集合,例如该命令里面的参数: sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json List<String> argList = optionParser.getProgramExeArgList(); // 将argList转化为HashMap,方便通过参数名称来获取参数值 Map<String, String> commandMap = Maps.newHashMap(); for (int i = 0; i < argList.size(); i += 2) { commandMap.put(argList.get(i), argList.get(i + 1)); } // 清空list,填充修改后的参数值 argList.clear(); for (int i = 0; i < commandMap.size(); i++) { argList.add(commandMap.keySet().toArray()[i].toString()); argList.add(commandMap.values().toArray()[i].toString()); } // 4. 根据Options类里面的设置的参数以及命令行里面的参数,初始化构建作业发布类。 JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList); // 5. 根据启动模式,创建不同的提交客户端帮助类 ClusterClientHelper<?> clusterClientHelper = createHelper(launcherOptions.getMode()); // 6. 加载自定义的jar包到当前的类加载器 URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader(); List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar()); ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader); // 7. 使用不同的提交客户端提交作业 try (ClusterClient<?> client = clusterClientHelper.submit(jobDeployer)) { if (null != client) { log.info(client.getClusterId() + " submit successfully."); } } }
通过阅读源码,可以知道该方法主要做了如下几个事情:
- 封装执行作业相关的参数(flink_home、hadoop_home以及命令行的参数);
- 根据不同的模式,生成不同的提交客户端;
- 使用不同的客户端提交作业。
上述的第2个步骤,即根据模式生成提交客户端的代码截图如下:
第3个步骤是最为核心的一个步骤,它是根据第2步骤生成不同的客户端来提交作业的,通过代码提示,可以看到现在支持这几种提交模式:
接下来,分别讲解不同模式下的提交。
2.3 local模式提交
local模式的提交,使用到了LocalClusterClientHelper
,其代码很简单,做了注释之后如下:
/** * 提交客户端-Local模式 * * @author : YangLinWei * @createTime: 2023/11/8 15:41 * @version: 1.0.0 */ public class LocalClusterClientHelper implements ClusterClientHelper<Void> { @Override public ClusterClient<Void> submit(JobDeployer jobDeployer) throws Exception { // 获取程序参数 String[] args = jobDeployer.getProgramArgs().toArray(new String[0]); // 执行程序 Main.main(args); return null; } }
继续看执行的细节,具体的方法在 “com.dtstack.chunjun.Main#main
”(友情提示:这个类也可以用作不同模式下的entrypoint的入口程序):
/** * 执行入口 * * @author : YangLinWei * @createTime: 2023/11/8 15:45 * @version: 1.0.0 */ public static void main(String[] args) throws Exception { log.info("------------program params-------------------------"); Arrays.stream(args).forEach(arg -> log.info("{}", arg)); log.info("-------------------------------------------"); Options options = new OptionParser(args).getOptions(); String replacedJob = ""; // 获取作业配置 File file = new File(options.getJob()); if (file.isFile()) { try { replacedJob = FileUtils.readFileToString(file, StandardCharsets.UTF_8.name()); } catch (IOException ioe) { log.error("Can not get the job info !!!", ioe); throw new RuntimeException(ioe); } } else { String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name()); replacedJob = JobUtil.replaceJobParameter(options.getP(), job); } Properties confProperties = PropertiesUtil.parseConf(options.getConfProp()); // 判断是否为SQL作业类型,如果是,则设置SQL作业的配置 if (EJobType.getByName(options.getJobType()).equals(SQL)) { options.setSqlSetConfiguration(SqlParser.parseSqlSet(replacedJob)); } // 初始化TableEnviorment,并注入配置 StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options); StreamTableEnvironment tEnv = EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName()); log.info( "Register to table configuration:{}", tEnv.getConfig().getConfiguration().toString()); // 判断不同的类型,执行作业 switch (EJobType.getByName(options.getJobType())) { case SQL: exeSqlJob(env, tEnv, replacedJob, options); break; case SYNC: exeSyncJob(env, tEnv, replacedJob, options); break; default: throw new ChunJunRuntimeException( "unknown jobType: [" + options.getJobType() + "], jobType must in [SQL, SYNC]."); } log.info("program {} execution success", options.getJobName()); }
可以看到,执行local
模式的入口方法主要是初始化了Flink
的TableEnviorment
,以及注册了相关的配置,并根据类型去执行SQL
作业或同步作业。
2.3.1 执行SQL类型作业
执行SQL类型的作业实际代码如下:
/** * 执行类型为sql的作业 * * @author : YangLinWei * @createTime: 2023/11/8 15:50 * @version: 1.0.0 */ private static void exeSqlJob( StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String job, Options options) { try { // 配置TableEnviorment环境 configStreamExecutionEnvironment(env, options, null); List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar()); // 根据配置,设置为“批”还是“流”任务 String runMode = options.getRunMode(); if ("batch".equalsIgnoreCase(runMode)) env.setRuntimeMode(RuntimeExecutionMode.BATCH); // 解析SQL语句并执行 StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv); TableResult execute = statementSet.execute(); // 解决执行批处理作业时YARN模式不退出的问题 Properties confProperties = PropertiesUtil.parseConf(options.getConfProp()); String executionMode = confProperties.getProperty( "chunjun.cluster.execution-mode", ClusterEntrypoint.ExecutionMode.DETACHED.name()); if (!ClusterEntrypoint.ExecutionMode.DETACHED.name().equalsIgnoreCase(executionMode)) { // 等待作业结束 printSqlResult(execute); } // 如果是本地环境,打印作业执行结果 if (env instanceof MyLocalStreamEnvironment) { Optional<JobClient> jobClient = execute.getJobClient(); if (jobClient.isPresent()) { PrintUtil.printResult( jobClient .get() .getJobExecutionResult() .get() .getAllAccumulatorResults()); } } } catch (Exception e) { throw new ChunJunRuntimeException(e); } }
以上的代码:主要做的事情就是设置TableEnviorment的参数,解析并执行Flink SQL。
2.3.2 执行同步类型作业
执行同步类型的作业实际代码如下:
/** * 执行类型为“同步”的作业 * * @author : YangLinWei * @createTime: 2023/11/8 15:58 * @version: 1.0.0 */ private static void exeSyncJob( StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String job, Options options) throws Exception { // 解析同步作业配置 SyncConfig config = parseConfig(job, options); // 配置TableEnvironment环境 configStreamExecutionEnvironment(env, options, config); // 创建数据源工厂,用于获取输入数据流 SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env); DataStream<RowData> dataStreamSource = sourceFactory.createSource(); // 根据配置设置输入数据流的并行度 SpeedConfig speed = config.getSpeed(); if (speed.getReaderChannel() > 0) { dataStreamSource = ((DataStreamSource<RowData>) dataStreamSource) .setParallelism(speed.getReaderChannel()); } // 添加映射操作符 dataStreamSource = addMappingOperator(config, dataStreamSource); // 处理CDC(变更数据捕获)相关配置 if (null != config.getCdcConf() && (null != config.getCdcConf().getDdl() && null != config.getCdcConf().getCache())) { CdcConfig cdcConfig = config.getCdcConf(); DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConfig, config); // 获取DDL处理器和缓存处理器 CacheHandler cacheHandler = DataSyncFactoryUtil.discoverCacheHandler(cdcConfig, config); // 应用变更数据捕获处理器 dataStreamSource = dataStreamSource.flatMap( new RestorationFlatMap(ddlHandler, cacheHandler, cdcConfig)); } DataStream<RowData> dataStream; boolean transformer = config.getTransformer() != null && StringUtils.isNotBlank(config.getTransformer().getTransformSql()); if (transformer) { // 如果存在数据转换操作,将数据流转换为表 dataStream = syncStreamToTable(tableEnv, config, dataStreamSource); } else { dataStream = dataStreamSource; } // 根据配置设置是否进行数据重平衡 if (speed.isRebalance()) { dataStream = dataStream.rebalance(); } // 创建目标端工厂,用于将数据流写入目标 SinkFactory sinkFactory = DataSyncFactoryUtil.discoverSink(config); DataStreamSink<RowData> dataStreamSink = sinkFactory.createSink(dataStream); if (speed.getWriterChannel() > 0) { dataStreamSink.setParallelism(speed.getWriterChannel()); } // 根据配置设置输出数据流的并行度 JobExecutionResult result = env.execute(options.getJobName()); // 如果是本地环境,打印作业执行结果 if (env instanceof MyLocalStreamEnvironment) { PrintUtil.printResult(result.getAllAccumulatorResults()); } }
以上代码主要做了:根据配置初始化TableEnviorment,使用工厂模式去加载源端数据源和目标端数据源,并设置源端(并行度、操作符、ddl变更等)、算子、目标端等。
博主认为这一块是FlinkX区别与其它的框架的一个核心点,里面用到了不少的技术,后续会博主也会单独拎出来讲解(如:脏数据如何处理、ddl变更是如何捕获的、checkpoint增量同步的原理等)
ok,接下来快速的看看其它的几种模式。
2.4 其它模式提交
YarnPerJob模式提交,在com.dtstack.chunjun.client.yarn.YarnPerJobClusterClientHelper#submit
,主要:就是做了配置的初始化,安全认证的处理以及使用yarn的api提交,具体代码如下:
/** * Yarn per-job模式提交 * * @author : YangLinWei * @createTime: 2023/11/8 16:15 * @version: 1.0.0 */ @Override public ClusterClient<ApplicationId> submit(JobDeployer jobDeployer) throws Exception { //获取配置参数 Options launcherOptions = jobDeployer.getLauncherOptions(); String confProp = launcherOptions.getConfProp(); if (StringUtils.isBlank(confProp)) { throw new IllegalArgumentException("per-job mode must have confProp!"); } // 获取flink lib目录路径 String libJar = launcherOptions.getFlinkLibDir(); if (StringUtils.isBlank(libJar)) { throw new IllegalArgumentException("per-job mode must have flink lib path!"); } // 获取有效的配置 Configuration flinkConfig = jobDeployer.getEffectiveConfiguration(); // 处理安全认证(如:kerberos、simple模式等) SecurityUtils.install(new SecurityConfiguration(flinkConfig)); // 初始化ClusterSpecification,并使用yarn的api提交作业 ClusterSpecification clusterSpecification = createClusterSpecification(jobDeployer); try (YarnClusterDescriptor descriptor = createPerJobClusterDescriptor(launcherOptions, flinkConfig)) { ClusterClientProvider<ApplicationId> provider = descriptor.deployJobCluster( clusterSpecification, new JobGraph("chunjun"), true); String applicationId = provider.getClusterClient().getClusterId().toString(); String flinkJobId = clusterSpecification.getJobGraph().getJobID().toString(); log.info("deploy per_job with appId: {}}, jobId: {}", applicationId, flinkJobId); return provider.getClusterClient(); } }
Kubernetes模式提交,在“com.dtstack.chunjun.client.kubernetes.KubernetesApplicationClusterClientHelper#submit
”,主要的作用就是初始化作业参数配置以及使用flink自带的Kubernetes api去提交作业到Kubernetes。
/** * 提交kubernetes Application模式的作业 * * @author : YangLinWei * @createTime: 2023/11/8 16:19 * @version: 1.0.0 */ @Override public ClusterClient<String> submit(JobDeployer jobDeployer) throws Exception { // 获取并设置部署的配置 Options launcherOptions = jobDeployer.getLauncherOptions(); List<String> programArgs = jobDeployer.getProgramArgs(); Configuration effectiveConfiguration = jobDeployer.getEffectiveConfiguration(); setDeployerConfig(effectiveConfiguration, launcherOptions); // 设置kubernetes的主机别名 setHostAliases(effectiveConfiguration); replaceRemoteParams(programArgs, effectiveConfiguration); ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration( programArgs.toArray(new String[0]), PluginInfoUtil.getMainClass()); // 使用flink自带的kubernetes api提交作业 KubernetesClusterClientFactory kubernetesClusterClientFactory = new KubernetesClusterClientFactory(); try (KubernetesClusterDescriptor descriptor = kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfiguration)) { ClusterSpecification clusterSpecification = getClusterSpecification(effectiveConfiguration); ClusterClientProvider<String> clientProvider = descriptor.deployApplicationCluster( clusterSpecification, applicationConfiguration); ClusterClient<String> clusterClient = clientProvider.getClusterClient(); log.info("Deploy Application with Cluster Id: {}", clusterClient.getClusterId()); return clusterClient; } }
以上大致讲了部署目标为yarn
或者Kubernetes
类型的的作业提交方式,与这两种部署目标类型相关的作业提交类型的操作方式也大同小异,此处不再详述。
03 小结&感想
本文主要从FlinkX
的一个sh
提交命令开始讲解,然后逐步深入到不同的提交类型客户端的java
代码。
可以知道,FlinkX
主要的作用还是做“同步”,相对于其他的Flink
二次开发框架,在简单了解它的官网描述之后,可以知道FlinkX在脏数据处理、增量同步 这两块可能做得是比较有特色,但是关于增量同步是否只适用于“批”类型的作业,博主有空再阅读其源码,再出相关的博文。
其实给我的感觉,“批” 这一块,给我的感觉有点像DataX
,只是底层使用的技术栈不同。
还有就是对比其它的Flink
二次开发框架,很好奇为何不直接使用Flink原生的命令行呢?这样不更好的去与其它的开源框架整合吗?举个情景例子:Flink
批处理作业作为DolphinScheduler
的一个任务节点,那么具体Flink
作业节点执行的方式是官方提供的命令行标准,这样就不会出现一件事情同时多个人(Flink Developer Or Others
)做了。