Hadoop MapReduce(FlowCount) Java编程

简介:

编写PhoneFlow程序,计算手机上行流量、下行流量以及总流量,数据如下:

 13685295623 122  201 

 13985295600 102  11 

 13885295622 22   101 

 13785295633 120  20 

1、FlowMapper:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.commons.lang.StringUtils;



public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

/**

 * 数据格式:

 * 13685295623 122  201 

 * 13985295600 102  11 

 * 13885295622 22   101 

 * 13785295633 120  20 

 */

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] fields=StringUtils.split(line,"\t");

String phoneNB=fields[0];

long up_flow=Long.valueOf(fields[1]);

long d_flow=Long.valueOf(fields[2]);

context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,d_flow));

}


}

2、FlowReducer:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;


public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {

@Override

protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException {

long upflowC=0;

long dflowD=0;

for(FlowBean bean:values){

upflowC+=bean.getUp_flow();

dflowD+=bean.getD_flow();

}

context.write(key,new FlowBean(key.toString(),upflowC,dflowD));

}


}


3、FlowRunner 

package com.hadoop.flow;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.Text;


public class FlowRunner extends Configured implements Tool{

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

job.setJarByClass(FlowRunner.class);

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws Exception {

ToolRunner.run(new Configuration(), new FlowRunner(), args);

}


}


4、FlowBean :

package com.hadoop.flow;


import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;


import org.apache.hadoop.io.Writable;


public class FlowBean implements Writable{

private String phoneNB;

private long up_flow;

private long d_flow;

private long s_flow;

public FlowBean(){

}

public FlowBean (String phoneNB,long up_flow,long d_flow){

this.phoneNB=phoneNB;

this.up_flow=up_flow;

this.d_flow=d_flow;

this.s_flow=up_flow+d_flow;

}

public String getPhoneNB() {

return phoneNB;

}


public void setPhoneNB(String phoneNB) {

this.phoneNB = phoneNB;

}


public long getUp_flow() {

return up_flow;

}


public void setUp_flow(long up_flow) {

this.up_flow = up_flow;

}


public long getD_flow() {

return d_flow;

}


public void setD_flow(long d_flow) {

this.d_flow = d_flow;

}


public long getS_flow() {

return s_flow;

}


public void setS_flow(long s_flow) {

this.s_flow = s_flow;

}

//

public void write(DataOutput out) throws IOException {

out.writeUTF(phoneNB);

out.writeLong(up_flow);

out.writeLong(d_flow);

out.writeLong(s_flow);

}

public void readFields(DataInput in) throws IOException {

phoneNB= in.readUTF();

up_flow=in.readLong();

d_flow=in.readLong();

s_flow=in.readLong();

}

@Override

public String toString() {

return up_flow+"   "+d_flow+"   "+"   "+s_flow;

}

}










本文转自lzf0530377451CTO博客,原文链接:http://blog.51cto.com/8757576/1839299 ,如需转载请自行联系原作者





相关文章
|
8天前
|
算法 Java
【编程基础知识】Java打印九九乘法表
本文介绍了在Java中实现九九乘法表的三种方法:嵌套循环、数组和流控制。通过代码示例、流程图和表格对比,帮助读者深入理解每种方法的优缺点,提升编程技能。
30 2
|
8天前
|
存储 Java
【编程基础知识】 分析学生成绩:用Java二维数组存储与输出
本文介绍如何使用Java二维数组存储和处理多个学生的各科成绩,包括成绩的输入、存储及格式化输出,适合初学者实践Java基础知识。
37 1
|
8天前
|
Java 开发者
【编程进阶知识】《Java 文件复制魔法:FileReader/FileWriter 的奇妙之旅》
本文深入探讨了如何使用 Java 中的 FileReader 和 FileWriter 进行文件复制操作,包括按字符和字符数组复制。通过详细讲解、代码示例和流程图,帮助读者掌握这一重要技能,提升 Java 编程能力。适合初学者和进阶开发者阅读。
109 61
|
8天前
|
存储 Java
【编程基础知识】《Java 起航指南:配置 Java 环境变量的秘籍与奥秘》
本文详细介绍了如何配置 Java 环境变量及其重要性,通过具体步骤、代码示例和流程图,帮助初学者轻松掌握 Java 环境变量的设置,为 Java 编程打下坚实基础。关键词:Java、环境变量、配置方法、编程基础。
116 57
|
4天前
|
安全 Java UED
Java中的多线程编程:从基础到实践
本文深入探讨了Java中的多线程编程,包括线程的创建、生命周期管理以及同步机制。通过实例展示了如何使用Thread类和Runnable接口来创建线程,讨论了线程安全问题及解决策略,如使用synchronized关键字和ReentrantLock类。文章还涵盖了线程间通信的方式,包括wait()、notify()和notifyAll()方法,以及如何避免死锁。此外,还介绍了高级并发工具如CountDownLatch和CyclicBarrier的使用方法。通过综合运用这些技术,可以有效提高多线程程序的性能和可靠性。
|
4天前
|
缓存 Java UED
Java中的多线程编程:从基础到实践
【10月更文挑战第13天】 Java作为一门跨平台的编程语言,其强大的多线程能力一直是其核心优势之一。本文将从最基础的概念讲起,逐步深入探讨Java多线程的实现方式及其应用场景,通过实例讲解帮助读者更好地理解和应用这一技术。
19 3
|
4天前
|
Java 开发者
在Java编程中,正确的命名规范不仅能提升代码的可读性和可维护性,还能有效避免命名冲突。
【10月更文挑战第13天】在Java编程中,正确的命名规范不仅能提升代码的可读性和可维护性,还能有效避免命名冲突。本文将带你深入了解Java命名规则,包括标识符的基本规则、变量和方法的命名方式、常量的命名习惯以及如何避免关键字冲突,通过实例解析,助你写出更规范、优雅的代码。
25 3
|
4天前
|
Java 程序员
在Java编程中,关键字不仅是简单的词汇,更是赋予代码强大功能的“魔法咒语”。
【10月更文挑战第13天】在Java编程中,关键字不仅是简单的词汇,更是赋予代码强大功能的“魔法咒语”。本文介绍了Java关键字的基本概念及其重要性,并通过定义类和对象、控制流程、访问修饰符等示例,展示了关键字的实际应用。掌握这些关键字,是成为优秀Java程序员的基础。
11 3
|
4天前
|
Java 程序员 编译器
在Java编程中,保留字(如class、int、for等)是具有特定语法意义的预定义词汇,被语言本身占用,不能用作变量名、方法名或类名。
在Java编程中,保留字(如class、int、for等)是具有特定语法意义的预定义词汇,被语言本身占用,不能用作变量名、方法名或类名。本文通过示例详细解析了保留字的定义、作用及与自定义标识符的区别,帮助开发者避免因误用保留字而导致的编译错误,确保代码的正确性和可读性。
15 3
|
4天前
|
算法 Java
在Java编程中,关键字和保留字是基础且重要的组成部分,正确理解和使用它们
【10月更文挑战第13天】在Java编程中,关键字和保留字是基础且重要的组成部分。正确理解和使用它们,如class、int、for、while等,不仅能够避免语法错误,还能提升代码的可读性和执行效率。本指南将通过解答常见问题,帮助你掌握Java关键字的正确使用方法,以及如何避免误用保留字,使你的代码更加高效流畅。
19 3