本文将会为您介绍较为常用的 MapReduce 核心接口。如果您使用 Maven,可以从
Maven 库 中搜索“odps-sdk-mapred”获取不同版本的 Java SDK,相关配置信息如下:
- <dependency>
- <groupId>com.aliyun.odps</groupId>
- <artifactId>odps-sdk-mapred</artifactId>
- <version>0.26.2-public</version>
- </dependency>
MapperBase
主要函数接口:
ReducerBase
主要函数接口:
TaskContext
主要函数接口:
注意:
MaxCompute 的 TaskContext 接口中提供了 progress 功能,但此功能是在防止 Worker长时间运行未结束,被框架误认为超时而被杀的情况出现。此接口更类似于向框架发送心跳信息,并不是用来汇报 Worker 进度。MaxComputeMapReduce 默认 Worker 超时时间为 10 分钟(系统默认配置,不受用户控制),如果超过 10 分钟,Worker仍然没有向框架发送心跳(调用 progress 接口),框架会强制停止该 Worker,MapReduce 任务失败退出。因此,建议您在Mapper/Reducer 函数中,定期调用 progress 接口,防止框架认为 Worker 超时,误杀任务。
JobConf
主要函数接口:
注意:
通常情况下,GroupingColumns 包含在 KeySortColumns 中,KeySortColumns 和 PartitionColumns 要包含在 Key 中。
在 Map 端,Mapper 输出的 Record 会根据设置的 PartitionColumns 计算哈希值,决定分配到哪个 Reducer,会根据 KeySortColumns 对 Record 进行排序。
在 Reduce 端,输入 Records 在按照 KeySortColumns 排序好后,会根据GroupingColumns 指定的列对输入的 Records 进行分组,即会顺序遍历输入的 Records,把GroupingColumns 所指定列相同的 Records 作为一次 reduce 函数调用的输入。
JobClient
主要函数接口:
RunningJob
主要函数接口:
InputUtils
主要函数接口:
OutputUtils
主要函数接口:
Pipeline
Pipeline是
MR2 的主体类。可以通过 Pipeline.builder 构建一个 Pipeline。Pipeline 的主要接口如下:
- public Builder addMapper(Class<? extends Mapper> mapper)
- public Builder addMapper(Class<? extends Mapper> mapper,
- Column[] keySchema, Column[] valueSchema, String[] sortCols,
- SortOrder[] order, String[] partCols,
- Class<? extends Partitioner> theClass, String[] groupCols)
- public Builder addReducer(Class<? extends Reducer> reducer)
- public Builder addReducer(Class<? extends Reducer> reducer,
- Column[] keySchema, Column[] valueSchema, String[] sortCols,
- SortOrder[] order, String[] partCols,
- Class<? extends Partitioner> theClass, String[] groupCols)
- public Builder setOutputKeySchema(Column[] keySchema)
- public Builder setOutputValueSchema(Column[] valueSchema)
- public Builder setOutputKeySortColumns(String[] sortCols)
- public Builder setOutputKeySortOrder(SortOrder[] order)
- public Builder setPartitionColumns(String[] partCols)
- public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
- public Builder setOutputGroupingColumns(String[] cols)
示例如下:
- Job job = new Job();
- Pipeline pipeline = Pipeline.builder()
- .addMapper(TokenizerMapper.class)
- .setOutputKeySchema(
- new Column[] { new Column("word", OdpsType.STRING) })
- .setOutputValueSchema(
- new Column[] { new Column("count", OdpsType.BIGINT) })
- .addReducer(SumReducer.class)
- .setOutputKeySchema(
- new Column[] { new Column("count", OdpsType.BIGINT) })
- .setOutputValueSchema(
- new Column[] { new Column("word", OdpsType.STRING),
- new Column("count", OdpsType.BIGINT) })
- .addReducer(IdentityReducer.class).createPipeline();
- job.setPipeline(pipeline);
- job.addInput(...)
- job.addOutput(...)
- job.submit();
如上所示,您可以在 main 函数中构建一个 Map 后连续接两个 Reduce 的 MapReduce 任务。如果您比较熟悉 MapReduce 的基础功能,即可轻松使用 MR
2。
注意:
建议您在使用 MR2 功能前,首先了解 MapReduce 的基础用法。
JobConf 仅能够配置 Map 后接单 Reduce 的 MapReduce 任务。
数据类型
MapReduce 支持的数据类型有:Bigint,String,Double,Boolean,datetime 和 Decimal 类型。MaxCompute 数据类型与 Java 数据类型的对应关系,如下所示: