This topic presents two examples of using a Partition as job input and output, for your reference.
Example 1:
public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    ...
    // Read the pt=123456 partition of input_table as the job input.
    LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
    input.put("pt", "123456");
    InputUtils.addTable(TableInfo.builder().tableName("input_table").partSpec(input).build(), job);
    // Write the job output to the ds=654321 partition of output_table.
    LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
    output.put("ds", "654321");
    OutputUtils.addTable(TableInfo.builder().tableName("output_table").partSpec(output).build(), job);
    JobClient.runJob(job);
}
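Both examples build the partition spec with a LinkedHashMap rather than a plain HashMap. A partition spec is ordered, so for a table with multi-level partitions the keys should be supplied in the order in which they are defined on the table, and LinkedHashMap preserves insertion order.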
Example 2:
package com.aliyun.odps.mapred.open.example;
...
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: WordCount <in_table> <out_table>");
        System.exit(2);
    }
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    // Use the MaxCompute SDK to enumerate the partitions of the input table.
    Account account = new AliyunAccount("my_access_id", "my_access_key");
    Odps odps = new Odps(account);
    odps.setEndpoint("odps_endpoint_url");
    odps.setDefaultProject("my_project");
    Table table = odps.tables().get(args[0]);
    TableInfoBuilder builder = TableInfo.builder().tableName(args[0]);
    for (Partition p : table.getPartitions()) {
        if (applicable(p)) {
            // Copy this partition's spec and add the partition as one input of the job.
            LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
            for (String key : p.getPartitionSpec().keys()) {
                partSpec.put(key, p.getPartitionSpec().get(key));
            }
            InputUtils.addTable(builder.partSpec(partSpec).build(), job);
        }
    }
    OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
    JobClient.runJob(job);
}
Note:
Example 2 combines the MaxCompute SDK with the MapReduce SDK to let a MapReduce job read a range of Partitions.
This code cannot be compiled and executed as-is; it only illustrates the main function.
The applicable function in the example is user logic that decides whether a given Partition qualifies as input for this MapReduce job.
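A minimal sketch of what applicable could look like, assuming a single partition key named pt whose values are date strings; the key name and the date bounds are illustrative assumptions, not part of the original example:

private static boolean applicable(Partition p) {
    // Hypothetical user logic: accept only partitions whose assumed
    // "pt" value falls within an example date range.
    String pt = p.getPartitionSpec().get("pt");
    return pt != null && pt.compareTo("20140101") >= 0 && pt.compareTo("20140701") < 0;
}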