如何在MaxCompute上运行HadoopMR作业

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

现在MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。该插件的下载地址在:http://repo.aliyun.com/download/hadoop2openmr-1.0.jar,目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

使用该插件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

1. 下载HadoopMR的插件

通过http://repo.aliyun.com/download/hadoop2openmr-1.0.jar下载插件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

2. 准备好HadoopMR的程序jar包

编译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:


package com.aliyun.odps.mapred.example.hadoop;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import java.io.IOException;
import java.util.StringTokenizer;
 
public class WordCount {
 
    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
 
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
 
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
 
    public static class IntSumReducer
        extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
 
        public void reduce(Text key, Iterable<IntWritable> values,
            Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. 测试数据准备

  • 创建输入表和输出表
create table if not exists wc_in(line string);
create table if not exists wc_out(key string, cnt bigint);
  • 通过tunnel将数据导入输入表中

待导入文本文件data.txt的数据内容如下:

hello maxcompute
hello mapreduce

例如可以通过如下命令将data.txt的数据导入wc_in中,

tunnel upload  data.txt wc_in;

4. 准备好表与hdfs文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

{
  "file:/foo": {
    "resolver": {
      "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
      "properties": {
          "text.resolver.columns.combine.enable": "true",
          "text.resolver.seperator": "\t"
      }
    },
    "tableInfos": [
      {
        "tblName": "wc_in",
        "partSpec": {},
        "label": "__default__"
      }
    ],
    "matchMode": "exact"
  },
  "file:/bar": {
    "resolver": {
      "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
      "properties": {
          "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
          "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
      }
    },
    "tableInfos": [
      {
        "tblName": "wc_out",
        "partSpec": {},
        "label": "__default__"
      }
    ],
    "matchMode": "fuzzy"
  }
}

配置项说明:

整个配置是一个json文件,描述hdfs上文件与maxcompute上表之间的映射关系,一般要配置输入和输出两部分,一个HDFS路径对应一个resolver配置,tableInfos配置以及matchMode配置。

  • resolver: 用于配置如何对待文件中的数据,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver两个内置的resolver可以选用。除了指定好resolver的名字,还需要为相应的resolver配置一些properties指导它正确的进行数据解析。

    • TextFileResolver: 对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入resolver配置时,需要配置的properties有text.resolver.columns.combine.enable和text.resolver.seperator,当text.resolver.columns.combine.enable配置为true时,会把输入表的所有列按找text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为key,value。
    • BinaryFileResolver: 可以处理二进制的数据,自动将数据转换为maxcompute可以支持的数据类型,如bigint, bool, double等。当作为输出resolver配置时,需要配置的properties有binary.resolver.input.key.class和binary.resolver.input.value.class,分别代表中间结果的key和value类型。
  • tableInfos:用户配置HDFS对应的maxcompute的表,目前只支持配置表的名字tblName,而partSpec和label请保持和示例一致。
  • matchMode:路径的匹配模式,可选项为exact和fuzzy,分别代表精确匹配和模糊匹配,如果设置为fuzzy,则可以通过正则来匹配HDFS的输入路径

5. 使用MaxCompute命令行工具odpscmd提交作业

maxcompute命令行工具的安装和配置方法参考:http://repo.aliyun.com/odpscmd/

在maxcompute的命令行下运行如下命令:

jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;    

这里假设我们已经将hadoop2openmr-1.0.jar和wordcount_test.jar以及wordcount-table-res.conf已经放置到odpscmd的当前目录,否则在指定配置和-classpath的路径时需要做相应的修改。

运行过程如下图所示:

1

当作业运行完成后,可以查看结果表wc_out里面的内容,验证作业成功结束,结果符合预期。

2

欢迎加入MaxCompute钉钉群讨论

42559c7dde62e4d333c90e02efdf416257a4be27

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
185 0
|
3月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
52 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
3月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
66 4
|
3月前
|
SQL 分布式计算 大数据
大数据-168 Elasticsearch 单机云服务器部署运行 详细流程
大数据-168 Elasticsearch 单机云服务器部署运行 详细流程
76 2
|
3月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
92 1
|
3月前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
185 0
|
6月前
|
弹性计算 分布式计算 DataWorks
MaxCompute操作报错合集之运行pyodps报错超时,该如何排查
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 资源调度 DataWorks
MaxCompute操作报错合集之出现“查询运行日志失败”的报错,一般是什么导致的
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之运行DDL任务时出现异常,具体错误是ODPS-0110061,该如何处理
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
130 3
|
6月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之执行多条SQL语句时,使用同一个实例来运行,遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。

相关产品

  • 云原生大数据计算服务 MaxCompute