MaxCompute用户指南:MapReduce:示例程序:多任务示例
测试准备
准备好测试程序的 Jar 包,假设名字为 mapreduce-examples.jar,本地存放路径为 data\resources。
准备好 MultiJobs 测试表和资源。
创建测试表:
create table mr_empty (key string, value string);
create table mr_multijobs_out (value bigint);
添加测试资源:
add table mr_multijobs_out as multijobs_res_table -f;
add jar data\resources\mapreduce-examples.jar -f;
测试步骤
在 odpscmd 中执行 MultiJobs,如下所示:
jar -resources mapreduce-examples.jar,multijobs_res_table -classpath data\resources\mapreduce-examples.jar com.aliyun.odps.mapred.open.example.MultiJobs mr_multijobs_out;
预期结果
作业成功结束后,输出表 mr_multijobs_out 中的内容如下所示:
+------------+
| value      |
+------------+
| 0          |
+------------+
代码示例
- package com.aliyun.odps.mapred.open.example;
- import java.io.IOException;
- import java.util.Iterator;
- import com.aliyun.odps.data.Record;
- import com.aliyun.odps.data.TableInfo;
- import com.aliyun.odps.mapred.JobClient;
- import com.aliyun.odps.mapred.MapperBase;
- import com.aliyun.odps.mapred.RunningJob;
- import com.aliyun.odps.mapred.TaskContext;
- import com.aliyun.odps.mapred.conf.JobConf;
- import com.aliyun.odps.mapred.utils.InputUtils;
- import com.aliyun.odps.mapred.utils.OutputUtils;
- import com.aliyun.odps.mapred.utils.SchemaUtils;
- /**
- * MultiJobs
- *
- * Running multiple job
- *
- **/
- public class MultiJobs {
- public static class InitMapper extends MapperBase {
- @Override
- public void setup(TaskContext context) throws IOException {
- Record record = context.createOutputRecord();
- long v = context.getJobConf().getLong("multijobs.value", 2);
- record.set(0, v);
- context.write(record);
- }
- }
- public static class DecreaseMapper extends MapperBase {
- @Override
- public void cleanup(TaskContext context) throws IOException {
- //从JobConf中获取main函数中定义的变量值
- long expect = context.getJobConf().getLong("multijobs.expect.value", -1);
- long v = -1;
- int count = 0;
- Iterator<Record> iter = context.readResourceTable("multijobs_res_table");
- while (iter.hasNext()) {
- Record r = iter.next();
- v = (Long) r.get(0);
- if (expect != v) {
- throw new IOException("expect: " + expect + ", but: " + v);
- }
- count++;
- }
- if (count != 1) {
- throw new IOException("res_table should have 1 record, but: " + count);
- }
- Record record = context.createOutputRecord();
- v--;
- record.set(0, v);
- context.write(record);
- context.getCounter("multijobs", "value").setValue(v);
- }
- }
- public static void main(String[] args) throws Exception {
- if (args.length != 1) {
- System.err.println("Usage: TestMultiJobs <table>");
- System.exit(1);
- }
- String tbl = args[0];
- long iterCount = 2;
- System.err.println("Start to run init job.");
- JobConf initJob = new JobConf();
- initJob.setLong("multijobs.value", iterCount);
- initJob.setMapperClass(InitMapper.class);
- InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), initJob);
- OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), initJob);
- initJob.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
- initJob.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
- initJob.setNumReduceTasks(0);
- JobClient.runJob(initJob);
- while (true) {
- System.err.println("Start to run iter job, count: " + iterCount);
- JobConf decJob = new JobConf();
- decJob.setLong("multijobs.expect.value", iterCount);
- decJob.setMapperClass(DecreaseMapper.class);
- InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), decJob);
- OutputUtils.addTable(TableInfo.builder().tableName(tbl).build(), decJob);
- decJob.setNumReduceTasks(0);
- RunningJob rJob = JobClient.runJob(decJob);
- iterCount--;
- if (rJob.getCounters().findCounter("multijobs", "value").getValue() == 0) {
- break;
- }
- }
- if (iterCount != 0) {
- throw new IOException("Job failed.");
- }
- }
- }
收起
行者武松
2017-10-23 17:44:47
1967
0
0
条回答
写回答
取消
提交回答