Hadoop MapReduce(FlowCount) Java编程

简介:

编写PhoneFlow程序,计算手机上行流量、下行流量以及总流量,数据如下:

 13685295623 122  201 

 13985295600 102  11 

 13885295622 22   101 

 13785295633 120  20 

1、FlowMapper:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.commons.lang.StringUtils;



public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

/**

 * 数据格式:

 * 13685295623 122  201 

 * 13985295600 102  11 

 * 13885295622 22   101 

 * 13785295633 120  20 

 */

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] fields=StringUtils.split(line,"\t");

String phoneNB=fields[0];

long up_flow=Long.valueOf(fields[1]);

long d_flow=Long.valueOf(fields[2]);

context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,d_flow));

}


}

2、FlowReducer:

package com.hadoop.flow;


import java.io.IOException;


import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;


public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {

@Override

protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException {

long upflowC=0;

long dflowD=0;

for(FlowBean bean:values){

upflowC+=bean.getUp_flow();

dflowD+=bean.getD_flow();

}

context.write(key,new FlowBean(key.toString(),upflowC,dflowD));

}


}


3、FlowRunner 

package com.hadoop.flow;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.Text;


public class FlowRunner extends Configured implements Tool{

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

job.setJarByClass(FlowRunner.class);

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws Exception {

ToolRunner.run(new Configuration(), new FlowRunner(), args);

}


}


4、FlowBean :

package com.hadoop.flow;


import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;


import org.apache.hadoop.io.Writable;


public class FlowBean implements Writable{

private String phoneNB;

private long up_flow;

private long d_flow;

private long s_flow;

public FlowBean(){

}

public FlowBean (String phoneNB,long up_flow,long d_flow){

this.phoneNB=phoneNB;

this.up_flow=up_flow;

this.d_flow=d_flow;

this.s_flow=up_flow+d_flow;

}

public String getPhoneNB() {

return phoneNB;

}


public void setPhoneNB(String phoneNB) {

this.phoneNB = phoneNB;

}


public long getUp_flow() {

return up_flow;

}


public void setUp_flow(long up_flow) {

this.up_flow = up_flow;

}


public long getD_flow() {

return d_flow;

}


public void setD_flow(long d_flow) {

this.d_flow = d_flow;

}


public long getS_flow() {

return s_flow;

}


public void setS_flow(long s_flow) {

this.s_flow = s_flow;

}

//

public void write(DataOutput out) throws IOException {

out.writeUTF(phoneNB);

out.writeLong(up_flow);

out.writeLong(d_flow);

out.writeLong(s_flow);

}

public void readFields(DataInput in) throws IOException {

phoneNB= in.readUTF();

up_flow=in.readLong();

d_flow=in.readLong();

s_flow=in.readLong();

}

@Override

public String toString() {

return up_flow+"   "+d_flow+"   "+"   "+s_flow;

}

}
















本文转自lzf0530377451CTO博客,原文链接: http://blog.51cto.com/8757576/1839299,如需转载请自行联系原作者



相关文章
|
2月前
|
IDE Java 编译器
java编程最基础学习
Java入门需掌握:环境搭建、基础语法、面向对象、数组集合与异常处理。通过实践编写简单程序,逐步深入学习,打牢编程基础。
210 1
|
2月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
151 6
|
5月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
306 83
|
3月前
|
SQL Java 数据库
2025 年 Java 从零基础小白到编程高手的详细学习路线攻略
2025年Java学习路线涵盖基础语法、面向对象、数据库、JavaWeb、Spring全家桶、分布式、云原生与高并发技术,结合实战项目与源码分析,助力零基础学员系统掌握Java开发技能,从入门到精通,全面提升竞争力,顺利进阶编程高手。
616 1
|
2月前
|
安全 前端开发 Java
从反射到方法句柄:深入探索Java动态编程的终极解决方案
从反射到方法句柄,Java 动态编程不断演进。方法句柄以强类型、低开销、易优化的特性,解决反射性能差、类型弱、安全性低等问题,结合 `invokedynamic` 成为支撑 Lambda 与动态语言的终极方案。
152 0
|
4月前
|
安全 Java 数据库连接
2025 年最新 Java 学习路线图含实操指南助你高效入门 Java 编程掌握核心技能
2025年最新Java学习路线图,涵盖基础环境搭建、核心特性(如密封类、虚拟线程)、模块化开发、响应式编程、主流框架(Spring Boot 3、Spring Security 6)、数据库操作(JPA + Hibernate 6)及微服务实战,助你掌握企业级开发技能。
602 3
|
3月前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
443 100
|
3月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
237 16
|
3月前
|
NoSQL Java 关系型数据库
超全 Java 学习路线,帮你系统掌握编程的超详细 Java 学习路线
本文为超全Java学习路线,涵盖基础语法、面向对象编程、数据结构与算法、多线程、JVM原理、主流框架(如Spring Boot)、数据库(MySQL、Redis)及项目实战等内容,助力从零基础到企业级开发高手的进阶之路。
301 1