Hadoop MapReduce(FlowCount) Java编程

简介:

编写PhoneFlow程序,计算手机上行流量、下行流量以及总流量,数据如下:

 13685295623 122  201 

 13985295600 102  11 

 13885295622 22   101 

 13785295633 120  20 

1、FlowMapper:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.commons.lang.StringUtils;



public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

/**

 * 数据格式:

 * 13685295623 122  201 

 * 13985295600 102  11 

 * 13885295622 22   101 

 * 13785295633 120  20 

 */

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] fields=StringUtils.split(line,"\t");

String phoneNB=fields[0];

long up_flow=Long.valueOf(fields[1]);

long d_flow=Long.valueOf(fields[2]);

context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,d_flow));

}


}

2、FlowReducer:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;


public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {

@Override

protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException {

long upflowC=0;

long dflowD=0;

for(FlowBean bean:values){

upflowC+=bean.getUp_flow();

dflowD+=bean.getD_flow();

}

context.write(key,new FlowBean(key.toString(),upflowC,dflowD));

}


}


3、FlowRunner 

package com.hadoop.flow;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.Text;


public class FlowRunner extends Configured implements Tool{

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

job.setJarByClass(FlowRunner.class);

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws Exception {

ToolRunner.run(new Configuration(), new FlowRunner(), args);

}


}


4、FlowBean :

package com.hadoop.flow;


import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;


import org.apache.hadoop.io.Writable;


public class FlowBean implements Writable{

private String phoneNB;

private long up_flow;

private long d_flow;

private long s_flow;

public FlowBean(){

}

public FlowBean (String phoneNB,long up_flow,long d_flow){

this.phoneNB=phoneNB;

this.up_flow=up_flow;

this.d_flow=d_flow;

this.s_flow=up_flow+d_flow;

}

public String getPhoneNB() {

return phoneNB;

}


public void setPhoneNB(String phoneNB) {

this.phoneNB = phoneNB;

}


public long getUp_flow() {

return up_flow;

}


public void setUp_flow(long up_flow) {

this.up_flow = up_flow;

}


public long getD_flow() {

return d_flow;

}


public void setD_flow(long d_flow) {

this.d_flow = d_flow;

}


public long getS_flow() {

return s_flow;

}


public void setS_flow(long s_flow) {

this.s_flow = s_flow;

}

//

public void write(DataOutput out) throws IOException {

out.writeUTF(phoneNB);

out.writeLong(up_flow);

out.writeLong(d_flow);

out.writeLong(s_flow);

}

public void readFields(DataInput in) throws IOException {

phoneNB= in.readUTF();

up_flow=in.readLong();

d_flow=in.readLong();

s_flow=in.readLong();

}

@Override

public String toString() {

return up_flow+"   "+d_flow+"   "+"   "+s_flow;

}

}
















本文转自lzf0530377451CTO博客,原文链接: http://blog.51cto.com/8757576/1839299,如需转载请自行联系原作者



相关文章
|
7天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
20 3
|
10天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
13天前
|
存储 NoSQL Java
Java数据库编程指南:实现高效数据存储与访问
【4月更文挑战第2天】Java开发者必须掌握数据库编程,尤其是JDBC,它是连接数据库的标准接口。使用Spring JDBC或JPA能简化操作。选择合适的JDBC驱动,如MySQL Connector/J,对性能至关重要。最佳实践包括事务管理、防SQL注入、优化索引和数据库设计。NoSQL数据库如MongoDB也日益重要,Java有对应的驱动支持。理解这些概念和技术是构建高效数据库应用的基础。
Java数据库编程指南:实现高效数据存储与访问
|
10天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
12天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【4月更文挑战第3天】 在Java并发编程中,线程池是一种重要的资源管理工具,它能有效地控制和管理线程的数量,提高系统性能。本文将深入探讨Java线程池的工作原理、应用场景以及优化策略,帮助读者更好地理解和应用线程池。
|
8天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
4天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
8天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
14 0
|
10天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。
|
14小时前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。