大数据之Oozie——源码分析(一)程序入口

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介:

工作中发现在oozie中使用sqoop与在shell中直接调度sqoop性能上有很大的差异。为了更深入的探索其中的缘由,开始了oozie的源码分析之路。今天第一天阅读源码,由于没有编译成功,不能运行测试用例,直接使用sublime肉眼阅读,还是挺费劲的。

虽然流程还不是顺畅,但是大体上的内容还算是了解了。

我这里使用的是oozie4.2的版本,之前稍微看过4.3版本的,源码上还是有一定的差异的。

看上面的图,大致理解oozie的过程是:

  • oozie cli提交任务
  • oozie server创建一个对应任务的client
  • client去提交相应的任务

oozie工程结构

最重要的就是三个:

  • 1 client 这是任务提交的入口
  • 2 core 这是oozie的核心(在3中好像拆分成了core和server)
  • 3 distro 这里保存了启动脚本

寻找源码入口

  • 一种方式是直接以文件夹搜索main方法。
  • 另一种是看它的启动脚本。

在启动脚本中oozie.cmd,有这样一句:

%JAVA_BIN% %JAVA_PROPERTIES% -cp %OOZIECPPATH% org.apache.oozie.cli.OozieCLI %OOZIE_PROPERTIES%

可见,入口在org.apache.oozie.cli.OozieCLI这个类中,那就从它开始吧。

sqoop作业的提交

首先是OozieCLI的入口main方法:

public static void main(String[] args) {
        //oozie方法的入口
        if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
            System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
        }
        System.exit(new OozieCLI().run(args));
    }

前面是一些认证的东西,可以忽略,直接进入run方法:

public synchronized int run(String[] args) {
        //保证clent仅启动一次
        if (used) {
            throw new IllegalStateException("CLI instance already used");
        }
        used = true;
        //创建参数解析器
        final CLIParser parser = getCLIParser();
        try {
            final CLIParser.Command command = parser.parse(args);

            String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);

            if (doAsUser != null) {
                OozieClient.doAs(doAsUser, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        processCommand(parser, command);
                        return null;
                    }
                });
            }
            else {
                processCommand(parser, command);
            }
            return 0;
        }
        ...
    }

主要的内容是在这个processCommand里面,processCommand会根据命令调用相应的命令方法:

public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
        if (command.getName().equals(HELP_CMD)) {
            parser.showHelp(command.getCommandLine());
        }
        else if (command.getName().equals(JOB_CMD)) {
            jobCommand(command.getCommandLine());
        }
        else if (command.getName().equals(JOBS_CMD)) {
            jobsCommand(command.getCommandLine());
        }
        else if (command.getName().equals(ADMIN_CMD)) {
            adminCommand(command.getCommandLine());
        }
        else if (command.getName().equals(VERSION_CMD)) {
            versionCommand();
        }
        else if (command.getName().equals(VALIDATE_CMD)) {
            validateCommand(command.getCommandLine());
        }
        else if (command.getName().equals(SLA_CMD)) {
            slaCommand(command.getCommandLine());
        }
        else if (command.getName().equals(PIG_CMD)) {
            scriptLanguageCommand(command.getCommandLine(), PIG_CMD);
        }
        else if (command.getName().equals(HIVE_CMD)) {
            scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
        }
        else if (command.getName().equals(SQOOP_CMD)) {
            sqoopCommand(command.getCommandLine());//我关注的sqoop在这里
        }
        else if (command.getName().equals(INFO_CMD)) {
            infoCommand(command.getCommandLine());
        }
        else if (command.getName().equals(MR_CMD)){
            mrCommand(command.getCommandLine());
        }
    }

在sqoopCommand方法里面,sqoop任务被提交:

private void sqoopCommand(CommandLine commandLine) throws IOException, OozieCLIException {
        List<String> args = commandLine.getArgList();
        if (args.size() > 0) {
            // checking if args starts with -X (because CLIParser cannot check this)
            if (!args.get(0).equals("-X")) {
                throw new OozieCLIException("Unrecognized option: " + args.get(0) + " Expecting -X");
            }
            args.remove(0);
        }

        if (!commandLine.hasOption(SQOOP_COMMAND_OPTION)) {
            throw new OozieCLIException("Need to specify -command");
        }

        if (!commandLine.hasOption(CONFIG_OPTION)) {
            throw new OozieCLIException("Need to specify -config <configfile>");
        }

        try {
            XOozieClient wc = createXOozieClient(commandLine);
            Properties conf = getConfiguration(wc, commandLine);
            String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
            System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));
        }
        catch (OozieClientException ex) {
            throw new OozieCLIException(ex.toString(), ex);
        }
    }

最重要的内容就在这几行:

XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));

其中wc.submitSqoop提交了sqoop的任务。

后续问题

  • 1 任务提交到了哪里?
  • 2 在提交任务的时候都做了很么?
  • 3 如何在mapreduce开启一个新的sqoop的?
  • 4 为什么在yarn中可以同时看到两个应用,一个oozie,一个是sqoop

参考

oozie(4.1.0)架构及二次开发流程

本文转自博客园xingoo的博客,原文链接:大数据之Oozie——源码分析(一)程序入口,如需转载请自行联系原博主。
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
69 0
|
SQL 分布式计算 大数据
大数据Oozie任务调度
大数据Oozie任务调度
204 0
|
8月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
172 0
|
3月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
76 0
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
70 0
|
3月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
62 0
|
8月前
|
分布式计算 大数据 Java
大数据必知必会系列——面试官问能不能手写一个spark程序?[新星计划]
大数据必知必会系列——面试官问能不能手写一个spark程序?[新星计划]
88 0
|
6月前
|
运维 监控 大数据
部署-Linux01,后端开发,运维开发,大数据开发,测试开发,后端软件,大数据系统,运维监控,测试程序,网页服务都要在Linux中进行部署
部署-Linux01,后端开发,运维开发,大数据开发,测试开发,后端软件,大数据系统,运维监控,测试程序,网页服务都要在Linux中进行部署
|
7月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之pyodps3的报错信息里,报了程序的解析错误,是什么导致的
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
285 0

热门文章

最新文章