深度剖析FlinkX(纯钧)源码

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 深度剖析FlinkX(纯钧)源码

01 引言

博主在上一篇文章《数据集成框架FlinkX(纯钧)入门》,大致讲解了FlinkX的一些概念,以及举了相关FlinkX的使用案例。

本文我们继续探索一下FlinkX的源码以及整个执行流程。阅读前,复制上一篇文章有关FlinkX 的项目目录:

- bin                                # 存放执行脚本的目录
  ├── chunjun-docker.sh                  # Docker 启动脚本
  ├── chunjun-kubernetes-application.sh  # Kubernetes 应用模式启动脚本
  ├── chunjun-kubernetes-session.sh      # Kubernetes 会话模式启动脚本
  ├── chunjun-local.sh                   # 本地启动脚本
  ├── chunjun-standalone.sh              # 单机模式启动脚本
  ├── chunjun-yarn-perjob.sh             # YARN 每作业模式启动脚本
  ├── chunjun-yarn-session.sh            # YARN 会话模式启动脚本
  ├── start-chunjun                      # 通用启动脚本
  └── submit.sh                          # 提交任务脚本
- build                                  # 构建脚本目录
  └── build.sh                           # 构建脚本
- chunjun-assembly                       # 汇总装配模块目录
- chunjun-clients                        # 客户端模块目录
- chunjun-connectors                     # 连接器模块目录
  ├── (多个子目录)                         # 不同的数据连接器子模块
- chunjun-core                           # 核心模块目录
- chunjun-ddl                            # 数据定义语言模块目录
  ├── chunjun-ddl-base                   # DDL 基础模块
  ├── chunjun-ddl-mysql                  # MySQL DDL 模块
  ├── chunjun-ddl-oracle                 # Oracle DDL 模块
- chunjun-dev                            # 开发工具模块目录
  ├── (多个子目录)                         # 包含开发用的各种工具和资源
- chunjun-dirty                          # 脏数据处理模块目录
  ├── (多个子目录)                         # 不同的脏数据处理子模块
- chunjun-docker                         # Docker 相关模块目录
  ├── (多个子目录)                         # Docker 相关资源和配置
- chunjun-e2e                            # 端到端测试模块目录
- chunjun-examples                       # 示例模块目录
  ├── json                               # JSON 示例
  └── sql                                # SQL 示例
- chunjun-local-test                     # 本地测试模块目录
- chunjun-metrics                        # 指标监控模块目录
  ├── (多个子目录)                         # 包含不同的监控模块
- chunjun-restore                        # 数据恢复模块目录
  ├── chunjun-restore-common             # 通用数据恢复模块
  └── chunjun-restore-mysql              # MySQL 数据恢复模块

02 源码剖析

通过阅读官网,可以知道如果要跑一个FlinkX的任务,一般的都会使用如下命令:

## local模式
sh bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json
## standalone模式
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
## yarn session模式
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
## 其它模式。。。。。

命令主要有以下核心的参数:

参数 描述
mode 任务提交的类型,非必填项,类型有:local(默认值),standalone,yarn-session, yarn-per-job,kubernetes-session,kubernetes-application,对应源码中枚举类 ClusterMode;
jobType 纯钧任务类型,必填项,同步任务为:sync,SQL计算任务为:sql
job 纯钧任务脚本地址,必填项
chunjunDistDir 纯钧插件包地址
confProp 纯钧任务配置参数,Flink相关配置也是在这里配置
flinkConfDir flink-conf.yaml 地址,在非local模式时,需要配置

那么,源码的入口就从启动flink作业的命令开始。

2.1 作业提交脚本

提交FlinkX作业的脚本都放在了项目的bin目录下,不同提交模式(local、standlone、session等)里面的脚本,最终执行的都是submit.sh

因此,我们看submit.sh命令里面的内容,注释后如下:

#!/usr/bin/env bash
set -e
# 查找 Java 可执行文件的路径
if [ -n "${JAVA_HOME}" ]; then
  JAVA_RUN="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    JAVA_RUN="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# 注释: 以上代码用于查找 Java 可执行文件的路径,首先检查 JAVA_HOME 环境变量,然后检查系统中是否已经安装了 Java。
# 设置 CHUNJUN_HOME 变量,以确定部署模式
# 1:使用装配分发包文件进行部署
# 2:使用项目包进行部署
CHUNJUN_DEPLOY_MODE=1
if [[ $CHUNJUN_HOME && -z $CHUNJUN_HOME ]];then
  export CHUNJUN_HOME=$CHUNJUN_HOME
else
  CHUNJUN_HOME="$(cd "`dirname "$0"`"/..; pwd)"
  if [ -d "$CHUNJUN_HOME/chunjun-dist" ]; then
    CHUNJUN_HOME="$CHUNJUN_HOME/chunjun-dist"
    CHUNJUN_DEPLOY_MODE=2
  fi
fi
# 注释: 以上代码用于设置 CHUNJUN_HOME 变量,用于确定部署模式。首先检查是否已经设置 CHUNJUN_HOME 变量,如果没有设置,则根据脚本的路径来确定 CHUNJUN_HOME 的值。
# 根据部署模式设置 JAR_DIR 变量
# 1. 在 yarn-session 情况下,无法找到 JAR_DIR
# 2. 在其他情况下,可以找到 JAR_DIR
if [ $CHUNJUN_DEPLOY_MODE -eq 1 ]; then
  JAR_DIR=$CHUNJUN_HOME/lib/chunjun-clients.jar:$CHUNJUN_HOME/lib/*
else
  JAR_DIR=$CHUNJUN_HOME/../lib/chunjun-clients.jar:$CHUNJUN_HOME/../lib/*
fi
# 注释: 以上代码根据不同的部署模式设置 JAR_DIR 变量,用于指定需要加载的 Java 类库路径。
# 入口类全路径
CLASS_NAME=com.dtstack.chunjun.client.Launcher
# 检查参数中是否包含 ".sql",以确定作业类型
JOBTYPE="sync"
ARGS=$@
if [[ $ARGS == *.sql* ]];
  then JOBTYPE="sql"
fi;
echo "
          #                               #
          #                               #
          #
  #####   ######   #     #  # ####     ####   #     #  # ####
 #        #     #  #     #  ##    #       #   #     #  ##    #
 #        #     #  #     #  #     #       #   #     #  #     #
 #        #     #  #    ##  #     #       #   #    ##  #     #
  #####   #     #   #### #  #     #       #    #### #  #     #
                                          #
                                      ####
Reference site: https://dtstack.github.io/chunjun
chunjun is starting ...
CHUNJUN_HOME is auto set $CHUNJUN_HOME"
# 设置基本参数,用于所有作业
PARAMS="$ARGS -mode $MODE -jobType $JOBTYPE -chunjunDistDir $CHUNJUN_HOME"
# 如果 FLINK_HOME 未设置或不是目录,则忽略 flinkConfDir 参数
if [ ! -z $FLINK_HOME ] && [ -d $FLINK_HOME ];
  then
    echo "FLINK_HOME is $FLINK_HOME"
    PARAMS="$PARAMS -flinkConfDir $FLINK_HOME/conf -flinkLibDir $FLINK_HOME/lib"
  else
    echo "FLINK_HOME is empty!"
fi
# 如果 HADOOP_HOME 未设置或不是目录,则忽略 hadoopConfDir 参数
if [ ! -z $HADOOP_HOME ] && [ -d $HADOOP_HOME ];
  then
    echo "HADOOP_HOME is $HADOOP_HOME"
    PARAMS="$PARAMS -hadoopConfDir $HADOOP_HOME/etc/hadoop"
  else
    echo "HADOOP_HOME is empty!"
fi
# 添加一个空行,用于与日志分隔
echo ""
echo "start command: $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS"
echo ""
# 执行Java命令
$JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS

总结其实submit.sh命令主要的作用就是,执行提交作业的java程序,首先封装好入参(如:flink_home、hadoop_home、依赖jar路径等参数),然后指定程序的入口类(com.dtstack.chunjun.client.Launcher),最后使用java命令来执行java程序

ok,接下来看看com.dtstack.chunjun.client.Launcher这个类做了什么事情。

2.2 作业提交入口类

作业提交入口里在项目chunjun-clients目录下的Launcher类(com.dtstack.chunjun.client.Launcher)。

我们来看看它的main入口方法,这里添加了相关的注释:

/**
 * 作业提交命令(sbumit.sh)执行之后,会进入该方法
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:31
 * @version: 1.0.0
 */
public static void main(String[] args) throws Exception {
    // 1. 解析入参,并设置进Options类
    OptionParser optionParser = new OptionParser(args);
    Options launcherOptions = optionParser.getOptions();
    // 2. 查询并设置flink_home以及hadoop_home的配置进入Options类
    findDefaultConfigDir(launcherOptions);
    // 3. 解析程序执行参数集合,例如该命令里面的参数: sh bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json
    List<String> argList = optionParser.getProgramExeArgList();
    // 将argList转化为HashMap,方便通过参数名称来获取参数值
    Map<String, String> commandMap = Maps.newHashMap();
    for (int i = 0; i < argList.size(); i += 2) {
        commandMap.put(argList.get(i), argList.get(i + 1));
    }
    // 清空list,填充修改后的参数值
    argList.clear();
    for (int i = 0; i < commandMap.size(); i++) {
        argList.add(commandMap.keySet().toArray()[i].toString());
        argList.add(commandMap.values().toArray()[i].toString());
    }
    // 4. 根据Options类里面的设置的参数以及命令行里面的参数,初始化构建作业发布类。
    JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);
    // 5. 根据启动模式,创建不同的提交客户端帮助类
    ClusterClientHelper<?> clusterClientHelper = createHelper(launcherOptions.getMode());
    // 6. 加载自定义的jar包到当前的类加载器
    URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
    List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
    ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);
    // 7. 使用不同的提交客户端提交作业
    try (ClusterClient<?> client = clusterClientHelper.submit(jobDeployer)) {
        if (null != client) {
            log.info(client.getClusterId() + " submit successfully.");
        }
    }
}

通过阅读源码,可以知道该方法主要做了如下几个事情:

  1. 封装执行作业相关的参数(flink_home、hadoop_home以及命令行的参数);
  2. 根据不同的模式,生成不同的提交客户端;
  3. 使用不同的客户端提交作业。

上述的第2个步骤,即根据模式生成提交客户端的代码截图如下:

第3个步骤是最为核心的一个步骤,它是根据第2步骤生成不同的客户端来提交作业的,通过代码提示,可以看到现在支持这几种提交模式:

接下来,分别讲解不同模式下的提交。

2.3 local模式提交

local模式的提交,使用到了LocalClusterClientHelper,其代码很简单,做了注释之后如下:

/**
 * 提交客户端-Local模式
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:41
 * @version: 1.0.0
 */
public class LocalClusterClientHelper implements ClusterClientHelper<Void> {
    @Override
    public ClusterClient<Void> submit(JobDeployer jobDeployer) throws Exception {
        // 获取程序参数
        String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
        // 执行程序
        Main.main(args);
        return null;
    }
}

继续看执行的细节,具体的方法在 “com.dtstack.chunjun.Main#main”(友情提示:这个类也可以用作不同模式下的entrypoint的入口程序):

/**
 * 执行入口
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:45
 * @version: 1.0.0
 */
public static void main(String[] args) throws Exception {
    log.info("------------program params-------------------------");
    Arrays.stream(args).forEach(arg -> log.info("{}", arg));
    log.info("-------------------------------------------");
    Options options = new OptionParser(args).getOptions();
    String replacedJob = "";
    // 获取作业配置
    File file = new File(options.getJob());
    if (file.isFile()) {
        try {
            replacedJob = FileUtils.readFileToString(file, StandardCharsets.UTF_8.name());
        } catch (IOException ioe) {
            log.error("Can not get the job info !!!", ioe);
            throw new RuntimeException(ioe);
        }
    } else {
        String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
        replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
    }
    Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
    // 判断是否为SQL作业类型,如果是,则设置SQL作业的配置
    if (EJobType.getByName(options.getJobType()).equals(SQL)) {
        options.setSqlSetConfiguration(SqlParser.parseSqlSet(replacedJob));
    }
    // 初始化TableEnviorment,并注入配置
    StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
    StreamTableEnvironment tEnv =
            EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
    log.info(
            "Register to table configuration:{}",
            tEnv.getConfig().getConfiguration().toString());
    // 判断不同的类型,执行作业
    switch (EJobType.getByName(options.getJobType())) {
        case SQL:
            exeSqlJob(env, tEnv, replacedJob, options);
            break;
        case SYNC:
            exeSyncJob(env, tEnv, replacedJob, options);
            break;
        default:
            throw new ChunJunRuntimeException(
                    "unknown jobType: ["
                            + options.getJobType()
                            + "], jobType must in [SQL, SYNC].");
    }
    log.info("program {} execution success", options.getJobName());
}

可以看到,执行local模式的入口方法主要是初始化了FlinkTableEnviorment,以及注册了相关的配置,并根据类型去执行SQL作业同步作业

2.3.1 执行SQL类型作业

执行SQL类型的作业实际代码如下:

/**
 * 执行类型为sql的作业
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:50
 * @version: 1.0.0
 */
private static void exeSqlJob(
        StreamExecutionEnvironment env,
        StreamTableEnvironment tableEnv,
        String job,
        Options options) {
    try {
        // 配置TableEnviorment环境
        configStreamExecutionEnvironment(env, options, null);
        List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar());
        // 根据配置,设置为“批”还是“流”任务
        String runMode = options.getRunMode();
        if ("batch".equalsIgnoreCase(runMode)) env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 解析SQL语句并执行
        StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
        TableResult execute = statementSet.execute();
        // 解决执行批处理作业时YARN模式不退出的问题
        Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
        String executionMode =
                confProperties.getProperty(
                        "chunjun.cluster.execution-mode",
                        ClusterEntrypoint.ExecutionMode.DETACHED.name());
        if (!ClusterEntrypoint.ExecutionMode.DETACHED.name().equalsIgnoreCase(executionMode)) {
            // 等待作业结束
            printSqlResult(execute);
        }
        // 如果是本地环境,打印作业执行结果
        if (env instanceof MyLocalStreamEnvironment) {
            Optional<JobClient> jobClient = execute.getJobClient();
            if (jobClient.isPresent()) {
                PrintUtil.printResult(
                        jobClient
                                .get()
                                .getJobExecutionResult()
                                .get()
                                .getAllAccumulatorResults());
            }
        }
    } catch (Exception e) {
        throw new ChunJunRuntimeException(e);
    }
}

以上的代码:主要做的事情就是设置TableEnviorment的参数,解析并执行Flink SQL

2.3.2 执行同步类型作业

执行同步类型的作业实际代码如下:

/**
 * 执行类型为“同步”的作业
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 15:58
 * @version: 1.0.0
 */
private static void exeSyncJob(
        StreamExecutionEnvironment env,
        StreamTableEnvironment tableEnv,
        String job,
        Options options)
        throws Exception {
    // 解析同步作业配置
    SyncConfig config = parseConfig(job, options);
    // 配置TableEnvironment环境
    configStreamExecutionEnvironment(env, options, config);
    // 创建数据源工厂,用于获取输入数据流
    SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
    DataStream<RowData> dataStreamSource = sourceFactory.createSource();
    // 根据配置设置输入数据流的并行度
    SpeedConfig speed = config.getSpeed();
    if (speed.getReaderChannel() > 0) {
        dataStreamSource =
                ((DataStreamSource<RowData>) dataStreamSource)
                        .setParallelism(speed.getReaderChannel());
    }
    // 添加映射操作符
    dataStreamSource = addMappingOperator(config, dataStreamSource);
    // 处理CDC(变更数据捕获)相关配置
    if (null != config.getCdcConf()
            && (null != config.getCdcConf().getDdl()
            && null != config.getCdcConf().getCache())) {
        CdcConfig cdcConfig = config.getCdcConf();
        DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConfig, config);
        // 获取DDL处理器和缓存处理器
        CacheHandler cacheHandler = DataSyncFactoryUtil.discoverCacheHandler(cdcConfig, config);
        // 应用变更数据捕获处理器
        dataStreamSource =
                dataStreamSource.flatMap(
                        new RestorationFlatMap(ddlHandler, cacheHandler, cdcConfig));
    }
    DataStream<RowData> dataStream;
    boolean transformer =
            config.getTransformer() != null
                    && StringUtils.isNotBlank(config.getTransformer().getTransformSql());
    if (transformer) {
        // 如果存在数据转换操作,将数据流转换为表
        dataStream = syncStreamToTable(tableEnv, config, dataStreamSource);
    } else {
        dataStream = dataStreamSource;
    }
    // 根据配置设置是否进行数据重平衡
    if (speed.isRebalance()) {
        dataStream = dataStream.rebalance();
    }
    // 创建目标端工厂,用于将数据流写入目标
    SinkFactory sinkFactory = DataSyncFactoryUtil.discoverSink(config);
    DataStreamSink<RowData> dataStreamSink = sinkFactory.createSink(dataStream);
    if (speed.getWriterChannel() > 0) {
        dataStreamSink.setParallelism(speed.getWriterChannel());
    }
    // 根据配置设置输出数据流的并行度
    JobExecutionResult result = env.execute(options.getJobName());
    // 如果是本地环境,打印作业执行结果
    if (env instanceof MyLocalStreamEnvironment) {
        PrintUtil.printResult(result.getAllAccumulatorResults());
    }
}

以上代码主要做了:根据配置初始化TableEnviorment,使用工厂模式去加载源端数据源和目标端数据源,并设置源端(并行度、操作符、ddl变更等)、算子、目标端等

博主认为这一块是FlinkX区别与其它的框架的一个核心点,里面用到了不少的技术,后续会博主也会单独拎出来讲解(如:脏数据如何处理、ddl变更是如何捕获的、checkpoint增量同步的原理等)

ok,接下来快速的看看其它的几种模式。

2.4 其它模式提交

YarnPerJob模式提交,在com.dtstack.chunjun.client.yarn.YarnPerJobClusterClientHelper#submit,主要:就是做了配置的初始化,安全认证的处理以及使用yarn的api提交,具体代码如下:

/**
 * Yarn per-job模式提交
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 16:15
 * @version: 1.0.0
 */
@Override
public ClusterClient<ApplicationId> submit(JobDeployer jobDeployer) throws Exception {
    //获取配置参数
    Options launcherOptions = jobDeployer.getLauncherOptions();
    String confProp = launcherOptions.getConfProp();
    if (StringUtils.isBlank(confProp)) {
        throw new IllegalArgumentException("per-job mode must have confProp!");
    }
    // 获取flink lib目录路径
    String libJar = launcherOptions.getFlinkLibDir();
    if (StringUtils.isBlank(libJar)) {
        throw new IllegalArgumentException("per-job mode must have flink lib path!");
    }
    // 获取有效的配置
    Configuration flinkConfig = jobDeployer.getEffectiveConfiguration();
    // 处理安全认证(如:kerberos、simple模式等)
    SecurityUtils.install(new SecurityConfiguration(flinkConfig));
    // 初始化ClusterSpecification,并使用yarn的api提交作业
    ClusterSpecification clusterSpecification = createClusterSpecification(jobDeployer);
    try (YarnClusterDescriptor descriptor =
                 createPerJobClusterDescriptor(launcherOptions, flinkConfig)) {
        ClusterClientProvider<ApplicationId> provider =
                descriptor.deployJobCluster(
                        clusterSpecification, new JobGraph("chunjun"), true);
        String applicationId = provider.getClusterClient().getClusterId().toString();
        String flinkJobId = clusterSpecification.getJobGraph().getJobID().toString();
        log.info("deploy per_job with appId: {}}, jobId: {}", applicationId, flinkJobId);
        return provider.getClusterClient();
    }
}

Kubernetes模式提交,在“com.dtstack.chunjun.client.kubernetes.KubernetesApplicationClusterClientHelper#submit”,主要的作用就是初始化作业参数配置以及使用flink自带的Kubernetes api去提交作业到Kubernetes。

/**
 * 提交kubernetes Application模式的作业
 *
 * @author : YangLinWei
 * @createTime: 2023/11/8 16:19
 * @version: 1.0.0
 */
@Override
public ClusterClient<String> submit(JobDeployer jobDeployer) throws Exception {
    // 获取并设置部署的配置
    Options launcherOptions = jobDeployer.getLauncherOptions();
    List<String> programArgs = jobDeployer.getProgramArgs();
    Configuration effectiveConfiguration = jobDeployer.getEffectiveConfiguration();
    setDeployerConfig(effectiveConfiguration, launcherOptions);
    // 设置kubernetes的主机别名
    setHostAliases(effectiveConfiguration);
    replaceRemoteParams(programArgs, effectiveConfiguration);
    ApplicationConfiguration applicationConfiguration =
            new ApplicationConfiguration(
                    programArgs.toArray(new String[0]), PluginInfoUtil.getMainClass());
    // 使用flink自带的kubernetes api提交作业
    KubernetesClusterClientFactory kubernetesClusterClientFactory =
            new KubernetesClusterClientFactory();
    try (KubernetesClusterDescriptor descriptor =
                 kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfiguration)) {
        ClusterSpecification clusterSpecification =
                getClusterSpecification(effectiveConfiguration);
        ClusterClientProvider<String> clientProvider =
                descriptor.deployApplicationCluster(
                        clusterSpecification, applicationConfiguration);
        ClusterClient<String> clusterClient = clientProvider.getClusterClient();
        log.info("Deploy Application with Cluster Id: {}", clusterClient.getClusterId());
        return clusterClient;
    }
}

以上大致讲了部署目标为yarn或者Kubernetes类型的的作业提交方式,与这两种部署目标类型相关的作业提交类型的操作方式也大同小异,此处不再详述。

03 小结&感想

本文主要从FlinkX的一个sh提交命令开始讲解,然后逐步深入到不同的提交类型客户端的java代码。

可以知道,FlinkX主要的作用还是做“同步”,相对于其他的Flink二次开发框架,在简单了解它的官网描述之后,可以知道FlinkX在脏数据处理增量同步 这两块可能做得是比较有特色,但是关于增量同步是否只适用于“”类型的作业,博主有空再阅读其源码,再出相关的博文。

其实给我的感觉,“” 这一块,给我的感觉有点像DataX,只是底层使用的技术栈不同。

还有就是对比其它的Flink二次开发框架,很好奇为何不直接使用Flink原生的命令行呢?这样不更好的去与其它的开源框架整合吗?举个情景例子:Flink批处理作业作为DolphinScheduler的一个任务节点,那么具体Flink作业节点执行的方式是官方提供的命令行标准,这样就不会出现一件事情同时多个人(Flink Developer Or Others)做了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
存储 Java 程序员
|
存储 前端开发 JavaScript
AntV X6源码探究简析
AntV是蚂蚁金服全新一代数据可视化解决方案,其中X6主要用于解决图编辑领域相关的解决方案,其是一款图编辑引擎,内置了一下编辑器所需的功能及组件等,本文旨在通过简要分析x6源码来对图编辑领域的一些底层引擎进行一个大致了解,同时也为团队中需要进行基于X6编辑引擎进行构建的图编辑器提供一些侧面了解,在碰到问题时可以较快的找到问题点。
380 0
|
6月前
|
存储 移动开发 前端开发
【Uniapp 专栏】Uniapp 架构设计与原理探究
【5月更文挑战第12天】Uniapp是一款用于跨平台移动应用开发的框架,以其高效性和灵活性脱颖而出。它基于HTML、CSS和Vue.js构建视图层,JavaScript处理逻辑层,管理数据层,实现统一编码并支持原生插件扩展。通过抽象平台特性,开发者能专注于业务逻辑,提高开发效率。尽管存在兼容性和复杂性挑战,但深入理解其架构设计与原理将助力开发者创建高质量的跨平台应用。随着技术进步,Uniapp将继续在移动开发领域扮演重要角色。
194 1
【Uniapp 专栏】Uniapp 架构设计与原理探究
|
机器学习/深度学习 人工智能 算法
这篇科普让你Get所有大模型的基础核心知识点
本文介绍了AI大模型的概念和发展历程。AI大模型是指具有1亿以上参数的机器学习模型,通过在大规模数据集上进行预训练,可以直接支撑各类应用。大模型的发展经历了从萌芽期到AI1.0时期,再到AI2.0时期的飞跃,目前最新发布的大模型参数已经达到了千亿甚至万亿级别。国内外的公司都在积极研发和应用大模型,如OpenAI、Google、Facebook、Microsoft等。国内也有百度、阿里巴巴、万维、商汤科技等公司发布了自己的大模型产品。大模型的建造离不开算力资源、算法人才、数据积累等核心要素。此外,文章还列举了一些与大模型相关的专业名词,如算法、模型参数、训练数据、Token等。
|
12月前
|
负载均衡 前端开发 Java
阿里面试:看过框架源码吗?举例说明一下
阿里面试:看过框架源码吗?举例说明一下
127 0
|
12月前
|
XML 安全 数据库连接
温故知新-源码分析篇
温故知新-源码分析篇
42 0
|
算法 开发者 容器
C++14新特性扫盲探究
闲暇之时,聊到C++14,实际上C++14相对之前的11并没有太大的改动,或者说更像C++11标准基础上的查漏补缺,C++14之后,还有17、20甚至23,所以说,C++14更像个过渡版本。
104 0
|
Java C#
【c#入门杂选】-带你熟知c#基础知识点《思维导图》
【c#入门杂选】-带你熟知c#基础知识点《思维导图》
171 0
|
网络协议 程序员 编译器
方法使用的深度剖析(1)|学习笔记
快速学习方法使用的深度剖析(1)
方法使用的深度剖析(1)|学习笔记
|
程序员 编译器 C++
C++(入门、核心、提高三篇)总结及补充
C++(入门、核心、提高三篇)总结及补充
109 0
C++(入门、核心、提高三篇)总结及补充