mapreduce 模板代码

简介: jai包<dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-core</artifactId>    <version>1.2.1</version></dependency>2.x以后



jai包

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>

2.x以后就拆成一些零散的包了,没有core包了



代码:

package org.conan.myhadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
//org.apache.hadoop.mapred 老系统的包
//org.apache.hadoop.mapreduce 新系统的包 
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * ModuleMapReduce Class
 * 单纯的注释 
 */
public class ModuleMapReduce extends Configured implements Tool {

    /**
     * 
     * ModuleMapper Class 不仅有注释的功效而且你鼠标放在你注释的方法上面他会把你注释的内容显示出来,
     * 
     */
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, LongWritable, Text>

    {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {

            super.setup(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO

        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {

            super.cleanup(context);
        }

    }

    /**
     * 
     * ModuleReducer Class
     * 
     */
    public static class ModuleReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            super.setup(context);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> value,
                Context context) throws IOException, InterruptedException {
            // TODO

        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

    }

    // Driver 驱动
    // @Override //实现接口时关键字1.5和1.7的JDK都会报错,只有1.6不报错
    public int run(String[] args) throws Exception {
        Job job = parseInputAndOutput(this, this.getConf(), args);
        // 2.set job

        // step 1:set input
        job.setInputFormatClass(TextInputFormat.class);

        // step 3:set mappper class
        job.setMapperClass(ModuleMapper.class);
        // step 4:set mapout key/value class
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // step 5:set shuffle(sort,combiner,group)
        // set sort
        job.setSortComparatorClass(LongWritable.Comparator.class);
        // set combiner(optional,default is unset)必须是Reducer的子类
        job.setCombinerClass(ModuleReducer.class);
        // set grouping
        job.setGroupingComparatorClass(LongWritable.Comparator.class);
        // step 6 set reducer class
        job.setReducerClass(ModuleReducer.class);
        // step 7:set job output key/value class
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // step 8:set output format
        job.setOutputFormatClass(FileOutputFormat.class);

        // step 10: submit job
        Boolean isCompletion = job.waitForCompletion(true);// 提交job
        return isCompletion ? 0 : 1;
    }

    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
            throws IOException {
        // 输入参数的合法性
        if (args.length != 2) {
            System.err.printf(
                    "Usage: %s [generic options] <input> <output> \n ", tool
                            .getClass().getSimpleName());
      //%s表示输出字符串,也就是将后面的字符串替换模式中的%s
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }

        // 1.create job

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(ModuleMapReduce.class);
        // step 2:set input path
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);
        // step 9:set output path
        Path outputPath = new Path(args[0]);
        FileOutputFormat.setOutputPath(job, outputPath);

        return job;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new ModuleMapReduce(), args);// 返回值即为isCompletion ? 0 : 1
            System.exit(status);// System.exit(0)中断虚拟机的运行,退出应用程序,0表示没有异常正常退出。
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


倒排索引代码


输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789    112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开

要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割

package org.conan.myhadoop.mr;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReverseIndex extends Configured implements Tool {

    enum Counter {
        LINESKIP, // 出错的行
    }

    public static class Map extends Mapper {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // 读取源数据
            try {
                // 数据处理
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];
                String bnum = lineSplit[1];
                context.write(new Text(bnum), new Text(anum)); // 输出

            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1
                return;
            }
        }
    }

    public static class Reduce extends Reducer {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueString;
            String out = "";
            for (Text value : values) {
                valueString = value.toString();
                out += valueString + "|";
                System.out.println("Ruduce:key=" + key + "  value=" + value);
            }
            context.write(key, new Text(out));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = new Job(conf, "ReverseIndex"); // 任务名
        job.setJarByClass(ReverseIndex.class); // 指定Class

        FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径

        job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码
        job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
        job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式

        job.waitForCompletion(true);

        // 输出任务完成情况
        System.out.println("任务名称:" + job.getJobName());
        System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
        System.out.println("输入行数:"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_INPUT_RECORDS").getValue());
        System.out.println("输出行数:"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("跳过的行:"
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // 判断参数个数是否正确
        // 如果无参数运行则显示以作程序说明
        if (args.length != 2) {
            System.err.println("");
            System.err
                    .println("Usage: ReverseIndex < input path > < output path > ");
            System.err
                    .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");

            System.exit(-1);
        }
        // 记录开始时间
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();
        // 运行任务
        int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);

        // 输出任务耗时
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("任务开始:" + formatter.format(start));
        System.out.println("任务结束:" + formatter.format(end));
        System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");

        System.exit(res);
   }
    
}


去重代码

 //Mapper任务
      static class DDMap extends Mapper<LongWritable,Text,Text,Text>{
       private static Text line = new Text();
       protected void map(LongWritable k1,Text v1,Context context){
        line = v1;
        Text text = new Text("");
         try {
          context.write(line,text);
         } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
         } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
         }
       };
      }

    //Reducer任务
      static class DDReduce extends Reducer<Text,Text,Text,Text>{
       protected void reduce(Text k2,Iterable<Text> v2s,Context context){
        Text text = new Text("");
        try {
         context.write(k2, text);
        } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        } catch (InterruptedException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
       };
      }


参考文章;

一个经典的MapReduce模板代码,倒排索引(ReverseIndex)

http://blog.itpub.net/26400547/viewspace-1214945/

详解MapReduce实现数据去重与倒排索引应用场景案例

http://www.tuicool.com/articles/emi6Fb



本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1698489

目录
相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
101 3
|
SQL 分布式计算 并行计算
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(一)
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(一)
305 0
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(一)
|
分布式计算 数据挖掘
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(二)
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(二)
480 0
|
分布式计算 大数据
大数据||MapReduce编程模板
标准模板代码 package com.lizh.hadoop.mapreduce; import java.io.IOException; import org.
1082 0
|
分布式计算 Hadoop 测试技术
|
7月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
84 1
|
6月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
65 1
|
6月前
|
数据采集 SQL 分布式计算
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce 调优参数
对于 Hadoop v3.1.3,针对三台4核4G服务器的MapReduce调优参数包括:`mapreduce.reduce.shuffle.parallelcopies`设为10以加速Shuffle,`mapreduce.reduce.shuffle.input.buffer.percent`和`mapreduce.reduce.shuffle.merge.percent`分别设为0.8以减少磁盘IO。
76 1

相关实验场景

更多