
MaxCompute User Guide: MapReduce: Sample Programs: Sort Example



Test preparation


  1. Prepare the JAR package of the test program. This example assumes it is named mapreduce-examples.jar and stored locally under data\resources.

  2. Prepare the test tables and the resource for Sort.

    • Create the test tables.
        create table ss_in(key bigint, value bigint);
        create table ss_out(key bigint, value bigint);

    • Add the test resource.
        add jar data\resources\mapreduce-examples.jar -f;

  3. Use tunnel to import the data.
        tunnel upload data ss_in;

    The data file, named data, that is imported into the ss_in table contains the following rows:
        2,1
        1,1
        3,1
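If you would rather generate the data file than type it by hand, the following is a minimal sketch in plain Java (no MaxCompute SDK involved). The class name SortTestData and the helper writeTo are hypothetical names introduced only for this illustration. The sketch assumes the file should be called data and live in the current working directory, matching the tunnel upload command above. Lines are written with the platform line separator.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the MaxCompute examples.
public class SortTestData {
  /** The three key,value rows from the sample file, in their original unsorted order. */
  static List<String> rows() {
    return Arrays.asList("2,1", "1,1", "3,1");
  }

  /** Writes the rows to the given path, one comma-separated record per line. */
  static Path writeTo(Path target) throws IOException {
    return Files.write(target, rows(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    // Assumption: the upload command is run from the directory that holds "data".
    Path written = writeTo(Paths.get("data"));
    System.out.println("Wrote " + rows().size() + " rows to " + written.toAbsolutePath());
  }
}
```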


Test steps


    Run Sort in odpscmd as follows:
        jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
            com.aliyun.odps.mapred.open.example.Sort ss_in ss_out;


Expected result


    After the job completes successfully, the output table ss_out contains the following:
        +------------+------------+
        | key        | value      |
        +------------+------------+
        | 1          | 1          |
        | 2          | 1          |
        | 3          | 1          |
        +------------+------------+
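The rows come back ordered by key even though the job's mapper and reducer only pass records through: the ordering comes from the framework sorting map output by key before the single reducer sees it. The sketch below imitates that flow in memory with plain Java, so it can be run without a MaxCompute project. It is an illustration of the idea only, not how the service is implemented. The class name LocalSortSketch and the use of a TreeMap to stand in for the shuffle are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the Sort job; it does not use the MaxCompute SDK.
public class LocalSortSketch {
  /** Identity map, sort-by-key shuffle, identity reduce, all in memory. */
  static List<long[]> sort(List<long[]> input) {
    // "Map" + "shuffle": group each value under its key.
    // TreeMap keeps the keys in ascending order, standing in for the framework's sort.
    TreeMap<Long, List<Long>> shuffled = new TreeMap<>();
    for (long[] record : input) {
      shuffled.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
    }
    // "Reduce": emit one output row per value, visiting keys in sorted order.
    List<long[]> output = new ArrayList<>();
    for (Map.Entry<Long, List<Long>> group : shuffled.entrySet()) {
      for (long value : group.getValue()) {
        output.add(new long[] { group.getKey(), value });
      }
    }
    return output;
  }

  public static void main(String[] args) {
    // Same rows as the sample data file, in the same unsorted order.
    List<long[]> input = new ArrayList<>();
    input.add(new long[] { 2, 1 });
    input.add(new long[] { 1, 1 });
    input.add(new long[] { 3, 1 });
    for (long[] row : sort(input)) {
      System.out.println(row[0] + "," + row[1]);
    }
  }
}
```

Running main prints the rows 1,1 then 2,1 then 3,1, which matches the contents of ss_out shown above.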


Code example

    package com.aliyun.odps.mapred.open.example;

    import java.io.IOException;
    import java.util.Date;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.TaskContext;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.example.lib.IdentityReducer;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;

    /**
     * This is the trivial map/reduce program that does absolutely nothing other
     * than use the framework to fragment and sort the input values.
     **/
    public class Sort {

      static int printUsage() {
        System.out.println("sort <input> <output>");
        return -1;
      }

      /**
       * Implements the identity function, mapping record's first two columns to
       * outputs.
       **/
      public static class IdentityMapper extends MapperBase {
        private Record key;
        private Record value;

        @Override
        public void setup(TaskContext context) throws IOException {
          key = context.createMapOutputKeyRecord();
          value = context.createMapOutputValueRecord();
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
          // Column 0 becomes the map output key, column 1 the map output value.
          key.set(new Object[] { (Long) record.get(0) });
          value.set(new Object[] { (Long) record.get(1) });
          context.write(key, value);
        }
      }

      /**
       * The main driver for sort program. Invoke this method to submit the
       * map/reduce job.
       *
       * @throws Exception
       *           When there are communication problems with the job tracker.
       **/
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        // A single reduce task, so all keys end up in one sorted output.
        jobConf.setNumReduceTasks(1);
        jobConf.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
        jobConf.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
        // args[0] is the input table name, args[1] the output table name.
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), jobConf);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), jobConf);
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        JobClient.runJob(jobConf);
        Date endTime = new Date();
        System.out.println("Job ended: " + endTime);
        System.out.println("The job took "
            + (endTime.getTime() - startTime.getTime()) / 1000 + " seconds.");
      }
    }
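The listing defines printUsage but main never calls it, so running the job with fewer than two table names would fail with an ArrayIndexOutOfBoundsException when args[0] or args[1] is read. One possible guard is sketched below as a standalone class so that it can be compiled without the MaxCompute SDK. SortArgs and validate are hypothetical names used only for this illustration, and the original example does not include such a check.

```java
// Hypothetical argument check, shown outside the Sort class so it runs on its own.
public class SortArgs {
  static int printUsage() {
    System.out.println("sort <input> <output>");
    return -1;
  }

  /** Returns 0 when exactly two table names are given; otherwise prints usage and returns -1. */
  static int validate(String[] args) {
    if (args == null || args.length != 2) {
      return printUsage();
    }
    return 0;
  }

  public static void main(String[] args) {
    int status = validate(args);
    if (status != 0) {
      // Stop before any job configuration would be built.
      System.exit(status);
    }
    System.out.println("input table: " + args[0] + ", output table: " + args[1]);
  }
}
```

In the Sort driver, the same check could sit at the top of main, before the JobConf is created.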
    Posted by 行者武松, 2017-10-23 17:48:22

    1 answer
    • Just passing by. Keep it up!
      2017-10-23 22:30:25