
MaxCompute User Guide: MapReduce: Example Programs: Grep Example



Test Preparation


  1. Prepare the JAR package of the test program. Assume it is named mapreduce-examples.jar and is stored locally under data\resources.

  2. Prepare the Grep test tables and resources.
    • Create the test tables. mr_src is the input table, mr_grep_tmp holds the intermediate (word, count) result of the first job, and mr_grep_out receives the final (count, word) output.
create table mr_src (key string, value string);
create table mr_grep_tmp (key string, cnt bigint);
create table mr_grep_out (key bigint, value string);

    • Add the test resource.
add jar data\resources\mapreduce-examples.jar -f;

    Use Tunnel to import the data:
tunnel upload data mr_src;

    The contents of the local data file data imported into the mr_src table are as follows:
hello,odps
hello,world
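
    Note that tunnel upload splits each input line into the table's columns by a field delimiter, which defaults to a comma in odpscmd, so the first sample line maps to key=hello, value=odps in mr_src. If your data uses a different separator, it can be set explicitly; a sketch, assuming the standard -fd option of the tunnel command (here just making the default comma explicit):
tunnel upload data mr_src -fd ",";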


Test Steps


    Run Grep in odpscmd, as follows. Here mr_src is the input table, mr_grep_tmp is the intermediate table, mr_grep_out is the output table, and hello is the regular expression to match:
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Grep mr_src mr_grep_tmp mr_grep_out hello;


Expected Result


    After the job finishes successfully, the contents of the output table mr_grep_out are as follows. The pattern hello matches once in each of the two input rows, giving a total count of 2; the second job swaps (word, count) into (count, word), which is why the count appears in the key column:
+------------+------------+
| key        | value      |
+------------+------------+
| 2          | hello      |
+------------+------------+
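
    You can also inspect the intermediate table to see the raw (word, count) output of the first job. A minimal check, assuming the sample data above (hello appears in two rows, so its count should be 2):
select * from mr_grep_tmp;
-- expected, given the sample data: key=hello, cnt=2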


Code Example

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.Mapper;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Extracts regex matches from the input table and counts them.
 **/
public class Grep {
  /**
   * RegexMapper: emits (matched text, 1) for every regex match in every column.
   * The nested classes are declared static so the framework can instantiate
   * them via their no-arg constructors.
   **/
  public static class RegexMapper extends MapperBase {
    private Pattern pattern;
    private int group;
    private Record word;
    private Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
      JobConf job = (JobConf) context.getJobConf();
      pattern = Pattern.compile(job.get("mapred.mapper.regex"));
      group = job.getInt("mapred.mapper.regex.group", 0);
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.set(new Object[] { 1L });
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
      for (int i = 0; i < record.getColumnCount(); ++i) {
        String text = record.get(i).toString();
        Matcher matcher = pattern.matcher(text);
        while (matcher.find()) {
          word.set(new Object[] { matcher.group(group) });
          context.write(word, one);
        }
      }
    }
  }

  /**
   * LongSumReducer: sums the per-match counts for each word.
   **/
  public static class LongSumReducer extends ReducerBase {
    private Record result = null;

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
      long count = 0;
      while (values.hasNext()) {
        Record val = values.next();
        count += (Long) val.get(0);
      }
      result.set(0, key.get(0));
      result.set(1, count);
      context.write(result);
    }
  }

  /**
   * A {@link Mapper} that swaps keys and values.
   **/
  public static class InverseMapper extends MapperBase {
    private Record word;
    private Record count;

    @Override
    public void setup(TaskContext context) throws IOException {
      word = context.createMapOutputValueRecord();
      count = context.createMapOutputKeyRecord();
    }

    /**
     * The inverse function. Input keys and values are swapped.
     **/
    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
      word.set(new Object[] { record.get(0).toString() });
      count.set(new Object[] { (Long) record.get(1) });
      context.write(count, word);
    }
  }

  /**
   * IdentityReducer
   **/
  public static class IdentityReducer extends ReducerBase {
    private Record result = null;

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }

    /** Writes all keys and values directly to output. **/
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
      result.set(0, key.get(0));
      while (values.hasNext()) {
        Record val = values.next();
        result.set(1, val.get(0));
        context.write(result);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 4) {
      System.err.println("Grep <inDir> <tmpDir> <outDir> <regex> [<group>]");
      System.exit(2);
    }
    // First job: count regex matches, writing (word, count) to the tmp table.
    JobConf grepJob = new JobConf();
    grepJob.setMapperClass(RegexMapper.class);
    grepJob.setReducerClass(LongSumReducer.class);
    grepJob.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    grepJob.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), grepJob);
    OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), grepJob);
    grepJob.set("mapred.mapper.regex", args[3]);
    if (args.length == 5) {
      grepJob.set("mapred.mapper.regex.group", args[4]);
    }
    @SuppressWarnings("unused")
    RunningJob rjGrep = JobClient.runJob(grepJob);

    // Second job: swap (word, count) into (count, word) and sort by count.
    JobConf sortJob = new JobConf();
    sortJob.setMapperClass(InverseMapper.class);
    sortJob.setReducerClass(IdentityReducer.class);
    sortJob.setMapOutputKeySchema(SchemaUtils.fromString("count:bigint"));
    sortJob.setMapOutputValueSchema(SchemaUtils.fromString("word:string"));
    InputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), sortJob);
    OutputUtils.addTable(TableInfo.builder().tableName(args[2]).build(), sortJob);
    sortJob.setNumReduceTasks(1); // write a single file
    sortJob.setOutputKeySortColumns(new String[] { "count" });
    @SuppressWarnings("unused")
    RunningJob rjSort = JobClient.runJob(sortJob);
  }
}
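
The program chains two MapReduce jobs: the first counts regex matches into the tmp table, and the second inverts and sorts the result into the output table. The optional fifth argument selects a capture group of the regex instead of the whole match. An illustrative command following the same pattern as the test step above (the quoting of the regex may need adapting to your shell), which would count the string lo extracted by group 1 of hel(lo):
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Grep mr_src mr_grep_tmp mr_grep_out "hel(lo)" 1;

The group parameter maps directly onto java.util.regex semantics, i.e. what matcher.group(group) returns inside RegexMapper. A tiny standalone Java illustration:
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupDemo {
  public static void main(String[] args) {
    Matcher m = Pattern.compile("hel(lo)").matcher("hello,odps");
    while (m.find()) {
      System.out.println(m.group(0)); // whole match: hello
      System.out.println(m.group(1)); // capture group 1: lo
    }
  }
}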
