目前 MaxCompute 支持多路的输入及输出。
测试准备
准备好测试程序的 Jar 包,假设名字为 mapreduce-examples.jar,本地存放路径为 data\resources。
准备好多路输入输出的测试表和资源。
创建测试表。
- create table wc_in1(key string, value string);
- create table wc_in2(key string, value string);
- create table mr_multiinout_out1 (key string, cnt bigint);
- create table mr_multiinout_out2 (key string, cnt bigint) partitioned by (a string, b string);
- alter table mr_multiinout_out2 add partition (a='1', b='1');
- alter table mr_multiinout_out2 add partition (a='2', b='2');
添加测试资源。
- add jar data\resources\mapreduce-examples.jar -f;
使用 tunnel 导入数据。
- tunnel upload data1 wc_in1;
- tunnel upload data2 wc_in2;
导入 wc_in1 表的数据文件 data1 的内容,如下所示:
- hello,odps
导入 wc_in2 表的数据文件 data2 的内容,如下所示:
- hello,world
测试步骤
在 odpscmd 中执行 MultipleInOut,如下所示:
- jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
- com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;
预期结果
作业成功结束后,mr_multiinout_out1 中的内容,如下所示:
- +------------+------------+
- | key | cnt |
- +------------+------------+
- | default | 1 |
- +------------+------------+
mr_multiinout_out2 中的内容,如下所示:
- +--------+------------+---+---+
- | key | cnt | a | b |
- +--------+------------+---+---+
- | odps | 1 | 1 | 1 |
- | world | 1 | 1 | 1 |
- | out1 | 1 | 1 | 1 |
- | hello | 2 | 2 | 2 |
- | out2 | 1 | 2 | 2 |
- +--------+------------+---+---+
代码示例
- package com.aliyun.odps.mapred.open.example;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.LinkedHashMap;
- import com.aliyun.odps.data.Record;
- import com.aliyun.odps.data.TableInfo;
- import com.aliyun.odps.mapred.JobClient;
- import com.aliyun.odps.mapred.MapperBase;
- import com.aliyun.odps.mapred.ReducerBase;
- import com.aliyun.odps.mapred.TaskContext;
- import com.aliyun.odps.mapred.conf.JobConf;
- import com.aliyun.odps.mapred.utils.InputUtils;
- import com.aliyun.odps.mapred.utils.OutputUtils;
- import com.aliyun.odps.mapred.utils.SchemaUtils;
- /**
- * Multi input & output example.
- **/
- public class MultipleInOut {
- public static class TokenizerMapper extends MapperBase {
- Record word;
- Record one;
- @Override
- public void setup(TaskContext context) throws IOException {
- word = context.createMapOutputKeyRecord();
- one = context.createMapOutputValueRecord();
- one.set(new Object[] { 1L });
- }
- @Override
- public void map(long recordNum, Record record, TaskContext context)
- throws IOException {
- for (int i = 0; i < record.getColumnCount(); i++) {
- word.set(new Object[] { record.get(i).toString() });
- context.write(word, one);
- }
- }
- }
- public static class SumReducer extends ReducerBase {
- private Record result;
- private Record result1;
- private Record result2;
- @Override
- public void setup(TaskContext context) throws IOException {
- result = context.createOutputRecord();
- result1 = context.createOutputRecord("out1");
- result2 = context.createOutputRecord("out2");
- }
- @Override
- public void reduce(Record key, Iterator<Record> values, TaskContext context)
- throws IOException {
- long count = 0;
- while (values.hasNext()) {
- Record val = values.next();
- count += (Long) val.get(0);
- }
- long mod = count % 3;
- if (mod == 0) {
- result.set(0, key.get(0));
- result.set(1, count);
- //不指定label,输出的默认(default)输出
- context.write(result);
- } else if (mod == 1) {
- result1.set(0, key.get(0));
- result1.set(1, count);
- context.write(result1, "out1");
- } else {
- result2.set(0, key.get(0));
- result2.set(1, count);
- context.write(result2, "out2");
- }
- }
- @Override
- public void cleanup(TaskContext context) throws IOException {
- Record result = context.createOutputRecord();
- result.set(0, "default");
- result.set(1, 1L);
- context.write(result);
- Record result1 = context.createOutputRecord("out1");
- result1.set(0, "out1");
- result1.set(1, 1L);
- context.write(result1, "out1");
- Record result2 = context.createOutputRecord("out2");
- result2.set(0, "out2");
- result2.set(1, 1L);
- context.write(result2, "out2");
- }
- }
- public static LinkedHashMap<String, String> convertPartSpecToMap(
- String partSpec) {
- LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
- if (partSpec != null && !partSpec.trim().isEmpty()) {
- String[] parts = partSpec.split("/");
- for (String part : parts) {
- String[] ss = part.split("=");
- if (ss.length != 2) {
- throw new RuntimeException("ODPS-0730001: error part spec format: "
- + partSpec);
- }
- map.put(ss[0], ss[1]);
- }
- }
- return map;
- }
- public static void main(String[] args) throws Exception {
- String[] inputs = null;
- String[] outputs = null;
- if (args.length == 2) {
- inputs = args[0].split(",");
- outputs = args[1].split(",");
- } else {
- System.err.println("MultipleInOut in... out...");
- System.exit(1);
- }
- JobConf job = new JobConf();
- job.setMapperClass(TokenizerMapper.class);
- job.setReducerClass(SumReducer.class);
- job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
- job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
- //解析用户的输入表字符串
- for (String in : inputs) {
- String[] ss = in.split("\\|");
- if (ss.length == 1) {
- InputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
- } else if (ss.length == 2) {
- LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
- InputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
- } else {
- System.err.println("Style of input: " + in + " is not right");
- System.exit(1);
- }
- }
- //解析用户的输出表字符串
- for (String out : outputs) {
- String[] ss = out.split("\\|");
- if (ss.length == 1) {
- OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
- } else if (ss.length == 2) {
- LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
- OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
- } else if (ss.length == 3) {
- if (ss[1].isEmpty()) {
- LinkedHashMap<String, String> map = convertPartSpecToMap(ss[2]);
- OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
- } else {
- LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
- OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map)
- .label(ss[2]).build(), job);
- }
- } else {
- System.err.println("Style of output: " + out + " is not right");
- System.exit(1);
- }
- }
- JobClient.runJob(job);
- }
- }