深度剖析FlinkX(纯钧)源码

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 深度剖析FlinkX(纯钧)源码

01 引言

博主在上一篇文章《数据集成框架FlinkX(纯钧)入门》,大致讲解了FlinkX的一些概念,以及举了相关FlinkX的使用案例。

本文我们继续探索一下FlinkX的源码以及整个执行流程。阅读前,复制上一篇文章有关FlinkX 的项目目录:

- bin                                # 存放执行脚本的目录
  ├── chunjun-docker.sh                  # Docker 启动脚本
  ├── chunjun-kubernetes-application.sh  # Kubernetes 应用模式启动脚本
  ├── chunjun-kubernetes-session.sh      # Kubernetes 会话模式启动脚本
  ├── chunjun-local.sh                   # 本地启动脚本
  ├── chunjun-standalone.sh              # 单机模式启动脚本
  ├── chunjun-yarn-perjob.sh             # YARN 每作业模式启动脚本
  ├── chunjun-yarn-session.sh            # YARN 会话模式启动脚本
  ├── start-chunjun                      # 通用启动脚本
  └── submit.sh                          # 提交任务脚本
- build                                  # 构建脚本目录
  └── build.sh                           # 构建脚本
- chunjun-assembly                       # 汇总装配模块目录
- chunjun-clients                        # 客户端模块目录
- chunjun-connectors                     # 连接器模块目录
  ├── (多个子目录)                         # 不同的数据连接器子模块
- chunjun-core                           # 核心模块目录
- chunjun-ddl                            # 数据定义语言模块目录
  ├── chunjun-ddl-base                   # DDL 基础模块
  ├── chunjun-ddl-mysql                  # MySQL DDL 模块
  ├── chunjun-ddl-oracle                 # Oracle DDL 模块
- chunjun-dev                            # 开发工具模块目录
  ├── (多个子目录)                         # 包含开发用的各种工具和资源
- chunjun-dirty                          # 脏数据处理模块目录
  ├── (多个子目录)                         # 不同的脏数据处理子模块
- chunjun-docker                         # Docker 相关模块目录
  ├── (多个子目录)                         # Docker 相关资源和配置
- chunjun-e2e                            # 端到端测试模块目录
- chunjun-examples                       # 示例模块目录
  ├── json                               # JSON 示例
  └── sql                                # SQL 示例
- chunjun-local-test                     # 本地测试模块目录
- chunjun-metrics                        # 指标监控模块目录
  ├── (多个子目录)                         # 包含不同的监控模块
- chunjun-restore                        # 数据恢复模块目录
  ├── chunjun-restore-common             # 通用数据恢复模块
  └── chunjun-restore-mysql              # MySQL 数据恢复模块

02 源码剖析

通过阅读官网,可以知道如果要跑一个FlinkX的任务,一般的都会使用如下命令:

## local模式
sh bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json
## standalone模式
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
## yarn session模式
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
## 其它模式。。。。。

命令主要有以下核心的参数:

参数 描述
mode 任务提交的类型,非必填项,类型有:local(默认值),standalone,yarn-session, yarn-per-job,kubernetes-session,kubernetes-application,对应源码中枚举类 ClusterMode;
jobType 纯钧任务类型,必填项,同步任务为:sync,SQL计算任务为:sql
job 纯钧任务脚本地址,必填项
chunjunDistDir 纯钧插件包地址
confProp 纯钧任务配置参数,Flink相关配置也是在这里配置
flinkConfDir flink-conf.yaml 地址,在非local模式时,需要配置

那么,源码的入口就从启动flink作业的命令开始。

2.1 作业提交脚本

提交FlinkX作业的脚本都放在了项目的bin目录下,不同提交模式(local、standlone、session等)里面的脚本,最终执行的都是submit.sh

因此,我们看submit.sh命令里面的内容,注释后如下:

#!/usr/bin/env bash
set -e
# 查找 Java 可执行文件的路径
if [ -n "${JAVA_HOME}" ]; then
  JAVA_RUN="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    JAVA_RUN="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# 注释: 以上代码用于查找 Java 可执行文件的路径,首先检查 JAVA_HOME 环境变量,然后检查系统中是否已经安装了 Java。
# 设置 CHUNJUN_HOME 变量,以确定部署模式
# 1:使用装配分发包文件进行部署
# 2:使用项目包进行部署
CHUNJUN_DEPLOY_MODE=1
if [[ $CHUNJUN_HOME && -z $CHUNJUN_HOME ]];then
  export CHUNJUN_HOME=$CHUNJUN_HOME
else
  CHUNJUN_HOME="$(cd "`dirname "$0"`"/..; pwd)"
  if [ -d "$CHUNJUN_HOME/chunjun-dist" ]; then
    CHUNJUN_HOME="$CHUNJUN_HOME/chunjun-dist"
    CHUNJUN_DEPLOY_MODE=2
  fi
fi
# 注释: 以上代码用于设置 CHUNJUN_HOME 变量,用于确定部署模式。首先检查是否已经设置 CHUNJUN_HOME 变量,如果没有设置,则根据脚本的路径来确定 CHUNJUN_HOME 的值。
# 根据部署模式设置 JAR_DIR 变量
# 1. 在 yarn-session 情况下,无法找到 JAR_DIR
# 2. 在其他情况下,可以找到 JAR_DIR
if [ $CHUNJUN_DEPLOY_MODE -eq 1 ]; then
  JAR_DIR=$CHUNJUN_HOME/lib/chunjun-clients.jar:$CHUNJUN_HOME/lib/*
else
  JAR_DIR=$CHUNJUN_HOME/../lib/chunjun-clients.jar:$CHUNJUN_HOME/../lib/*
fi
# 注释: 以上代码根据不同的部署模式设置 JAR_DIR 变量,用于指定需要加载的 Java 类库路径。
# 入口类全路径
CLASS_NAME=com.dtstack.chunjun.client.Launcher
# 检查参数中是否包含 ".sql",以确定作业类型
JOBTYPE="sync"
ARGS=$@
if [[ $ARGS == *.sql* ]];
  then JOBTYPE="sql"
fi;
echo "
          #                               #
          #                               #
          #
  #####   ######   #     #  # ####     ####   #     #  # ####
 #        #     #  #     #  ##    #       #   #     #  ##    #
 #        #     #  #     #  #     #       #   #     #  #     #
 #        #     #  #    ##  #     #       #   #    ##  #     #
  #####   #     #   #### #  #     #       #    #### #  #     #
                                          #
                                      ####
Reference site: https://dtstack.github.io/chunjun
chunjun is starting ...
CHUNJUN_HOME is auto set $CHUNJUN_HOME"
# 设置基本参数,用于所有作业
PARAMS="$ARGS -mode $MODE -jobType $JOBTYPE -chunjunDistDir $CHUNJUN_HOME"
# 如果 FLINK_HOME 未设置或不是目录,则忽略 flinkConfDir 参数
if [ ! -z $FLINK_HOME ] && [ -d $FLINK_HOME ];
  then
    echo "FLINK_HOME is $FLINK_HOME"
    PARAMS="$PARAMS -flinkConfDir $FLINK_HOME/conf -flinkLibDir $FLINK_HOME/lib"
  else
    echo "FLINK_HOME is empty!"
fi
# 如果 HADOOP_HOME 未设置或不是目录,则忽略 hadoopConfDir 参数
if [ ! -z $HADOOP_HOME ] && [ -d $HADOOP_HOME ];
  then
    echo "HADOOP_HOME is $HADOOP_HOME"
    PARAMS="$PARAMS -hadoopConfDir $HADOOP_HOME/etc/hadoop"
  else
    echo "HADOOP_HOME is empty!"
fi
# 添加一个空行,用于与日志分隔
echo ""
echo "start command: $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS"
echo ""
# 执行Java命令
$JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS

总结其实submit.sh命令主要的作用就是,执行提交作业的java程序,首先封装好入参(如:flink_home、hadoop_home、依赖jar路径等参数),然后指定程序的入口类(com.dtstack.chunjun.client.Launcher),最后使用java命令来执行java程序

ok,接下来看看com.dtstack.chunjun.client.Launcher这个类做了什么事情。

2.2 作业提交入口类

作业提交入口里在项目chunjun-clients目录下的Launcher类(com.dtstack.chunjun.client.Launcher)。

我们来看看它的main入口方法,这里添加了相关的注释:

/**
 * 作业提交命令(sbumit.sh)执行之后,会进入该方法
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:31
 * @version: 1.0.0
 */
public static void main(String[] args) throws Exception {
    // 1. 解析入参,并设置进Options类
    OptionParser optionParser = new OptionParser(args);
    Options launcherOptions = optionParser.getOptions();
    // 2. 查询并设置flink_home以及hadoop_home的配置进入Options类
    findDefaultConfigDir(launcherOptions);
    // 3. 解析程序执行参数集合,例如该命令里面的参数: sh bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json
    List<String> argList = optionParser.getProgramExeArgList();
    // 将argList转化为HashMap,方便通过参数名称来获取参数值
    Map<String, String> commandMap = Maps.newHashMap();
    for (int i = 0; i < argList.size(); i += 2) {
        commandMap.put(argList.get(i), argList.get(i + 1));
    }
    // 清空list,填充修改后的参数值
    argList.clear();
    for (int i = 0; i < commandMap.size(); i++) {
        argList.add(commandMap.keySet().toArray()[i].toString());
        argList.add(commandMap.values().toArray()[i].toString());
    }
    // 4. 根据Options类里面的设置的参数以及命令行里面的参数,初始化构建作业发布类。
    JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);
    // 5. 根据启动模式,创建不同的提交客户端帮助类
    ClusterClientHelper<?> clusterClientHelper = createHelper(launcherOptions.getMode());
    // 6. 加载自定义的jar包到当前的类加载器
    URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
    List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
    ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);
    // 7. 使用不同的提交客户端提交作业
    try (ClusterClient<?> client = clusterClientHelper.submit(jobDeployer)) {
        if (null != client) {
            log.info(client.getClusterId() + " submit successfully.");
        }
    }
}

通过阅读源码,可以知道该方法主要做了如下几个事情:

  1. 封装执行作业相关的参数(flink_home、hadoop_home以及命令行的参数);
  2. 根据不同的模式,生成不同的提交客户端;
  3. 使用不同的客户端提交作业。

上述的第2个步骤,即根据模式生成提交客户端的代码截图如下:

第3个步骤是最为核心的一个步骤,它是根据第2步骤生成不同的客户端来提交作业的,通过代码提示,可以看到现在支持这几种提交模式:

接下来,分别讲解不同模式下的提交。

2.3 local模式提交

local模式的提交,使用到了LocalClusterClientHelper,其代码很简单,做了注释之后如下:

/**
 * 提交客户端-Local模式
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:41
 * @version: 1.0.0
 */
public class LocalClusterClientHelper implements ClusterClientHelper<Void> {
    @Override
    public ClusterClient<Void> submit(JobDeployer jobDeployer) throws Exception {
        // 获取程序参数
        String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
        // 执行程序
        Main.main(args);
        return null;
    }
}

继续看执行的细节,具体的方法在 “com.dtstack.chunjun.Main#main”(友情提示:这个类也可以用作不同模式下的entrypoint的入口程序):

/**
 * 执行入口
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:45
 * @version: 1.0.0
 */
public static void main(String[] args) throws Exception {
    log.info("------------program params-------------------------");
    Arrays.stream(args).forEach(arg -> log.info("{}", arg));
    log.info("-------------------------------------------");
    Options options = new OptionParser(args).getOptions();
    String replacedJob = "";
    // 获取作业配置
    File file = new File(options.getJob());
    if (file.isFile()) {
        try {
            replacedJob = FileUtils.readFileToString(file, StandardCharsets.UTF_8.name());
        } catch (IOException ioe) {
            log.error("Can not get the job info !!!", ioe);
            throw new RuntimeException(ioe);
        }
    } else {
        String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
        replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
    }
    Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
    // 判断是否为SQL作业类型,如果是,则设置SQL作业的配置
    if (EJobType.getByName(options.getJobType()).equals(SQL)) {
        options.setSqlSetConfiguration(SqlParser.parseSqlSet(replacedJob));
    }
    // 初始化TableEnviorment,并注入配置
    StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
    StreamTableEnvironment tEnv =
            EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
    log.info(
            "Register to table configuration:{}",
            tEnv.getConfig().getConfiguration().toString());
    // 判断不同的类型,执行作业
    switch (EJobType.getByName(options.getJobType())) {
        case SQL:
            exeSqlJob(env, tEnv, replacedJob, options);
            break;
        case SYNC:
            exeSyncJob(env, tEnv, replacedJob, options);
            break;
        default:
            throw new ChunJunRuntimeException(
                    "unknown jobType: ["
                            + options.getJobType()
                            + "], jobType must in [SQL, SYNC].");
    }
    log.info("program {} execution success", options.getJobName());
}

可以看到,执行local模式的入口方法主要是初始化了FlinkTableEnviorment,以及注册了相关的配置,并根据类型去执行SQL作业同步作业

2.3.1 执行SQL类型作业

执行SQL类型的作业实际代码如下:

/**
 * 执行类型为sql的作业
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:50
 * @version: 1.0.0
 */
private static void exeSqlJob(
        StreamExecutionEnvironment env,
        StreamTableEnvironment tableEnv,
        String job,
        Options options) {
    try {
        // 配置TableEnviorment环境
        configStreamExecutionEnvironment(env, options, null);
        List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar());
        // 根据配置,设置为“批”还是“流”任务
        String runMode = options.getRunMode();
        if ("batch".equalsIgnoreCase(runMode)) env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 解析SQL语句并执行
        StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
        TableResult execute = statementSet.execute();
        // 解决执行批处理作业时YARN模式不退出的问题
        Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
        String executionMode =
                confProperties.getProperty(
                        "chunjun.cluster.execution-mode",
                        ClusterEntrypoint.ExecutionMode.DETACHED.name());
        if (!ClusterEntrypoint.ExecutionMode.DETACHED.name().equalsIgnoreCase(executionMode)) {
            // 等待作业结束
            printSqlResult(execute);
        }
        // 如果是本地环境,打印作业执行结果
        if (env instanceof MyLocalStreamEnvironment) {
            Optional<JobClient> jobClient = execute.getJobClient();
            if (jobClient.isPresent()) {
                PrintUtil.printResult(
                        jobClient
                                .get()
                                .getJobExecutionResult()
                                .get()
                                .getAllAccumulatorResults());
            }
        }
    } catch (Exception e) {
        throw new ChunJunRuntimeException(e);
    }
}

以上的代码:主要做的事情就是设置TableEnviorment的参数,解析并执行Flink SQL

2.3.2 执行同步类型作业

执行同步类型的作业实际代码如下:

/**
 * 执行类型为“同步”的作业
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:58
 * @version: 1.0.0
 */
private static void exeSyncJob(
        StreamExecutionEnvironment env,
        StreamTableEnvironment tableEnv,
        String job,
        Options options)
        throws Exception {
    // 解析同步作业配置
    SyncConfig config = parseConfig(job, options);
    // 配置TableEnvironment环境
    configStreamExecutionEnvironment(env, options, config);
    // 创建数据源工厂,用于获取输入数据流
    SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
    DataStream<RowData> dataStreamSource = sourceFactory.createSource();
    // 根据配置设置输入数据流的并行度
    SpeedConfig speed = config.getSpeed();
    if (speed.getReaderChannel() > 0) {
        dataStreamSource =
                ((DataStreamSource<RowData>) dataStreamSource)
                        .setParallelism(speed.getReaderChannel());
    }
    // 添加映射操作符
    dataStreamSource = addMappingOperator(config, dataStreamSource);
    // 处理CDC(变更数据捕获)相关配置
    if (null != config.getCdcConf()
            && (null != config.getCdcConf().getDdl()
            && null != config.getCdcConf().getCache())) {
        CdcConfig cdcConfig = config.getCdcConf();
        DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConfig, config);
        // 获取DDL处理器和缓存处理器
        CacheHandler cacheHandler = DataSyncFactoryUtil.discoverCacheHandler(cdcConfig, config);
        // 应用变更数据捕获处理器
        dataStreamSource =
                dataStreamSource.flatMap(
                        new RestorationFlatMap(ddlHandler, cacheHandler, cdcConfig));
    }
    DataStream<RowData> dataStream;
    boolean transformer =
            config.getTransformer() != null
                    && StringUtils.isNotBlank(config.getTransformer().getTransformSql());
    if (transformer) {
        // 如果存在数据转换操作,将数据流转换为表
        dataStream = syncStreamToTable(tableEnv, config, dataStreamSource);
    } else {
        dataStream = dataStreamSource;
    }
    // 根据配置设置是否进行数据重平衡
    if (speed.isRebalance()) {
        dataStream = dataStream.rebalance();
    }
    // 创建目标端工厂,用于将数据流写入目标
    SinkFactory sinkFactory = DataSyncFactoryUtil.discoverSink(config);
    DataStreamSink<RowData> dataStreamSink = sinkFactory.createSink(dataStream);
    if (speed.getWriterChannel() > 0) {
        dataStreamSink.setParallelism(speed.getWriterChannel());
    }
    // 根据配置设置输出数据流的并行度
    JobExecutionResult result = env.execute(options.getJobName());
    // 如果是本地环境,打印作业执行结果
    if (env instanceof MyLocalStreamEnvironment) {
        PrintUtil.printResult(result.getAllAccumulatorResults());
    }
}

以上代码主要做了:根据配置初始化TableEnviorment,使用工厂模式去加载源端数据源和目标端数据源,并设置源端(并行度、操作符、ddl变更等)、算子、目标端等

博主认为这一块是FlinkX区别与其它的框架的一个核心点,里面用到了不少的技术,后续会博主也会单独拎出来讲解(如:脏数据如何处理、ddl变更是如何捕获的、checkpoint增量同步的原理等)

ok,接下来快速的看看其它的几种模式。

2.4 其它模式提交

YarnPerJob模式提交,在com.dtstack.chunjun.client.yarn.YarnPerJobClusterClientHelper#submit,主要:就是做了配置的初始化,安全认证的处理以及使用yarn的api提交,具体代码如下:

/**
 * Yarn per-job模式提交
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 16:15
 * @version: 1.0.0
 */
@Override
public ClusterClient<ApplicationId> submit(JobDeployer jobDeployer) throws Exception {
    //获取配置参数
    Options launcherOptions = jobDeployer.getLauncherOptions();
    String confProp = launcherOptions.getConfProp();
    if (StringUtils.isBlank(confProp)) {
        throw new IllegalArgumentException("per-job mode must have confProp!");
    }
    // 获取flink lib目录路径
    String libJar = launcherOptions.getFlinkLibDir();
    if (StringUtils.isBlank(libJar)) {
        throw new IllegalArgumentException("per-job mode must have flink lib path!");
    }
    // 获取有效的配置
    Configuration flinkConfig = jobDeployer.getEffectiveConfiguration();
    // 处理安全认证(如:kerberos、simple模式等)
    SecurityUtils.install(new SecurityConfiguration(flinkConfig));
    // 初始化ClusterSpecification,并使用yarn的api提交作业
    ClusterSpecification clusterSpecification = createClusterSpecification(jobDeployer);
    try (YarnClusterDescriptor descriptor =
                 createPerJobClusterDescriptor(launcherOptions, flinkConfig)) {
        ClusterClientProvider<ApplicationId> provider =
                descriptor.deployJobCluster(
                        clusterSpecification, new JobGraph("chunjun"), true);
        String applicationId = provider.getClusterClient().getClusterId().toString();
        String flinkJobId = clusterSpecification.getJobGraph().getJobID().toString();
        log.info("deploy per_job with appId: {}}, jobId: {}", applicationId, flinkJobId);
        return provider.getClusterClient();
    }
}

Kubernetes模式提交,在“com.dtstack.chunjun.client.kubernetes.KubernetesApplicationClusterClientHelper#submit”,主要的作用就是初始化作业参数配置以及使用flink自带的Kubernetes api去提交作业到Kubernetes。

/**
 * 提交kubernetes Application模式的作业
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 16:19
 * @version: 1.0.0
 */
@Override
public ClusterClient<String> submit(JobDeployer jobDeployer) throws Exception {
    // 获取并设置部署的配置
    Options launcherOptions = jobDeployer.getLauncherOptions();
    List<String> programArgs = jobDeployer.getProgramArgs();
    Configuration effectiveConfiguration = jobDeployer.getEffectiveConfiguration();
    setDeployerConfig(effectiveConfiguration, launcherOptions);
    // 设置kubernetes的主机别名
    setHostAliases(effectiveConfiguration);
    replaceRemoteParams(programArgs, effectiveConfiguration);
    ApplicationConfiguration applicationConfiguration =
            new ApplicationConfiguration(
                    programArgs.toArray(new String[0]), PluginInfoUtil.getMainClass());
    // 使用flink自带的kubernetes api提交作业
    KubernetesClusterClientFactory kubernetesClusterClientFactory =
            new KubernetesClusterClientFactory();
    try (KubernetesClusterDescriptor descriptor =
                 kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfiguration)) {
        ClusterSpecification clusterSpecification =
                getClusterSpecification(effectiveConfiguration);
        ClusterClientProvider<String> clientProvider =
                descriptor.deployApplicationCluster(
                        clusterSpecification, applicationConfiguration);
        ClusterClient<String> clusterClient = clientProvider.getClusterClient();
        log.info("Deploy Application with Cluster Id: {}", clusterClient.getClusterId());
        return clusterClient;
    }
}

以上大致讲了部署目标为yarn或者Kubernetes类型的的作业提交方式,与这两种部署目标类型相关的作业提交类型的操作方式也大同小异,此处不再详述。

03 小结&感想

本文主要从FlinkX的一个sh提交命令开始讲解,然后逐步深入到不同的提交类型客户端的java代码。

可以知道,FlinkX主要的作用还是做“同步”,相对于其他的Flink二次开发框架,在简单了解它的官网描述之后,可以知道FlinkX在脏数据处理增量同步 这两块可能做得是比较有特色,但是关于增量同步是否只适用于“”类型的作业,博主有空再阅读其源码,再出相关的博文。

其实给我的感觉,“” 这一块,给我的感觉有点像DataX,只是底层使用的技术栈不同。

还有就是对比其它的Flink二次开发框架,很好奇为何不直接使用Flink原生的命令行呢?这样不更好的去与其它的开源框架整合吗?举个情景例子:Flink批处理作业作为DolphinScheduler的一个任务节点,那么具体Flink作业节点执行的方式是官方提供的命令行标准,这样就不会出现一件事情同时多个人(Flink Developer Or Others)做了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
12月前
|
存储 前端开发 JavaScript
AntV X6源码探究简析
AntV是蚂蚁金服全新一代数据可视化解决方案,其中X6主要用于解决图编辑领域相关的解决方案,其是一款图编辑引擎,内置了一下编辑器所需的功能及组件等,本文旨在通过简要分析x6源码来对图编辑领域的一些底层引擎进行一个大致了解,同时也为团队中需要进行基于X6编辑引擎进行构建的图编辑器提供一些侧面了解,在碰到问题时可以较快的找到问题点。
285 0
|
5月前
|
SQL 资源调度 前端开发
深度剖析Dinky源码(下)
深度剖析Dinky源码(下)
124 0
|
5月前
|
资源调度 前端开发 Java
深度剖析Dinky源码(上)
深度剖析Dinky源码
142 0
|
6月前
|
负载均衡 前端开发 Java
阿里面试:看过框架源码吗?举例说明一下
阿里面试:看过框架源码吗?举例说明一下
81 0
|
10月前
|
SQL Web App开发 JSON
深度剖析Dinky源码
深度剖析Dinky源码
680 0
|
网络协议 程序员 编译器
方法使用的深度剖析(1)|学习笔记
快速学习方法使用的深度剖析(1)
67 0
方法使用的深度剖析(1)|学习笔记
|
网络协议 Java 测试技术
方法使用的深度剖析(2)|学习笔记
快速学习方法使用的深度剖析(2)
69 0
|
设计模式 druid 网络协议
当我们说起看源码时,我们是在看什么
当我们说起看源码时,我们是在看什么