开发者社区> 问答> 正文

MaxCompute工具及下载:Eclipse开发插件:MapReduce开发插件介绍



选择ODPS项目中的WordCount示例:

右键”WordCount.java”,依次点击”Run As”,”ODPS MapReduce”:

弹出对话框后,选择”example_project”,点击确认:

运行成功后,会出现以下结果提示:


运行自定义MapReduce程序


右键选择src目录,选择新建(New) -> Mapper:

选择Mapper后出现下面的对话框。输入Mapper类的名字,并确认:

会看到在左侧包资源管理器(Package Explorer)中,src目录下生成文件UserMapper.java。该文件的内容即是一个Mapper类的模板:

  1. package odps;
  2. import java.io.IOException;
  3. import com.aliyun.odps.data.Record;
  4. import com.aliyun.odps.mapred.MapperBase;
  5. public class UserMapper extends MapperBase {
  6.     @Override
  7.     public void setup(TaskContext context) throws IOException {
  8.     }
  9.     @Override
  10.     public void map(long recordNum, Record record, TaskContext context)
  11.             throws IOException {
  12.     }
  13.     @Override
  14.     public void cleanup(TaskContext context) throws IOException {
  15.     }
  16. }

模板中,将package名称默认配置为”odps”,用户可以根据自己的需求进行修改。编写模板内容:
  1. package odps;
  2. import java.io.IOException;
  3. import com.aliyun.odps.counter.Counter;
  4. import com.aliyun.odps.data.Record;
  5. import com.aliyun.odps.mapred.MapperBase;
  6. public class UserMapper extends MapperBase {
  7.     Record word;
  8.     Record one;
  9.     Counter gCnt;
  10.     @Override
  11.     public void setup(TaskContext context) throws IOException {
  12.           word = context.createMapOutputKeyRecord();
  13.           one = context.createMapOutputValueRecord();
  14.           one.set(new Object[] { 1L });
  15.           gCnt = context.getCounter("MyCounters", "global_counts");
  16.     }
  17.     @Override
  18.     public void map(long recordNum, Record record, TaskContext context)
  19.             throws IOException {
  20.           for (int i = 0; i < record.getColumnCount(); i++) {
  21.               String[] words = record.get(i).toString().split("\\s+");
  22.               for (String w : words) {
  23.                 word.set(new Object[] { w });
  24.                 Counter cnt = context.getCounter("MyCounters", "map_outputs");
  25.                 cnt.increment(1);
  26.                 gCnt.increment(1);
  27.                 context.write(word, one);
  28.               }
  29.             }
  30.           }
  31.     @Override
  32.     public void cleanup(TaskContext context) throws IOException {
  33.     }
  34. }

同理,右键选择src目录,选择新建(New)->Reduce:

输入Reduce类的名字(本示例使用UserReduce):
同样在包资源管理器(Package Explorer)中,src目录下生成文件UserReduce.java。该文件的内容即是一个Reduce类的模板。编辑模板:
  1. package odps;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import com.aliyun.odps.counter.Counter;
  5. import com.aliyun.odps.data.Record;
  6. import com.aliyun.odps.mapred.ReducerBase;
  7. public class UserReduce extends ReducerBase {
  8.     private Record result;
  9.     Counter gCnt;
  10.     @Override
  11.     public void setup(TaskContext context) throws IOException {
  12.           result = context.createOutputRecord();
  13.           gCnt = context.getCounter("MyCounters", "global_counts");
  14.     }
  15.     @Override
  16.     public void reduce(Record key, Iterator<Record> values, TaskContext context)
  17.             throws IOException {
  18.           long count = 0;
  19.           while (values.hasNext()) {
  20.             Record val = values.next();
  21.             count += (Long) val.get(0);
  22.           }
  23.           result.set(0, key.get(0));
  24.           result.set(1, count);
  25.           Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
  26.           cnt.increment(1);
  27.           gCnt.increment(1);
  28.           context.write(result);
  29.         }
  30.     @Override
  31.     public void cleanup(TaskContext context) throws IOException {
  32.     }
  33. }

创建main函数: 右键选择src目录,选择新建(New) -> MapReduce Driver。填写DriverName(示例中是UserDriver),Mapper及Recduce类(示例中是UserMapper及UserReduce),并确认。同样会在src目录下看到MyDriver.java文件:

编辑driver内容:
  1. package odps;
  2. import com.aliyun.odps.OdpsException;
  3. import com.aliyun.odps.data.TableInfo;
  4. import com.aliyun.odps.examples.mr.WordCount.SumCombiner;
  5. import com.aliyun.odps.examples.mr.WordCount.SumReducer;
  6. import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;
  7. import com.aliyun.odps.mapred.JobClient;
  8. import com.aliyun.odps.mapred.RunningJob;
  9. import com.aliyun.odps.mapred.conf.JobConf;
  10. import com.aliyun.odps.mapred.utils.InputUtils;
  11. import com.aliyun.odps.mapred.utils.OutputUtils;
  12. import com.aliyun.odps.mapred.utils.SchemaUtils;
  13. public class UserDriver {
  14.     public static void main(String[] args) throws OdpsException {
  15.         JobConf job = new JobConf();
  16.         job.setMapperClass(TokenizerMapper.class);
  17.         job.setCombinerClass(SumCombiner.class);
  18.         job.setReducerClass(SumReducer.class);
  19.         job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
  20.         job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
  21.         InputUtils.addTable(
  22.             TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
  23.         InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
  24.         OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
  25.         RunningJob rj = JobClient.runJob(job);
  26.         rj.waitForCompletion();
  27.     }
  28. }

运行MapReduce程序,选中UserDriver.java,右键选择Run As -> ODPS MapReduce,点击确认。出现如下对话框:

选择ODPS Project为:example_project,点击Finish按钮开始本地运行MapReduce程序:

有如上输出信息,说明本地运行成功。运行的输出结果在warehouse目录下。关于warehouse的说明请参考 本地运行 。刷新ODPS工程:

wc_out即是输出目录,R_000000即是结果文件。通过本地调试,确定输出结果正确后,可以通过Eclipse导出(Export)功能将MapReduce打包。打包后将jar包上传到ODPS中。在分布式环境下执行MapReduce,详情请参考 快速入门
本地调试通过后,用户可以通过Eclipse的Export功能将代码打成jar包,供后续分布式环境使用。在本示例中,我们将程序包命名为mr-examples.jar。选择src目录,点击Export:

选择导出模式为Jar File:

仅需要导出src目录下package(com.aliyun.odps.mapred.open.example),Jar File名称指定为”mr-examples.jar”:

确认后,导出成功。
如果用户想在本地模拟新建Project,可以在warehouse下面,创建一个新的子目录(与example_project平级的目录),目录层次结构为:
  1. <warehouse>
  2.    |____example_project(项目空间目录)
  3.           |____ <__tables__>
  4.           |       |__table_name1(非分区表)
  5.           |       |      |____ data(文件)
  6.           |       |      |
  7.           |       |      |____ <__schema__> (文件)
  8.           |       |
  9.           |       |__table_name2(分区表)
  10.           |            |____ partition_name=partition_value(分区目录)
  11.           |            |          |____ data(文件)
  12.           |            |
  13.           |            |____ <__schema__> (文件)
  14.           |
  15.           |____ <__resources__>
  16.                   |
  17.                   |___table_resource_name (表资源)
  18.                   |         |____<__ref__>
  19.                   |
  20.                   |___ file_resource_name(文件资源)

schema文件示例:
  1. 非分区表:
  2. project=project_name
  3. table=table_name
  4. columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
  5. 分区表:
  6. project=project_name
  7. table=table_name
  8. columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
  9. partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
  10. 注:当前支持5种数据格式:bigint,double,boolean,datetime,string, 对应到java中的数据类型-long,double,boolean,java.util.Date,java.lang.String。

data文件示例:
  1. 1,1.1,true,2015-06-04 11:22:42 896,hello world
  2. \N,\N,\N,\N,\N
  3. 注:时间格式精确到毫秒级别,所有类型用\N表示null。

注解:
  • 本地模式运行MapReduce程序,默认情况下先到warehouse下查找相应的数据表或资源,如果表或资源不存在会到服务器上下载相应的数据存入warehouse目录下,再以本地模式运行。
  • 运行完MapReduce后,请刷新warehouse目录,才能看到生成的结果

展开
收起
行者武松 2017-10-24 14:35:30 1982 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Data+AI时代大数据平台应该如何建设 立即下载
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载

相关镜像