开发者社区> 问答> 正文

MaxCompute用户指南:MapReduce:示例程序:二次排序示例



测试准备


  1. 准备好测试程序的 Jar 包,假设名字为 mapreduce-examples.jar,本地存放路径为 data\resources。

  2. 准备好 SecondarySort 的测试表和资源。

    • 创建测试表。create table ss_in(key bigint, value bigint);
    • create table ss_out(key bigint, value bigint);

  • 添加测试资源。
    1. add jar data\resources\mapreduce-examples.jar -f;

    使用 tunnel 导入数据。
    1. tunnel upload data ss_in;

    导入 ss_in 表的数据文件 data 的内容,如下所示:
    1. 1,2
    2. 2,1
    3. 1,1
    4. 2,2


    测试步骤


    在 odpscmd 中执行 SecondarySort,如下所示:
    1. jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
    2. com.aliyun.odps.mapred.open.example.SecondarySort ss_in ss_out;


    预期结果


    作业成功结束后,输出表 ss_out 中的内容,如下所示:
    1. +------------+------------+
    2. | key        | value      |
    3. +------------+------------+
    4. | 1          | 1          |
    5. | 1          | 2          |
    6. | 2          | 1          |
    7. | 2          | 2          |
    8. +------------+------------+


    代码示例

    package com.aliyun.odps.mapred.open.example;

    import java.io.IOException;
    import java.util.Iterator;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.ReducerBase;
    import com.aliyun.odps.mapred.TaskContext;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.SchemaUtils;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.data.TableInfo;

    /**
     * Example ODPS Map/Reduce application demonstrating a secondary sort.
     *
     * <p>It reads an input table whose records contain two integers. The output
     * is sorted by the first and then the second number, and grouped on the
     * first number.
     */
    public class SecondarySort {

      /**
       * Reads two integers from each input record and emits the key/value pair
       * {@code ((left, right), right)}.
       */
      public static class MapClass extends MapperBase {
        private Record key;
        private Record value;

        /** Allocates the reusable map-output key and value records. */
        @Override
        public void setup(TaskContext context) throws IOException {
          key = context.createMapOutputKeyRecord();
          value = context.createMapOutputValueRecord();
        }

        /**
         * Emits {@code ((left, right), right)} for one input record.
         *
         * <p>Records with no columns are skipped. If the record has only one
         * column, {@code right} stays 0.
         *
         * @param recordNum sequence number of the record (unused here)
         * @param record input record; columns 0 and 1 are cast to {@code Long}
         * @param context task context used to write the map output
         * @throws IOException if writing the output fails
         */
        @Override
        public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
          if (record.getColumnCount() <= 0) {
            return;
          }
          long left = (Long) record.get(0);
          long right = 0;
          if (record.getColumnCount() > 1) {
            right = (Long) record.get(1);
          }
          // The composite key carries both numbers; the value repeats the
          // second number. Autoboxing turns the longs into Long objects.
          key.set(new Object[] { left, right });
          value.set(new Object[] { right });
          context.write(key, value);
        }
      }

      /**
       * Emits one output record per input value: column 0 is the first column of
       * the key, column 1 is the first column of the value. No aggregation is
       * performed.
       */
      public static class ReduceClass extends ReducerBase {
        private Record result;

        /** Allocates the reusable output record. */
        @Override
        public void setup(TaskContext context) throws IOException {
          result = context.createOutputRecord();
        }

        /**
         * Writes {@code (key[0], value[0])} for every value in the group.
         *
         * @param key key of the current group; only its first column is used
         * @param values values belonging to this group
         * @param context task context used to write the output
         * @throws IOException if writing the output fails
         */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
          result.set(0, key.get(0));
          while (values.hasNext()) {
            Record value = values.next();
            result.set(1, value.get(0));
            context.write(result);
          }
        }
      }

      /**
       * Configures and runs the secondary-sort job.
       *
       * <p>Exits with status 2 when the argument count is wrong, and with status
       * 0 after {@code JobClient.runJob} returns.
       *
       * @param args {@code args[0]} is the input table name, {@code args[1]} the
       *     output table name
       * @throws Exception if the job fails
       */
      public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: secondarysort <in> <out>");
          System.exit(2);
        }

        JobConf job = new JobConf();
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        // Use multiple columns as the key:
        // sort on both the first and second parts of the pair,
        job.setOutputKeySortColumns(new String[] { "i1", "i2" });
        // partition on the first part of the pair only,
        job.setPartitionColumns(new String[] { "i1" });
        // and group on the first part of the pair only.
        job.setOutputGroupingColumns(new String[] { "i1" });

        // Map output key is (i1, i2); map output value is i2x; all bigint.
        job.setMapOutputKeySchema(SchemaUtils.fromString("i1:bigint,i2:bigint"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("i2x:bigint"));

        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);

        JobClient.runJob(job);
        System.exit(0);
      }
    }
  • 展开
    收起
    行者武松 2017-10-23 17:45:13 1738 0
    0 条回答
    写回答
    取消 提交回答
    问答排行榜
    最热
    最新

    相关电子书

    更多
    Data+AI时代大数据平台应该如何建设 立即下载
    大数据AI一体化的解读 立即下载
    极氪大数据 Serverless 应用实践 立即下载