开发者社区> 问答> 正文

MaxCompute用户指南:MapReduce:Java SDK:原生SDK概述


本文将会为您介绍较为常用的 MapReduce 核心接口。如果您使用 Maven,可以从 Maven 库 中搜索“odps-sdk-mapred”获取不同版本的 Java SDK,相关配置信息如下:

  1. <dependency>
  2.     <groupId>com.aliyun.odps</groupId>
  3.     <artifactId>odps-sdk-mapred</artifactId>
  4.     <version>0.26.2-public</version>
  5. </dependency>
主要接口描述
MapperBase用户自定义的Map函数需要继承自此类。处理输入表的记录对 象,加工处理成键值对集合输出到Reduce阶段,或者不经过 Reduce阶段直接输出结果记录到结果表。不经过Reduce阶段而 直接输出计算结果的作业,也可称之为Map-Only作业。
ReducerBase用户自定义的Reduce函数需要继承自此类。对与一个键(Key)关联的一组数值集(Values)进行归约计算。
TaskContext是MapperBase及ReducerBase多个成员函数的输入参数之一。 含有任务运行的上下文信息。
JobClient用于提交和管理作业,提交方式包括阻塞(同步)方式及非阻塞(异步) 方式。
RunningJob作业运行时对象,用于跟踪运行中的MapReduce作业实例。
JobConf描述一个MapReduce任务的配置,通常在主程序(main函数)中定义JobConf对象,然后通过JobClient提交作业给MaxCompute服务。


MapperBase


主要函数接口:  
主要接口描述
void cleanup(TaskContext context)在Map阶段结束时,map方法之后调用。
void map(long key, Record record, TaskContext context)map方法,处理输入表的记录。
void setup(TaskContext context)在Map阶段开始时,map方法之前调用。


ReducerBase


主要函数接口:  
主要接口描述
void cleanup( TaskContext context)在Reduce阶段结束时,reduce方法之后调用。
void reduce(Record key, Iterator<Record > values, TaskContext context)reduce方法,处理输入表的记录。
void setup( TaskContext context)在Reduce阶段开始时,reduce方法之前调用。


TaskContext


主要函数接口:  
主要接口描述
TableInfo[] getOutputTableInfo()获取输出的表信息
Record createOutputRecord()创建默认输出表的记录对象
Record createOutputRecord(String label)创建给定label输出表的记录对象
Record createMapOutputKeyRecord()创建Map输出Key的记录对象
Record createMapOutputValueRecord()创建Map输出Value的记录对象
void write(Record record)写记录到默认输出,用于Reduce端写出数据, 可以在Reduce端多次调用。
void write(Record record, String label)写记录到给定标签输出,用于Reduce端写出数据。可以在 Reduce端多次调用。
void write(Record key, Record value)Map写记录到中间结果,可以在Map函数中多次调用。 可以在Map端多次调用。
BufferedInputStream readResourceFileAsStream(String resourceName)读取文件类型资源
Iterator<Record > readResourceTable(String resourceName)读取表类型资源
Counter getCounter(Enum<? > name)获取给定名称的Counter对象
Counter getCounter(String group, String name)获取给定组名和名称的Counter对象
void progress()向MapReduce框架报告心跳信息。 如果用户方法处理时间 很长,且中间没有调用框架,可以调用这个方法避免task 超时,框架默认600秒超时。

注意:
MaxCompute 的 TaskContext 接口中提供了 progress 功能,但此功能是在防止 Worker长时间运行未结束,被框架误认为超时而被杀的情况出现。此接口更类似于向框架发送心跳信息,并不是用来汇报 Worker 进度。MaxComputeMapReduce 默认 Worker 超时时间为 10 分钟(系统默认配置,不受用户控制),如果超过 10 分钟,Worker仍然没有向框架发送心跳(调用 progress 接口),框架会强制停止该 Worker,MapReduce 任务失败退出。因此,建议您在Mapper/Reducer 函数中,定期调用 progress 接口,防止框架认为 Worker 超时,误杀任务。


JobConf


主要函数接口:  
主要接口描述
void setResources(String resourceNames)声明本作业使用的资源。只有声明的资源才能在运行Mapper/Reducer时通过TaskContext对象读取。
void setMapOutputKeySchema(Column[] schema)设置Mapper输出到Reducer的Key属性
void setMapOutputValueSchema(Column[] schema)设置Mapper输出到Reducer的Value属性
void setOutputKeySortColumns(String[] cols)设置Mapper输出到Reducer的Key排序列
void setOutputGroupingColumns(String[] cols)设置Key分组列
void setMapperClass(Class<? extends Mapper > theClass)设置作业的Mapper函数
void setPartitionColumns(String[] cols)设置作业指定的分区列. 默认是Mapper输出Key的所有列
void setReducerClass(Class<? extends Reducer > theClass)设置作业的Reducer
void setCombinerClass(Class<? extends Reducer > theClass)设置作业的combiner。在Map端运行,作用类似于单个Map 对本地的相同Key值做Reduce
void setSplitSize(long size)设置输入分片大小,单位 MB,默认256
void setNumReduceTasks(int n)设置Reducer任务数,默认为Mapper任务数的1/4
void setMemoryForMapTask(int mem)设置Mapper任务中单个Worker的内存大小,单位:MB, 默认值2048。
void setMemoryForReduceTask(int mem)设置Reducer任务中单个Worker的内存大小,单位:MB, 默认值 2048。

注意:

  • 通常情况下,GroupingColumns 包含在 KeySortColumns 中,KeySortColumns 和 PartitionColumns 要包含在 Key 中。

  • 在 Map 端,Mapper 输出的 Record 会根据设置的 PartitionColumns 计算哈希值,决定分配到哪个 Reducer,会根据 KeySortColumns 对 Record 进行排序。

  • 在 Reduce 端,输入 Records 在按照 KeySortColumns 排序好后,会根据GroupingColumns 指定的列对输入的 Records 进行分组,即会顺序遍历输入的 Records,把GroupingColumns 所指定列相同的 Records 作为一次 reduce 函数调用的输入。


JobClient


主要函数接口:  
主要接口描述
static RunningJob runJob(JobConf job)阻塞(同步)方式提交MapReduce作业后立即返回
static RunningJob submitJob(JobConf job)非阻塞(异步)方式提交MapReduce作业后立即返回


RunningJob


主要函数接口:  
主要接口描述
String getInstanceID()获取作业运行实例ID,用于查看运行日志和作业管理。
boolean isComplete()查询作业是否结束。
boolean isSuccessful()查询作业实例是否运行成功。
void waitForCompletion()等待直至作业实例结束。一般使用于异步方式提交的作业。
JobStatus getJobStatus()查询作业实例运行状态。
void killJob()结束此作业。
Counters getCounters()获取Conter信息。


InputUtils


主要函数接口:  
主要接口描述
static void addTable(TableInfo table, JobConf conf)添加表table到任务输入,可以被调用多次 ,新加入的表以append方式添加到输入队列中。
static void setTables(TableInfo [] tables, JobConf conf)添加多张表到任务输入中。


OutputUtils


主要函数接口:  
主要接口描述
static void addTable(TableInfo table, JobConf conf)添加表table到任务输出,可以被调用多次 ,新加入的表以append方式添加到输出队 列中。
static void setTables(TableInfo [] tables, JobConf conf)添加多张表到任务输出中。


Pipeline


Pipeline是 MR2 的主体类。可以通过 Pipeline.builder 构建一个 Pipeline。Pipeline 的主要接口如下:
  1.     public Builder addMapper(Class<? extends Mapper> mapper)
  2.     public Builder addMapper(Class<? extends Mapper> mapper,
  3.            Column[] keySchema, Column[] valueSchema, String[] sortCols,
  4.            SortOrder[] order, String[] partCols,
  5.            Class<? extends Partitioner> theClass, String[] groupCols)
  6.     public Builder addReducer(Class<? extends Reducer> reducer)
  7.     public Builder addReducer(Class<? extends Reducer> reducer,
  8.            Column[] keySchema, Column[] valueSchema, String[] sortCols,
  9.            SortOrder[] order, String[] partCols,
  10.            Class<? extends Partitioner> theClass, String[] groupCols)
  11.     public Builder setOutputKeySchema(Column[] keySchema)
  12.     public Builder setOutputValueSchema(Column[] valueSchema)
  13.     public Builder setOutputKeySortColumns(String[] sortCols)
  14.     public Builder setOutputKeySortOrder(SortOrder[] order)
  15.     public Builder setPartitionColumns(String[] partCols)
  16.     public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
  17.     public Builder setOutputGroupingColumns(String[] cols)

示例如下:
  1.     Job job = new Job();
  2.     Pipeline pipeline = Pipeline.builder()
  3.      .addMapper(TokenizerMapper.class)
  4.      .setOutputKeySchema(
  5.          new Column[] { new Column("word", OdpsType.STRING) })
  6.      .setOutputValueSchema(
  7.          new Column[] { new Column("count", OdpsType.BIGINT) })
  8.      .addReducer(SumReducer.class)
  9.      .setOutputKeySchema(
  10.          new Column[] { new Column("count", OdpsType.BIGINT) })
  11.      .setOutputValueSchema(
  12.          new Column[] { new Column("word", OdpsType.STRING),
  13.          new Column("count", OdpsType.BIGINT) })
  14.      .addReducer(IdentityReducer.class).createPipeline();
  15.     job.setPipeline(pipeline);  
  16.     job.addInput(...)
  17.     job.addOutput(...)
  18.     job.submit();

如上所示,您可以在 main 函数中构建一个 Map 后连续接两个 Reduce 的 MapReduce 任务。如果您比较熟悉 MapReduce 的基础功能,即可轻松使用 MR 2。  

注意

  • 建议您在使用 MR2 功能前,首先了解 MapReduce 的基础用法。  

  • JobConf 仅能够配置 Map 后接单 Reduce 的 MapReduce 任务。  


数据类型


MapReduce 支持的数据类型有:Bigint,String,Double,Boolean,datetime 和 Decimal 类型。MaxCompute 数据类型与 Java 数据类型的对应关系,如下所示:  
ODPS SQL TypeBigintStringDoubleBooleanDatetimeDecimal
Java TypeLongStringDoubleBooleanDateBigDecimal

展开
收起
行者武松 2017-10-24 10:17:28 2142 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
从 SDK 到编解码:视频直播架构解析 立即下载
跨平台的云服务SDK需要什么 立即下载
一个跨平台的云服务SDK需要什么 立即下载