使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

简介: 工程结构:在整个案例过程中,代码如下:WordCountMapper的代码如下:package cn.toto.bigdata.mr.wc;   import java.io.IOException;   import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable;

工程结构:

在整个案例过程中,代码如下:

WordCountMapper的代码如下:

package cn.toto.bigdata.mr.wc;

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

/**

 * 这里的Mapper是hadoop-mapreduce-client-core-2.8.0.jar中的内容

 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

 * KEYIN     :是指框架读取到的数据的key的类型,在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long

 * VALUEIN   :是指框架读取到的数据的value的类型,在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String

 * KEYOUT    :是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的key是单词,所以是String

 * VALUEOUT  :是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的value是单词的数量,所以是Integer

 *

 * 但是,String,Long等jdk中自带的数据类型,在序列化是,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架,

 * 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型

 *

 * Long       ----> LongWritable

 * String     ----> Text

 * Integer    ----> IntWritable

 * Null       ----> NullWritable

 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

 

         /**

          * 这就是mapreduce框架中一个主体运行程序MapTask所要调用的用户业务逻辑方法

          * MapTask会驱动InputFormat去读取数据(keyIN,VALUEIN),每读到一个KV对,就传入这个用户写的map方法中调用一次

          * 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容

          */

         @Override

         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)

                            throws IOException, InterruptedException {

                  

                   String line = value.toString();

                   String[] words = line.split(" ");

                   for (String word : words) {

                            context.write(new Text(word), new IntWritable(1));

                   }

         }

}

 

WordCountReducer的代码如下:

package cn.toto.bigdata.mr.wc;

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

 

         /** reducetask在调我们写的reduce方法

                   reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分

                   (数据的key.hashcode%reducetask数==本reductask号)

                   reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:

                          先将自己收到的所有的kv对按照k分组(根据k是否相同)

                          将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values

                    */

         @Override

         protected void reduce(Text key, Iterable<IntWritable> values,

                            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

                   int count = 0;

                   for(IntWritable v : values) {

                            count += v.get();

                   }

                  

                   context.write(key, new IntWritable(count));

         }

}

 

WordCountDriver的代码如下:

package cn.toto.bigdata.mr.wc;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 

/**

 * 本类是客户端用来指定wordcount job程序运行时所需要的很多参数:

 * 比如,指定用哪个组件作为数据读取器、数据结果输出器

 *     指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类

 *     指定wordcount job程序的jar包所在路径

 *     ....

 *     运行前准备工作

 *     1、将当前的工程导出成wordcount.jar

 *     2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:

 *                  The true

                nobility is

                in being

                superior to

                your previous

                self guess

      3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs

 *    

 *     以及其他各种需要的参数

 *     hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver

 *     上面的命令等同:

 *     java -cp wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver

 *    

 *     上面的含义是通过hadoop jarhadoop classpathjar都拷贝到应用中,并且指定执行cn.toto.bigdata.mr.wc.WordCountDriver

 *     

 *     最后查看结果的方式是:hadoop fs -cat /wordcount/output/part-r-00000,通过这个命令可以查看查看到

 */

public class WordCountDriver {

 

   public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration();

     

      conf.set("fs.defaultFS", "hdfs://hadoop:9000");

     

      /*conf.set("mapreduce.framework.name", "yarn");

      conf.set("yarn.resourcemanager.hostname", "mini1");*/

     

      Job job = Job.getInstance(conf);

      //告诉框架,我们的程序所在jar包的路径

      // job.setJar("c:/wordcount.jar");

      job.setJarByClass(WordCountDriver.class);

     

      //告诉框架,我们的程序所用的mapper类和reducer

      job.setMapperClass(WordCountMapper.class);

      job.setReducerClass(WordCountReducer.class);

     

      //告诉框架,我们的mapperreducer输出的数据类型

      job.setMapOutputKeyClass(Text.class);

      job.setMapOutputValueClass(IntWritable.class);

     

      job.setOutputKeyClass(Text.class);

      job.setOutputValueClass(IntWritable.class);

     

      // 告诉框架,我们的数据读取、结果输出所用的format组件

      // TextInputFormatmapreduce框架中内置的一种读取文本文件的输入组件

      job.setInputFormatClass(TextInputFormat.class);

      job.setOutputFormatClass(TextOutputFormat.class);

     

      // 告诉框架,我们要处理的文件在哪个路径下,注意若hdfs中已经有了/wordcount/input/这个文件,说明

      FileInputFormat.setInputPaths(job, new Path("/wordcount/input/"));

     

      // 告诉框架,我们的处理结果要输出到哪里去

      FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));深

     

      boolean res = job.waitForCompletion(true);

     

      System.exit(res?0:1);

   }

}

 

运行前的准备工作:

运行前准备工作

1、将当前的工程导出成wordcount.jar

2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:

                   The true

                nobility is

                in being

                superior to

                your previous

                self guess

   3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs

最后,可以执行的命令是:

hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver

执行后的效果如下:


 

B:使用WordCount本地运行,并且使用Combiner的案例(主要改变是在WordCountDriver中),代码如下:

package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 本类是客户端用来指定wordcount job程序运行时所需要的很多参数:
 * 比如,指定用哪个组件作为数据读取器、数据结果输出器
 *     指定用哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类
 *     指定wordcount job程序的jar包所在路径
 *     ....
 *     运行前准备工作
 *     1、将当前的工程导出成wordcount.jar
 *     2、准备/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容类似:
                    The true
					nobility is 
					in being 
					superior to 
					your previous 
					self guess
	   3、将 /home/toto/software/wordcount通过 hadoop fs -put wordcount /wordcount 上传到hdfs中
 *     
 *     以及其他各种需要的参数
 *     hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 *     上面的命令等同:
 *     java -cp wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 *     
 *     上面的含义是通过hadoop jar将hadoop classpath的jar都拷贝到应用中,并且指定执行cn.toto.bigdata.mr.wc.WordCountDriver
 *     
 *     最后查看结果的方式是:hadoop fs -cat /wordcount/output/part-r-00000,通过这个命令可以查看查看到 
 */
public class WordCountDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		//conf.set("fs.defaultFS", "hdfs://hadoop:9000");
		
		/*conf.set("mapreduce.framework.name", "yarn");
		conf.set("yarn.resourcemanager.hostname", "mini1");*/
		
		Job job = Job.getInstance(conf);
		//告诉框架,我们的程序所在jar包的路径
		// job.setJar("c:/wordcount.jar");
		job.setJarByClass(WordCountDriver.class);
		
		//告诉框架,我们的程序所用的mapper类和reducer类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		//使用tCombiner,使用Combiner的好处是让数据在mapper task中就做统计求和,然后将求和后的结果传递给
		//reducer,然后reducer可以在进行求和。这样的好处是减少了reducer的工作。让每个mapper task自己做聚合,
		//通过分担的方式让效率得以提升,由于combiner的内容结构,编程规范也是集成reducer,所以在当前场景中可以将combiner直接
		//设置成WordCountReducer
		job.setCombinerClass(WordCountReducer.class);
		
		//告诉框架,我们的mapperreducer输出的数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 告诉框架,我们的数据读取、结果输出所用的format组件
		// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// 告诉框架,我们要处理的文件在哪个路径下,注意若hdfs中已经有了/wordcount/input/这个文件,说明
		FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input/"));
		
		// 告诉框架,我们的处理结果要输出到哪里去
		FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output/"));
		
		boolean res = job.waitForCompletion(true);
		
		System.exit(res?0:1);
	}
}

准备工作:

在E盘下准备e:/wordcount/input/a.txt,其中的内容如下:

The true
nobility is 
in being 
superior to 
your previous 
self guess
No great 
discovery 
was ever 
made without 
a bold 
Knowledge will 
give you 
power but 
character respect
The sun 
is just 
rising in 
the morning 
of another 
day I
I figure 
life is 
a gift 
and I 
don't intend 
on wasting

右键运行上面的代码,进入:

E:\wordcount\output\part-r-00000中看结果,结果内容如下:

I	3
Knowledge	1
No	1
The	2
a	2
and	1
another	1
being	1
bold	1
but	1
character	1
day	1
discovery	1
don't	1
ever	1
figure	1
gift	1
give	1
great	1
guess	1
in	2
intend	1
is	3
just	1
life	1
made	1
morning	1
nobility	1
of	1
on	1
power	1
previous	1
respect	1
rising	1
self	1
sun	1
superior	1
the	1
to	1
true	1
was	1
wasting	1
will	1
without	1
you	1
your	1

经过上面的所有步骤之后,程序已经编写完成


总结:

3.MAPREDUCE中的Combiner[dht1] 

(1)combiner是MR程序中Mapper和Reducer之外的一种组件

(2)combiner组件的父类就是Reducer

(3)combiner和reducer的区别在于运行的位置:

Combiner是在每一个maptask所在的节点运行

Reducer是接收全局所有Mapper的输出结果;

(4) combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

具体实现步骤:

    1、 自定义一个combiner继承Reducer,重写reduce方法

    2、 job中设置: job.setCombinerClass(CustomCombiner.class)

(5) combiner能够应用的前提是不能影响最终的业务逻辑

而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来


Combiner的使用要非常谨慎

因为combiner在mapreduce过程中可能调用也肯能不调用,可能调一次也可能调多次

所以:combiner使用的原则是:有或没有都不能影响业务逻辑


===============================================================================


 流量统计和自定义类实现序列化案例:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * 自定义的类要被mapreduce使用,需要序列化WritableComparable
 */
public class FlowBean implements WritableComparable<FlowBean> {

	private String phoneNbr;
	private long upFlow;
	private long dFlow;
	private long sumFlow;
	
	/**
	 * 
	 */
	public FlowBean() {}
	
	/**
	 * 序列化框架在反序列化操作创建对象实例时会调用无参构造
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNbr);
		out.writeLong(upFlow);
		out.writeLong(dFlow);
		out.writeLong(sumFlow);
	}

	/**
	 * 反序列化方法,注意:字段的反序列化顺序与序列化时的顺序保持一致
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phoneNbr = in.readUTF();
		this.upFlow = in.readLong();
		this.dFlow = in.readLong();
		this.sumFlow = in.readLong();
	}

	@Override
	public int compareTo(FlowBean o) {
		return (int)(o.getSumFlow() - this.sumFlow);
	}

	public void set(long upFlow,long dFlow) {
		this.upFlow = upFlow;
		this.dFlow = dFlow;
		this.sumFlow = upFlow + dFlow;
	}
	

	public void set(String phoneNbr,long upFlow, long dFlow) {
		this.phoneNbr = phoneNbr;
		this.upFlow = upFlow;
		this.dFlow = dFlow;
		this.sumFlow = upFlow + dFlow;
	}
	
	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getdFlow() {
		return dFlow;
	}

	public void setdFlow(long dFlow) {
		this.dFlow = dFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
	
	public String getPhoneNbr() {
		return phoneNbr;
	}

	public void setPhoneNbr(String phoneNbr) {
		this.phoneNbr = phoneNbr;
	}

	@Override
	public String toString() {
		return "FlowBean [phoneNbr=" + phoneNbr + ", upFlow=" + upFlow + ", dFlow=" + dFlow + ", sumFlow=" + sumFlow
				+ "]";
	}
}

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSum {
	
	//在kv中传输我们自定义的对象时可以的,但是必须实现hadoop的序列化机制 implements Writable
	public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
		Text k = new Text();
		FlowBean v = new FlowBean();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			//将读到的一行数据进行字段切分
			String line = value.toString();
			String[] fields = StringUtils.split(line,"\t");
			
			//抽取业务所需要的个字段
			String phoneNbr = fields[1];
			long upFlow = Long.parseLong(fields[fields.length - 3]);
			long dFlow = Long.parseLong(fields[fields.length - 2]);
			
			k.set(phoneNbr);
			v.set(upFlow, dFlow);
			
			context.write(k, v);
		}
	}
	
	public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		FlowBean v = new FlowBean();
		
		/**
		 * reduce方法接收到的key是某一组<a手机号,bean><a手机号,bean><a手机号,bean>中的第一个手机号
		 * reduce方法接收到的vlaues是这一组kv中的所有bean的一个迭代器
		 */
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
			long upFlowCount = 0;
			long dFlowCount = 0;
			for (FlowBean bean : values) {
				upFlowCount += bean.getUpFlow();
				dFlowCount += bean.getdFlow();
			}

			v.set(upFlowCount, dFlowCount);
			context.write(key, v);
		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		/*
		 * conf.set("mapreduce.framework.name", "yarn");
		 * conf.set("yarn.resourcemanager.hostname", "mini1");
		 */

		Job job = Job.getInstance(conf);
		// 告诉框架,我们的程序所在jar包的路径
		// job.setJar("c:/wordcount.jar");
		job.setJarByClass(FlowSum.class);

		// 告诉框架,我们的程序所用的mapper类和reducer类
		job.setMapperClass(FlowSumMapper.class);
		job.setReducerClass(FlowSumReducer.class);

		// 告诉框架,我们的mapperreducer输出的数据类型
		/*
		 * job.setMapOutputKeyClass(Text.class);
		 * job.setMapOutputValueClass(FlowBean.class);
		 */
		// 如果map阶段输出的数据类型跟最终输出的数据类型一致,就只要以下两行代码来指定
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 框架中默认的输入输出组件就是这俩货,所以可以省略这两行代码
		/*
		 * job.setInputFormatClass(TextInputFormat.class);
		 * job.setOutputFormatClass(TextOutputFormat.class);
		 */

		// 告诉框架,我们要处理的文件在哪个路径下
		FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/input/"));

		// 告诉框架,我们的处理结果要输出到哪里去
		FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/output/"));

		boolean res = job.waitForCompletion(true);

		System.exit(res ? 0 : 1);
	}
}
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 实现流量汇总并且按照流量大小倒序排序 前提:处理的数据是已经汇总过的结果文件
 * 
 * @author
 * 
 */
public class FlowSumSort {
	
	public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
		FlowBean k = new FlowBean();
		Text v = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\t");
			String phoneNbr = fields[0];
			long upFlowSum = Long.parseLong(fields[1]);
			long dFlowSum = Long.parseLong(fields[2]);
			k.set(upFlowSum, dFlowSum);
			v.set(phoneNbr);
			context.write(k, v);
		}
	}
	
	public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

		@Override
		protected void reduce(FlowBean bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException {

			context.write(phoneNbrs.iterator().next(), bean);

		}

	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);
		job.setJarByClass(FlowSumSort.class);

		// 告诉框架,我们的程序所用的mapper类和reducer类
		job.setMapperClass(FlowSumSortMapper.class);
		job.setReducerClass(FlowSumSortReducer.class);

		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// 告诉框架,我们的mapperreducer输出的数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 告诉框架,我们要处理的文件在哪个路径下(注意:这里的程序执行)
		FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/output/"));

		// 告诉框架,我们的处理结果要输出到哪里去
		FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/sortout/"));

		boolean res = job.waitForCompletion(true);

		System.exit(res ? 0 : 1);
	}
}

运行条件模拟:

1、配置环境变量为HADOOP_HOME=E:\learnTempFolder\hadoop-2.7.3

 2、从CSDN资源上下载支持win10版本的:E:\learnTempFolder\hadoop-2.7.3\bin\winutils.exe 和 E:\learnTempFolder\hadoop-2.7.3\bin\hadoop.dll

界面效果如下:


3、准备要处理的资料:


HTTP_20130313143750.dat 数据文件的具体内容如:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
4、先运行FlowSum(右键执行Java程序)

运行生成的文件为E:\learnTempFolder\flow\output\part-r-00000,内容如下:

13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
13660577991	6960	690	7650
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
13826544101	264	0	264
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548
5、运行FlowSumSort(注意不要删除上面的part-r-00000)

运行后产生的文件内容是:

13502468823	7335	110349	117684
13925057413	11058	48243	59301
13726230503	2481	24681	27162
18320173382	9531	2412	11943
13560439658	2034	5892	7926
13660577991	6960	690	7650
15013685858	3659	3538	7197
13922314466	3008	3720	6728
15920133257	3156	2936	6092
84138413	4116	1432	5548
13602846565	1938	2910	4848
18211575961	1527	2106	3633
15989002119	1938	180	2118
13560436666	1116	954	2070
13926435656	132	1512	1644
13480253104	180	180	360
13826544101	264	0	264
13719199419	240	0	240

当然,我们也可以一次性求和并运算出结果输出到指定的文件目录中,代码如下:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneStepSumSort {

	public static class OneStepSumSortMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
		Text k = new Text();
		FlowBean v = new FlowBean();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			//将读到的一行数据进行字段切分
			String line = value.toString();
			String[] fields = StringUtils.split(line,"\t");
			
			//抽取业务所需要的各字段
			String phoneNbr = fields[1];
			long upFlow = Long.parseLong(fields[fields.length - 3]);
			long dFlow = Long.parseLong(fields[fields.length - 2]);
			
			k.set(phoneNbr);
			v.set(phoneNbr, upFlow, dFlow);
			
			context.write(k, v);
		}
	}
	
	public static class OneStepSumSortReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean,Text>();
		
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context)
				throws IOException, InterruptedException {
			int upCount = 0;
			int dCount = 0;
			for (FlowBean bean : values) {
				upCount += bean.getUpFlow();
				dCount += bean.getdFlow();
			}
			FlowBean sumBean = new FlowBean();
			sumBean.set(key.toString(), upCount, dCount);
			
			Text text = new Text(key.toString());
			treeMap.put(sumBean, text);
		}
		
		@Override
		protected void cleanup(Reducer<Text, FlowBean, Text, FlowBean>.Context context)
				throws IOException, InterruptedException {
			Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
			for (Entry<FlowBean, Text> ent : entrySet) {
				context.write(ent.getValue(), ent.getKey());
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);
		job.setJarByClass(OneStepSumSort.class);

		// 告诉框架,我们的程序所用的mapper类和reducer类
		job.setMapperClass(OneStepSumSortMapper.class);
		job.setReducerClass(OneStepSumSortReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 告诉框架,我们的mapperreducer输出的数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 告诉框架,我们要处理的文件在哪个路径下
		FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

	    // 告诉框架,我们的处理结果要输出到哪里去
	    FileOutputFormat.setOutputPath(job, new Path("E:/flow/sortout/"));
		
	    boolean res = job.waitForCompletion(true);
		
	    System.exit(res ? 0 : 1);
	}
}

到"E:/flow/sortout/"目录下,查看结果:


即:

13502468823	FlowBean [phoneNbr=13502468823, upFlow=7335, dFlow=110349, sumFlow=117684]
13925057413	FlowBean [phoneNbr=13925057413, upFlow=11058, dFlow=48243, sumFlow=59301]
13726238888	FlowBean [phoneNbr=13726230503, upFlow=2481, dFlow=24681, sumFlow=27162]
18320173382	FlowBean [phoneNbr=18320173382, upFlow=9531, dFlow=2412, sumFlow=11943]
13560439658	FlowBean [phoneNbr=13560439658, upFlow=2034, dFlow=5892, sumFlow=7926]
13660577991	FlowBean [phoneNbr=13660577991, upFlow=6960, dFlow=690, sumFlow=7650]
15013685858	FlowBean [phoneNbr=15013685858, upFlow=3659, dFlow=3538, sumFlow=7197]
13922314466	FlowBean [phoneNbr=13922314466, upFlow=3008, dFlow=3720, sumFlow=6728]
15920133257	FlowBean [phoneNbr=15920133257, upFlow=3156, dFlow=2936, sumFlow=6092]
84138413	FlowBean [phoneNbr=84138413, upFlow=4116, dFlow=1432, sumFlow=5548]
13602846565	FlowBean [phoneNbr=13602846565, upFlow=1938, dFlow=2910, sumFlow=4848]
18211575961	FlowBean [phoneNbr=18211575961, upFlow=1527, dFlow=2106, sumFlow=3633]
15989002119	FlowBean [phoneNbr=15989002119, upFlow=1938, dFlow=180, sumFlow=2118]
13560436666	FlowBean [phoneNbr=13560436666, upFlow=1116, dFlow=954, sumFlow=2070]
13926435656	FlowBean [phoneNbr=13926435656, upFlow=132, dFlow=1512, sumFlow=1644]
13480253104	FlowBean [phoneNbr=13480253104, upFlow=180, dFlow=180, sumFlow=360]
13826544101	FlowBean [phoneNbr=13826544101, upFlow=264, dFlow=0, sumFlow=264]
13926251106	FlowBean [phoneNbr=13719199419, upFlow=240, dFlow=0, sumFlow=240]

6、为不同的手机号设置分区,让不同的手机号在不同的文件中。方法如下:

A:下面是自定义分区,自定分区的代码如下:

package cn.toto.bigdata.mr.wc.flowsum;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 自定义分区要继承Partitioner
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{
	private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
	
	static {
		provinceMap.put("138", 0);
		provinceMap.put("139", 1);
		provinceMap.put("136", 2);
		provinceMap.put("137", 3);
		provinceMap.put("135", 4);
	}
	
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		Integer code = provinceMap.get(key.toString().substring(0,3));
		if (code != null) {
			return code;
		}
		return 5;
	}
}
B:测试一下自定义分区:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {

	public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
		Text k = new Text();
		FlowBean v = new FlowBean();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			// 将读到的一行数据进行字段切分
			String line = value.toString();
			String[] fields = StringUtils.split(line, "\t");

			// 抽取业务所需要的各字段
			String phoneNbr = fields[1];
			long upFlow = Long.parseLong(fields[fields.length - 3]);
			long dFlow = Long.parseLong(fields[fields.length - 2]);

			k.set(phoneNbr);
			v.set(phoneNbr, upFlow, dFlow);

			context.write(k, v);

		}
	}
	
	public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

			int upCount = 0;
			int dCount = 0;
			for (FlowBean bean : values) {
				upCount += bean.getUpFlow();
				dCount += bean.getdFlow();

			}
			FlowBean sumBean = new FlowBean();
			sumBean.set(key.toString(), upCount, dCount);
			context.write(key, sumBean);

		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);
		job.setJarByClass(FlowSumProvince.class);

		// 告诉框架,我们的程序所用的mapper类和reducer类
		job.setMapperClass(FlowSumProvinceMapper.class);
		job.setReducerClass(FlowSumProvinceReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 告诉框架,我们的mapperreducer输出的数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		//设置shuffle的分区组件使用我们自定义的分区组件,按照手机号进行分区,注意在自定义的手机号分区中有5个,所以我们的分区不能少于6个
		job.setPartitionerClass(ProvincePartitioner.class);
		
		//设置reduce task的数量
		job.setNumReduceTasks(6);
		
		//告诉框架,我们要处理的文件在哪个路径下
		FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));
		
		//告诉框架,我们的处理结果要输出到哪里去
		Path out = new Path("E:/flow/provinceout/");
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(out)) {
			fs.delete(out,true);
		}
		
		FileOutputFormat.setOutputPath(job, out);
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);
	}
}
C:运行所需的准备:

数据文件:


文件内容如下:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200

运行后的结果如下:



part-r-00001中内容:




等等






 

目录
相关文章
|
7月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
94 0
|
3月前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
66 0
|
4月前
|
分布式计算 Hadoop Java
Hadoop_MapReduce中的WordCount运行详解
MapReduce的WordCount程序在分布式系统中计算大数据集中单词出现的频率时,提供了一个可以复用和可伸缩的解决方案。它体现了MapReduce编程模型的强大之处:简单、可靠且将任务自动分布到一个集群中去执行。它首先运行一系列的Map任务来处理原始数据,然后通过Shuffle和Sort机制来组织结果,最后通过运行Reduce任务来完成最终计算。因此,即便数据量非常大,通过该模型也可以高效地进行处理。
96 1
|
6月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
48 0
|
6月前
|
分布式计算 Java Hadoop
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
60 0
|
6月前
|
分布式计算 数据挖掘
通过mapreduce程序统计旅游订单(wordcount升级版)
通过mapreduce程序统计旅游订单(wordcount升级版)
|
7月前
|
分布式计算 Hadoop 大数据
【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )
【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )
290 1
|
7月前
|
存储 分布式计算 算法
MapReduce计数器,Tash的运行机制,shuffle过程,压缩算法
MapReduce计数器,Tash的运行机制,shuffle过程,压缩算法
61 0
|
7月前
|
分布式计算 Java Hadoop
IDEA 打包MapReduce程序到集群运行的两种方式以及XShell和Xftp过期的解决
IDEA 打包MapReduce程序到集群运行的两种方式以及XShell和Xftp过期的解决
|
7月前
|
存储 分布式计算 关系型数据库
MapReduce【自定义OutputFormat】
MapReduce【自定义OutputFormat】
下一篇
无影云桌面