datax源码阅读二:Engine流程

简介: datax源码阅读二:基本流程

一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine

  public static void main(String[] args) throws Exception {
    int exitCode = 0;
    try {
        Engine.entry(args);
    } catch (Throwable e) {
        exitCode = 1;
        String trace = ExceptionTracker.trace(e);
        String errDesc = "未知datax错误,参考堆栈内容分析。";
        LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace);
        if (e instanceof DataXException) {
            DataXException tempException = (DataXException) e;
            ErrorCode errorCode = tempException.getErrorCode();
            errDesc = errorCode.getDescription();
            if (errorCode instanceof FrameworkErrorCode) {
                FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
                exitCode = tempErrorCode.toExitValue();
            }
        }
        System.exit(exitCode);
    }
    System.exit(exitCode);
}

main函数主要catch了一下异常,并将异常信息打印出来,实际执行在entry函数中

  public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);

    String jobPath = cl.getOptionValue("job");

    // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    Configuration configuration = ConfigParser.parse(jobPath);

    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // 如果不是 standalone 模式,那么 jobId 一定不能为-1
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

    //打印vmInfo
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }

    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

    LOG.debug(configuration.toJSON());

    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}

entry函数主要功能:

1、解析了java命令行的三个参数,分别是job、jobid和mode,其中job是用户配置的json文件路径,jobid和mode是python文件带进来的,单机模式下可以忽略改参数
2、读取用户配置的json文件,转化为内部的configuration配置
3、打印相关信息,并校验json文件的合法性
4、启动engine执行

entry执行完毕之后,进入start函数,关键代码如下:

 public void start(Configuration allConf) {
    // 绑定column转换信息
    ColumnCast.bind(allConf);
    /**
     * 初始化PluginLoader,可以获取各种插件配置
     */
    LoadUtil.bind(allConf);
    *************
     container = new JobContainer(allConf);
    *************
    container.start();
}

start函数中主要包括:

1、列转换默认值,即动态在configuration中注入默认值
2、初始化插件的LoadUtil,后面classLoader相关操作都会依赖这个函数
3、初始化JobContainer并启动
目录
相关文章
|
Java DataX Maven
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)
694 0
|
数据库
数据集成模块流程组件之限速介绍
在数据集成的过程中,在一些场景下,需要对集成速度进行限速操作,限速组件可限制读取速度,本文将介绍如何进行限速组件的配置。
1264 0
数据集成模块流程组件之限速介绍
|
JSON 数据格式
数据集成模块流程组件之条件分发介绍
在数据集成的过程中,在一些场景下,需要对上游数据进行分发操作,条件分发组件可对上游数据根据配置条件进行分发,本文将介绍如何进行条件分发组件的配置。
328 0
数据集成模块流程组件之条件分发介绍
|
Java DataX 调度
datax源码阅读四:TaskGroupContainer
datax源码阅读四:TaskGroupContainer
2631 0
|
Java DataX 调度
datax源码阅读三:JobContainer
datax源码阅读三:JobContainer
3848 0
|
Java DataX Python
datax源码阅读一:python文件
datax源码阅读一
4200 0
|
4月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成并发数不支持批量修改,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
数据采集 DataWorks 数据管理
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第10天】随着大数据技术的发展,企业对数据处理的需求日益增长。阿里云推出的DataWorks是一款强大的数据集成和管理平台,提供从数据采集、清洗、加工到应用的一站式解决方案。本文通过电商平台案例,详细介绍了DataWorks的核心功能和优势,展示了如何高效处理大规模数据,帮助企业挖掘数据价值。
138 1
|
2月前
|
数据采集 SQL DataWorks
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第5天】本文通过一家电商平台的案例,详细介绍了阿里云DataWorks在数据处理全流程中的应用。从多源数据采集、清洗加工到分析可视化,DataWorks提供了强大的一站式解决方案,显著提升了数据分析效率和质量。通过具体SQL示例,展示了如何构建高效的数据处理流程,突显了DataWorks相较于传统工具如Excel的优势,为企业决策提供了有力支持。
130 3
|
3月前
|
存储 分布式计算 DataWorks
dataworks数据集成
dataworks数据集成
148 1

热门文章

最新文章