
MaxCompute User Guide: MapReduce: Example Programs: Multiple Input and Output Example



MaxCompute currently supports multiple inputs and outputs in a single MapReduce job.

Test Preparation


1. Prepare the JAR package of the test program. Assume it is named mapreduce-examples.jar and is stored locally under data\resources.

2. Prepare the test tables and resources for the multiple input/output test.

   • Create the test tables.
     create table wc_in1(key string, value string);
     create table wc_in2(key string, value string);
     create table mr_multiinout_out1 (key string, cnt bigint);
     create table mr_multiinout_out2 (key string, cnt bigint) partitioned by (a string, b string);
     alter table mr_multiinout_out2 add partition (a='1', b='1');
     alter table mr_multiinout_out2 add partition (a='2', b='2');

   • Add the test resource.
     add jar data\resources\mapreduce-examples.jar -f;

   • Use tunnel to import the data.
     tunnel upload data1 wc_in1;
     tunnel upload data2 wc_in2;

   The content of the data file data1 imported into table wc_in1 is as follows:
     hello,odps

   The content of the data file data2 imported into table wc_in2 is as follows:
     hello,world


Test Steps


Run MultipleInOut in odpscmd, as shown below. Each output argument takes the form table|partition_spec|label; the partition specification and label are optional, and an output registered without a label becomes the default output.

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;
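
To make the output specification concrete, the arguments above register one default output and two labeled outputs. The following is a minimal sketch, not part of the example program itself: it shows the equivalent hard-coded registration as a fragment that would sit inside main(), reusing the InputUtils, OutputUtils, and TableInfo calls from the full code example below, with this test's table names and partition values filled in by hand.

// Sketch: the inputs and outputs described by the command-line arguments above,
// registered explicitly instead of being parsed from args in main().
JobConf job = new JobConf();

// Two unpartitioned input tables.
InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
InputUtils.addTable(TableInfo.builder().tableName("wc_in2").build(), job);

// Default (unlabeled) output table.
OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out1").build(), job);

// Labeled output "out1", bound to partition a=1/b=1 of mr_multiinout_out2.
LinkedHashMap<String, String> part1 = new LinkedHashMap<String, String>();
part1.put("a", "1");
part1.put("b", "1");
OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out2")
    .partSpec(part1).label("out1").build(), job);

// Labeled output "out2", bound to partition a=2/b=2 of mr_multiinout_out2.
LinkedHashMap<String, String> part2 = new LinkedHashMap<String, String>();
part2.put("a", "2");
part2.put("b", "2");
OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out2")
    .partSpec(part2).label("out2").build(), job);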


Expected Results


After the job completes successfully, the reducer routes each word by its count modulo 3: 0 goes to the default output (mr_multiinout_out1), 1 to the output labeled out1, and 2 to the output labeled out2; cleanup then writes one extra record to each output. The content of mr_multiinout_out1 is as follows:

+------------+------------+
| key        | cnt        |
+------------+------------+
| default    | 1          |
+------------+------------+

The content of mr_multiinout_out2 is as follows:

+--------+------------+---+---+
| key    | cnt        | a | b |
+--------+------------+---+---+
| odps   | 1          | 1 | 1 |
| world  | 1          | 1 | 1 |
| out1   | 1          | 1 | 1 |
| hello  | 2          | 2 | 2 |
| out2   | 1          | 2 | 2 |
+--------+------------+---+---+


Code Example

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Multi input & output example.
 **/
public class MultipleInOut {

  public static class TokenizerMapper extends MapperBase {
    Record word;
    Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.set(new Object[] { 1L });
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      // Emit each column value as a word with count 1.
      for (int i = 0; i < record.getColumnCount(); i++) {
        word.set(new Object[] { record.get(i).toString() });
        context.write(word, one);
      }
    }
  }

  public static class SumReducer extends ReducerBase {
    private Record result;
    private Record result1;
    private Record result2;

    @Override
    public void setup(TaskContext context) throws IOException {
      // One record for the default output and one for each labeled output.
      result = context.createOutputRecord();
      result1 = context.createOutputRecord("out1");
      result2 = context.createOutputRecord("out2");
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long count = 0;
      while (values.hasNext()) {
        Record val = values.next();
        count += (Long) val.get(0);
      }
      long mod = count % 3;
      if (mod == 0) {
        result.set(0, key.get(0));
        result.set(1, count);
        // No label specified, so the record is written to the default output.
        context.write(result);
      } else if (mod == 1) {
        result1.set(0, key.get(0));
        result1.set(1, count);
        context.write(result1, "out1");
      } else {
        result2.set(0, key.get(0));
        result2.set(1, count);
        context.write(result2, "out2");
      }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
      // Write one extra record to each output before the task finishes.
      Record result = context.createOutputRecord();
      result.set(0, "default");
      result.set(1, 1L);
      context.write(result);
      Record result1 = context.createOutputRecord("out1");
      result1.set(0, "out1");
      result1.set(1, 1L);
      context.write(result1, "out1");
      Record result2 = context.createOutputRecord("out2");
      result2.set(0, "out2");
      result2.set(1, 1L);
      context.write(result2, "out2");
    }
  }

  public static LinkedHashMap<String, String> convertPartSpecToMap(
      String partSpec) {
    LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
    if (partSpec != null && !partSpec.trim().isEmpty()) {
      String[] parts = partSpec.split("/");
      for (String part : parts) {
        String[] ss = part.split("=");
        if (ss.length != 2) {
          throw new RuntimeException("ODPS-0730001: error part spec format: "
              + partSpec);
        }
        map.put(ss[0], ss[1]);
      }
    }
    return map;
  }

  public static void main(String[] args) throws Exception {
    String[] inputs = null;
    String[] outputs = null;
    if (args.length == 2) {
      inputs = args[0].split(",");
      outputs = args[1].split(",");
    } else {
      System.err.println("MultipleInOut in... out...");
      System.exit(1);
    }
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    // Parse the user-supplied input table specifications.
    for (String in : inputs) {
      String[] ss = in.split("\\|");
      if (ss.length == 1) {
        InputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
      } else if (ss.length == 2) {
        LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
        InputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
      } else {
        System.err.println("Style of input: " + in + " is not right");
        System.exit(1);
      }
    }
    // Parse the user-supplied output table specifications.
    for (String out : outputs) {
      String[] ss = out.split("\\|");
      if (ss.length == 1) {
        OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
      } else if (ss.length == 2) {
        LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
        OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
      } else if (ss.length == 3) {
        if (ss[1].isEmpty()) {
          LinkedHashMap<String, String> map = convertPartSpecToMap(ss[2]);
          OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
        } else {
          LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
          OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map)
              .label(ss[2]).build(), job);
        }
      } else {
        System.err.println("Style of output: " + out + " is not right");
        System.exit(1);
      }
    }
    JobClient.runJob(job);
  }
}