
MaxCompute User Guide: MapReduce: Example Programs: Join Example



The MaxCompute MapReduce framework does not support Join logic itself, but you can implement a join in your own Map/Reduce functions at the cost of some extra work. The code example below implements a classic reduce-side join: the mapper tags each record with its source table, and the reducer combines records that share the same key.
Suppose you need to join two tables, mr_Join_src1(key bigint, value string) and mr_Join_src2(key bigint, value string). The output table is mr_Join_out(key bigint, value1 string, value2 string), where value1 is the value from mr_Join_src1 and value2 is the value from mr_Join_src2.

Test Preparation


  1. Prepare the Jar package of the test program. Assume it is named mapreduce-examples.jar and is stored locally under data\resources.

  2. Prepare the test tables and resources for the Join.

    • Create the test tables.

      create table mr_Join_src1(key bigint, value string);
      create table mr_Join_src2(key bigint, value string);
      create table mr_Join_out(key bigint, value1 string, value2 string);

    • Add the test resource.

      add jar data\resources\mapreduce-examples.jar -f;

    • Import the data with tunnel.

      tunnel upload data1 mr_Join_src1;
      tunnel upload data2 mr_Join_src2;

      The data imported into mr_Join_src1 is as follows:

      1,hello
      2,odps

      The data imported into mr_Join_src2 is as follows:

      1,odps
      3,hello
      4,odps


Test Steps


Run the Join in odpscmd, as follows (-resources makes the jar available to the job at run time; -classpath tells odpscmd where to load the main class from locally):

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
    com.aliyun.odps.mapred.open.example.Join mr_Join_src1 mr_Join_src2 mr_Join_out;


Expected Result


After the job completes successfully, the output table mr_Join_out contains the following. Key 1 is the only key that appears in both input tables, so it produces the only joined row:

+------------+------------+------------+
| key        | value1     | value2     |
+------------+------------+------------+
| 1          | hello      | odps       |
+------------+------------+------------+
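
As a quick cross-check (not part of the example itself), the same result can be produced directly in MaxCompute SQL with an ordinary join; a minimal query against the tables created above:

-- Equivalent MaxCompute SQL join, for cross-checking the MapReduce output
select a.key, a.value as value1, b.value as value2
from mr_Join_src1 a
join mr_Join_src2 b
on a.key = b.key;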


Code Example

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Join, mr_Join_src1/mr_Join_src2(key bigint, value string), mr_Join_out(key
 * bigint, value1 string, value2 string)
 */
public class Join {

  public static final Log LOG = LogFactory.getLog(Join.class);

  public static class JoinMapper extends MapperBase {
    private Record mapkey;
    private Record mapvalue;
    private long tag;

    @Override
    public void setup(TaskContext context) throws IOException {
      mapkey = context.createMapOutputKeyRecord();
      mapvalue = context.createMapOutputValueRecord();
      // Tag each record with its source table: 0 for the table labeled
      // "left", 1 for the table labeled "right".
      tag = context.getInputTableInfo().getLabel().equals("left") ? 0 : 1;
    }

    @Override
    public void map(long key, Record record, TaskContext context)
        throws IOException {
      // Emit (join key, tag) as the map output key and the remaining
      // columns as the map output value.
      mapkey.set(0, record.get(0));
      mapkey.set(1, tag);
      for (int i = 1; i < record.getColumnCount(); i++) {
        mapvalue.set(i - 1, record.get(i));
      }
      context.write(mapkey, mapvalue);
    }
  }

  public static class JoinReducer extends ReducerBase {
    private Record result = null;

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long k = key.getBigint(0);
      List<Object[]> leftValues = new ArrayList<Object[]>();
      // Records are sorted by (key, tag), so within one key group all
      // left-table records arrive before the right-table records.
      while (values.hasNext()) {
        Record value = values.next();
        long tag = (Long) key.get(1);
        if (tag == 0) {
          // Buffer the left-table values for this key.
          leftValues.add(value.toArray().clone());
        } else {
          // Cross every buffered left value with the current right value.
          for (Object[] leftValue : leftValues) {
            int index = 0;
            result.set(index++, k);
            for (int i = 0; i < leftValue.length; i++) {
              result.set(index++, leftValue[i]);
            }
            for (int i = 0; i < value.getColumnCount(); i++) {
              result.set(index++, value.get(i));
            }
            context.write(result);
          }
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.println("Usage: Join <input table1> <input table2> <out>");
      System.exit(2);
    }
    JobConf job = new JobConf();
    job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,tag:bigint"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
    // Partition by key only, sort by (key, tag), and group by key, so each
    // reduce call sees all records for one key, left-table records first.
    job.setPartitionColumns(new String[]{"key"});
    job.setOutputKeySortColumns(new String[]{"key", "tag"});
    job.setOutputGroupingColumns(new String[]{"key"});
    job.setNumReduceTasks(1);
    InputUtils.addTable(TableInfo.builder().tableName(args[0]).label("left").build(), job);
    InputUtils.addTable(TableInfo.builder().tableName(args[1]).label("right").build(), job);
    OutputUtils.addTable(TableInfo.builder().tableName(args[2]).build(), job);
    JobClient.runJob(job);
  }
}
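
A note on the job configuration above: because the map output is sorted by (key, tag) but grouped by key alone, each reduce call receives all records for one key with the left-table records first, so the reducer only needs to buffer the left values. With the sample data, the reducer receives for key 1 the tagged records (1,0)→hello and (1,1)→odps and crosses them into the single output row (1, hello, odps); keys 2, 3, and 4 appear in only one table and produce no output.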
行者武松 2017-10-23 17:47:03
