Hadoop MapReduce(FlowCount) Java编程

简介:

编写PhoneFlow程序,计算手机上行流量、下行流量以及总流量,数据如下:

 13685295623 122  201 

 13985295600 102  11 

 13885295622 22   101 

 13785295633 120  20 

1、FlowMapper:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.commons.lang.StringUtils;



public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

/**

 * 数据格式:

 * 13685295623 122  201 

 * 13985295600 102  11 

 * 13885295622 22   101 

 * 13785295633 120  20 

 */

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] fields=StringUtils.split(line,"\t");

String phoneNB=fields[0];

long up_flow=Long.valueOf(fields[1]);

long d_flow=Long.valueOf(fields[2]);

context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,d_flow));

}


}

2、FlowReducer:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;


public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {

@Override

protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException {

long upflowC=0;

long dflowD=0;

for(FlowBean bean:values){

upflowC+=bean.getUp_flow();

dflowD+=bean.getD_flow();

}

context.write(key,new FlowBean(key.toString(),upflowC,dflowD));

}


}


3、FlowRunner 

package com.hadoop.flow;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.Text;


public class FlowRunner extends Configured implements Tool{

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

job.setJarByClass(FlowRunner.class);

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws Exception {

ToolRunner.run(new Configuration(), new FlowRunner(), args);

}


}


4、FlowBean :

package com.hadoop.flow;


import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;


import org.apache.hadoop.io.Writable;


public class FlowBean implements Writable{

private String phoneNB;

private long up_flow;

private long d_flow;

private long s_flow;

public FlowBean(){

}

public FlowBean (String phoneNB,long up_flow,long d_flow){

this.phoneNB=phoneNB;

this.up_flow=up_flow;

this.d_flow=d_flow;

this.s_flow=up_flow+d_flow;

}

public String getPhoneNB() {

return phoneNB;

}


public void setPhoneNB(String phoneNB) {

this.phoneNB = phoneNB;

}


public long getUp_flow() {

return up_flow;

}


public void setUp_flow(long up_flow) {

this.up_flow = up_flow;

}


public long getD_flow() {

return d_flow;

}


public void setD_flow(long d_flow) {

this.d_flow = d_flow;

}


public long getS_flow() {

return s_flow;

}


public void setS_flow(long s_flow) {

this.s_flow = s_flow;

}

//

public void write(DataOutput out) throws IOException {

out.writeUTF(phoneNB);

out.writeLong(up_flow);

out.writeLong(d_flow);

out.writeLong(s_flow);

}

public void readFields(DataInput in) throws IOException {

phoneNB= in.readUTF();

up_flow=in.readLong();

d_flow=in.readLong();

s_flow=in.readLong();

}

@Override

public String toString() {

return up_flow+"   "+d_flow+"   "+"   "+s_flow;

}

}










本文转自lzf0530377451CTO博客,原文链接:http://blog.51cto.com/8757576/1839299 ,如需转载请自行联系原作者





相关文章
|
2月前
|
Kubernetes 负载均衡 Java
k8s的出现解决了java并发编程胡问题了
Kubernetes通过提供自动化管理、资源管理、服务发现和负载均衡、持续交付等功能,有效地解决了Java并发编程中的许多复杂问题。它不仅简化了线程管理和资源共享,还提供了强大的负载均衡和故障恢复机制,确保应用程序在高并发环境下的高效运行和稳定性。通过合理配置和使用Kubernetes,开发者可以显著提高Java应用程序的性能和可靠性。
90 31
|
2月前
|
Java 编译器 开发者
注解的艺术:Java编程的高级定制
注解是Java编程中的高级特性,通过内置注解、自定义注解及注解处理器,可以实现代码的高度定制和扩展。通过理解和掌握注解的使用方法,开发者可以提高代码的可读性、可维护性和开发效率。在实际应用中,注解广泛用于框架开发、代码生成和配置管理等方面,展示了其强大的功能和灵活性。
81 25
|
5月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
2月前
|
Java API Docker
在线编程实现!如何在Java后端通过DockerClient操作Docker生成python环境
以上内容是一个简单的实现在Java后端中通过DockerClient操作Docker生成python环境并执行代码,最后销毁的案例全过程,也是实现一个简单的在线编程后端API的完整流程,你可以在此基础上添加额外的辅助功能,比如上传文件、编辑文件、查阅文件、自定义安装等功能。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
在线编程实现!如何在Java后端通过DockerClient操作Docker生成python环境
|
2月前
|
Java 开发工具
课时6:Java编程起步
课时6:Java编程起步,主讲人李兴华。课程摘要:介绍Java编程的第一个程序“Hello World”,讲解如何使用记事本或EditPlus编写、保存和编译Java源代码(*.java文件),并解释类定义、主方法(public static void main)及屏幕打印(System.out.println)。强调类名与文件名一致的重要性,以及Java程序的编译和执行过程。通过实例演示,帮助初学者掌握Java编程的基本步骤和常见问题。
|
5月前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
158 5
Java 并发编程——volatile 关键字解析
|
4月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
144 7
|
5月前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
142 12
|
5月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
421 2
|
6月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####