
MaxCompute User Guide: MapReduce: Example Programs: Multi-Job Example



Test Preparation


  1. Prepare the JAR package of the test program. Assume it is named mapreduce-examples.jar and is stored locally at data\resources.

  2. Prepare the MultiJobs test tables and resources.

    • Create the test tables.
      create table mr_empty (key string, value string);
      create table mr_multijobs_out (value bigint);

    • Add the test resources (the registration can be verified as shown after this list).
      add table mr_multijobs_out as multijobs_res_table -f;
      add jar data\resources\mapreduce-examples.jar -f;

      Note that the output table mr_multijobs_out is itself registered as the resource multijobs_res_table; this is how each run of the job reads back the value written by the previous run.
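
To confirm that both resources were registered before running the job, you can list the resources in the current project from odpscmd:

  list resources;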


Test Steps


Run MultiJobs in odpscmd, as shown below. The -resources option makes the JAR and the resource table available to the job at runtime, and -classpath points to the local JAR that contains the main class:

  jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;


Expected Results


After the job runs successfully, the output table mr_multijobs_out contains the following record. The init job writes the value 2, and each pass of the decrease job rewrites the previous value minus one, so after two passes the final value is 0:

+------------+
| value      |
+------------+
| 0          |
+------------+


Code Example

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * MultiJobs
 *
 * Running multiple jobs
 *
 **/
public class MultiJobs {
  // Writes the initial value (read from the JobConf) into the output table.
  public static class InitMapper extends MapperBase {
    @Override
    public void setup(TaskContext context) throws IOException {
      Record record = context.createOutputRecord();
      long v = context.getJobConf().getLong("multijobs.value", 2);
      record.set(0, v);
      context.write(record);
    }
  }
  // Reads the value produced by the previous job from the resource table,
  // decrements it, writes it back, and exposes it to the driver via a counter.
  public static class DecreaseMapper extends MapperBase {
    @Override
    public void cleanup(TaskContext context) throws IOException {
      // Read the expected value, defined in main(), from the JobConf.
      long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
      long v = -1;
      int count = 0;
      Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
      while (iter.hasNext()) {
        Record r = iter.next();
        v = (Long) r.get(0);
        if (expect != v) {
          throw new IOException("expect: " + expect + ", but: " + v);
        }
        count++;
      }
      if (count != 1) {
        throw new IOException("res_table should have 1 record, but: " + count);
      }
      Record record = context.createOutputRecord();
      v--;
      record.set(0, v);
      context.write(record);
      // The driver polls this counter to decide whether to run another pass.
      context.getCounter("multijobs", "value").setValue(v);
    }
  }
  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("Usage: TestMultiJobs <table>");
      System.exit(1);
    }
    String tbl = args[0];
    long iterCount = 2;
    System.err.println("Start to run init job.");
    JobConf initJob = new JobConf();
    initJob.setLong("multijobs.value", iterCount);
    initJob.setMapperClass(InitMapper.class);
    InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
    OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
    initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
    initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
    // Map-only job: no reducers.
    initJob.setNumReduceTasks(0);
    JobClient.runJob(initJob);
    while (true) {
      System.err.println("Start to run iter job, count: " + iterCount);
      JobConf decJob = new JobConf();
      decJob.setLong("multijobs.expect.value", iterCount);
      decJob.setMapperClass(DecreaseMapper.class);
      InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
      OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
      decJob.setNumReduceTasks(0);
      RunningJob rJob = JobClient.runJob(decJob);
      iterCount--;
      // Stop iterating once the counter set by DecreaseMapper reaches zero.
      if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
        break;
      }
    }
    if (iterCount != 0) {
      throw new IOException("Job failed.");
    }
  }
}
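
The driver pattern above, rerunning a map-only job until a counter set by the task reaches zero, generalizes to other iterative computations. Below is a minimal sketch of just that control loop; buildIterationJob() is a hypothetical helper standing in for the per-pass job configuration, while JobClient, RunningJob, and the counter lookup are the same APIs used in the example:

  // Sketch: counter-driven iteration. buildIterationJob() is hypothetical.
  RunningJob running;
  do {
    JobConf job = buildIterationJob(); // hypothetical: set mapper, input, output
    running = JobClient.runJob(job);   // blocks until this pass finishes
  } while (running.getCounters()
                  .findCounter("multijobs", "value")
                  .getValue() > 0);    // the task sets this counter each pass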
行者武松 2017-10-23 17:44:47
