开发者社区> 问答> 正文

MaxCompute用户指南:MapReduce:示例程序:使用资源示例



测试准备


  1. 准备好测试程序的 Jar 包,假设名字为 mapreduce-examples.jar,本地存放路径为 data\resources。

  2. 准备好测试表和资源。

    • 创建测试表。create table mr_upload_src(key bigint, value string);

  • 添加测试资源。
    1. add jar data\resources\mapreduce-examples.jar -f;
    2. add file data\resources\import.txt -f;

  • import.txt 的数据内容,如下所示:
    1. 1000,odps


    测试步骤


    在 odpscmd 中执行 Upload,如下所示:
    1. jar -resources mapreduce-examples.jar,import.txt -classpath data\resources\mapreduce-examples.jar
    2. com.aliyun.odps.mapred.open.example.Upload import.txt mr_upload_src;


    预期结果


    作业成功结束后,输出表 mr_upload_src 中的内容,如下所示:
    1. +------------+------------+
    2. | key        | value      |
    3. +------------+------------+
    4. | 1000       | odps       |
    5. +------------+------------+


    代码示例

    1.     package com.aliyun.odps.mapred.open.example;
    2.     import java.io.BufferedInputStream;
    3.     import java.io.FileNotFoundException;
    4.     import java.io.IOException;
    5.     import com.aliyun.odps.data.Record;
    6.     import com.aliyun.odps.data.TableInfo;
    7.     import com.aliyun.odps.mapred.JobClient;
    8.     import com.aliyun.odps.mapred.MapperBase;
    9.     import com.aliyun.odps.mapred.TaskContext;
    10.     import com.aliyun.odps.mapred.conf.JobConf;
    11.     import com.aliyun.odps.mapred.utils.InputUtils;
    12.     import com.aliyun.odps.mapred.utils.OutputUtils;
    13.     import com.aliyun.odps.mapred.utils.SchemaUtils;
    14.     /**
    15.      * Upload
    16.      *
    17.      * Import data from text file into table
    18.      *
    19.      **/
    20.     public class Upload {
    21.       public static class UploadMapper extends MapperBase {
    22.         @Override
    23.         public void setup(TaskContext context) throws IOException {
    24.           Record record = context.createOutputRecord();
    25.           StringBuilder importdata = new StringBuilder();
    26.           BufferedInputStream bufferedInput = null;
    27.           try {
    28.             byte[] buffer = new byte[1024];
    29.             int bytesRead = 0;
    30.             String filename = context.getJobConf().get("import.filename");
    31.             bufferedInput = context.readResourceFileAsStream(filename);
    32.             while ((bytesRead = bufferedInput.read(buffer)) != -1) {
    33.               String chunk = new String(buffer, 0, bytesRead);
    34.               importdata.append(chunk);
    35.             }
    36.             String lines[] = importdata.toString().split("\n");
    37.             for (int i = 0; i < lines.length; i++) {
    38.               String[] ss = lines.split(",");
    39.               record.set(0, Long.parseLong(ss[0].trim()));
    40.               record.set(1, ss[1].trim());
    41.               context.write(record);
    42.             }
    43.           } catch (FileNotFoundException ex) {
    44.             throw new IOException(ex);
    45.           } catch (IOException ex) {
    46.             throw new IOException(ex);
    47.           } finally {
    48.           }
    49.         }
    50.         @Override
    51.         public void map(long recordNum, Record record, TaskContext context)
    52.             throws IOException {
    53.         }
    54.       }
    55.       public static void main(String[] args) throws Exception {
    56.         if (args.length != 2) {
    57.           System.err.println("Usage: Upload <import_txt> <out_table>");
    58.           System.exit(2);
    59.         }
    60.         JobConf job = new JobConf();
    61.         job.setMapperClass(UploadMapper.class);
    62.         job.set("import.filename", args[0]);
    63.         job.setNumReduceTasks(0);
    64.         job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
    65.         job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
    66.         InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), job);
    67.         OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
    68.         JobClient.runJob(job);
    69.       }
    70.     }

    实际上,您有多种手段设置 JobConf。如下所示:

    • 通过 SDK 中 JobConf 的接口设置,本示例即是通过此方法实现。此方法的优先级最低

    • 在 Jar 命令行中,通过 –conf 参数指定新的 JobConf 文件。此方法的优先级最低。
  • 展开
    收起
    行者武松 2017-10-23 17:45:45 2312 0
    0 条回答
    写回答
    取消 提交回答
    问答排行榜
    最热
    最新

    相关电子书

    更多
    大数据AI一体化的解读 立即下载
    极氪大数据 Serverless 应用实践 立即下载
    大数据&AI实战派 第2期 立即下载