Hadoop MapReduce编程 API入门系列之FOF(Fund of Fund)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.Text;

public class Fof extends Text{//自定义Fof,表示f1和f2关系

public Fof(){//无参构造
super();
}

public Fof(String a,String b){//有参构造
super(getFof(a, b));
}

public static String getFof(String a,String b){
int r =a.compareTo(b);
if(r<0){
return a+"\t"+b;
}else{
return b+"\t"+a;
}
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class User implements WritableComparable<User>{
//WritableComparable,实现这个方法,要多很多
//readFields是读入,write是写出
private String uname;
private int friendsCount;

public String getUname() {
return uname;
}
public void setUname(String uname) {
this.uname = uname;
}
public int getFriendsCount() {
return friendsCount;
}
public void setFriendsCount(int friendsCount) {
this.friendsCount = friendsCount;
}//这一大段的get和set,可以右键,source,产生get和set,自动生成。

public User() {//无参构造

}

public User(String uname,int friendsCount){//有参构造
this.uname=uname;
this.friendsCount=friendsCount;
}

public void write(DataOutput out) throws IOException { //序列化
out.writeUTF(uname);
out.writeInt(friendsCount);
}


public void readFields(DataInput in) throws IOException {//反序列化
this.uname=in.readUTF();
this.friendsCount=in.readInt();
}

public int compareTo(User o) {//核心
int result = this.uname.compareTo(o.getUname());
if(result==0){
return Integer.compare(this.friendsCount, o.getFriendsCount());
}
return result;
}



}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FoFSort extends WritableComparator{

public FoFSort() {//把自定义的User,传进了
super(User.class,true);
}

public int compare(WritableComparable a, WritableComparable b) {//排序核心
User u1 =(User) a;
User u2=(User) b;

int result =u1.getUname().compareTo(u2.getUname());
if(result==0){
return -Integer.compare(u1.getFriendsCount(), u2.getFriendsCount());
}
return result;
}
}

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FoFGroup extends WritableComparator{

public FoFGroup() {//把自定义的User,传进了
super(User.class,true);
}

public int compare(WritableComparable a, WritableComparable b) {//分组核心
User u1 =(User) a;
User u2=(User) b;

return u1.getUname().compareTo(u2.getUname());
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.friend;


import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class RunJob {

// 小明 老王 如花 林志玲
// 老王 小明 凤姐 排序在FoFSort.java
// 如花 小明 李刚 凤姐
// 林志玲 小明 李刚 凤姐 郭美美 分组在FoFGroup.java
// 李刚 如花 凤姐 林志玲
// 郭美美 凤姐 林志玲
// 凤姐 如花 老王 林志玲 郭美美


public static void main(String[] args) {
Configuration config =new Configuration();
// config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
// config.set("yarn.resourcemanager.hostname", "HadoopMaster");
// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如","
if(run1(config)){
run2(config);//设置两个run,即两个mr。
}
}

public static void run2(Configuration config) {
try {
FileSystem fs =FileSystem.get(config);
Job job =Job.getInstance(config);
job.setJarByClass(RunJob.class);

job.setJobName("fof2");

job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setSortComparatorClass(FoFSort.class);
job.setGroupingComparatorClass(FoFGroup.class);
job.setMapOutputKeyClass(User.class);
job.setMapOutputValueClass(User.class);

job.setInputFormatClass(KeyValueTextInputFormat.class);

// //设置MR执行的输入文件
// FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/f1"));
//
// //该目录表示MR执行之后的结果数据所在目录,必须不能存在
// Path outputPath=new Path("hdfs://HadoopMaster:9000/out/f2");

//设置MR执行的输入文件
FileInputFormat.addInputPath(job, new Path("./out/f1"));

//该目录表示MR执行之后的结果数据所在目录,必须不能存在
Path outputPath=new Path("./out/f2");


if(fs.exists(outputPath)){
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);

boolean f =job.waitForCompletion(true);
if(f){
System.out.println("job 成功执行");
}

} catch (Exception e) {
e.printStackTrace();

}

public static boolean run1(Configuration config) {
try {
FileSystem fs =FileSystem.get(config);
Job job =Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("friend");
job.setMapperClass(FofMapper.class);
job.setReducerClass(FofReducer.class);
job.setMapOutputKeyClass(Fof.class);
job.setMapOutputValueClass(IntWritable.class);

job.setInputFormatClass(KeyValueTextInputFormat.class);

// FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/friend/friend.txt"));//下有friend.txt
//
// Path outpath =new Path("hdfs://HadoopMaster:9000/out/f1");

FileInputFormat.addInputPath(job, new Path("./data/friend/friend.txt"));//下有friend.txt

Path outpath =new Path("./out/f1");



if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);

boolean f= job.waitForCompletion(true);
return f;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

static class FofMapper extends Mapper<Text, Text, Fof, IntWritable>{
protected void map(Text key, Text value,
Context context)
throws IOException, InterruptedException {
String user =key.toString();
String[] friends =StringUtils.split(value.toString(), '\t');
for (int i = 0; i < friends.length; i++) {
String f1 = friends[i];
Fof ofof =new Fof(user, f1);
context.write(ofof, new IntWritable(0));
for (int j = i+1; j < friends.length; j++) {
String f2 = friends[j];
Fof fof =new Fof(f1, f2);
context.write(fof, new IntWritable(1));
}
}
}
}

static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable>{
protected void reduce(Fof arg0, Iterable<IntWritable> arg1,
Context arg2)
throws IOException, InterruptedException {
int sum =0;
boolean f =true;
for(IntWritable i: arg1){
if(i.get()==0){
f=false;
break;
}else{
sum=sum+i.get();
}
}
if(f){
arg2.write(arg0, new IntWritable(sum));
}
}
}

static class SortMapper extends Mapper<Text, Text, User, User>{

protected void map(Text key, Text value,
Context context)
throws IOException, InterruptedException {
String[] args=StringUtils.split(value.toString(),'\t');
String other=args[0];
int friendsCount =Integer.parseInt(args[1]);

context.write(new User(key.toString(),friendsCount), new User(other,friendsCount));
context.write(new User(other,friendsCount), new User(key.toString(),friendsCount));
}
}

static class SortReducer extends Reducer<User, User, Text, Text>{
protected void reduce(User arg0, Iterable<User> arg1,
Context arg2)
throws IOException, InterruptedException {
String user =arg0.getUname();
StringBuffer sb =new StringBuffer();
for(User u: arg1 ){
sb.append(u.getUname()+":"+u.getFriendsCount());
sb.append(",");
}
arg2.write(new Text(user), new Text(sb.toString()));
}
}


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6166095.html,如需转载请自行联系原作者

相关文章
|
5月前
|
JSON 安全 API
电商API入门问答:开发者必知的10个基础问题
本文详解电商API的10个基础知识,涵盖定义、用途、认证、安全等内容,帮助开发者快速入门并提升开发效率。
151 0
|
5月前
|
缓存 监控 安全
电商API集成入门:从零开始搭建高效接口
在数字化电商时代,API集成成为企业提升效率、实现系统互联的关键。本文从零开始,逐步讲解如何搭建高效、可靠的电商API接口,适合初学者学习。内容涵盖API基础、认证安全、请求处理、性能优化等核心步骤,并提供Python代码示例与数学公式辅助理解。通过实践,读者可掌握构建优质电商API的技巧,提升用户体验与系统性能。
250 0
|
2月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
8月前
|
JSON 算法 API
一文掌握 1688 商品详情 API 接口:从入门到实战
1688是国内领先的综合电商批发平台,提供海量商品资源。其商品详情API助力开发者与企业获取商品的详细信息(如属性、价格、库存等),广泛应用于电商数据分析、比价系统及采购场景。API支持GET/POST请求,需传入通用参数(app_key、timestamp等)与业务参数(如product_id)。返回JSON格式数据,包含商品标题、价格、图片链接等详情,提升业务效率与决策精准度。
|
8月前
|
搜索推荐 API 开发者
京东商品列表 API 接口全解析:从入门到精通
京东商品列表API是京东开放平台为开发者提供的核心数据接口,支持批量获取商品基础信息、价格、库存状态等多维度数据。它具备数据丰富性、灵活筛选与分页查询、稳定高效等特点,可满足市场分析、选品优化、比价工具及推荐系统开发等需求,为电商业务创新提供坚实支撑。通过标准化通道,助力第三方高效、合法地利用京东海量商品数据。
|
7月前
|
JSON API 开发工具
电商API接口入门指南
本文介绍了API的基础知识及其在电商领域的实际应用。首先,阐释了API的概念、运作机制及参数与返回值的作用,帮助读者理解如何通过API实现软件间的交互。接着,以获取电商商品列表为例,详细讲解了从选择平台、引入SDK到编写代码调用API的全流程。示例代码采用Python语言,利用requests库发送请求并解析JSON数据,为开发者提供了清晰的实践指导。
|
5月前
|
存储 安全 API
亚马逊SP-API入门:海外电商接口调用与国内平台的差异化
亚马逊 SP-API 与国内电商 API 在技术架构、安全机制及开发流程上差异显著。本文对比京东、淘宝等平台,分析接口设计、地域适配、权限管理等核心差异,并结合实战经验提供开发建议,助力开发者高效接入 SP-API,实现全球电商业务拓展。
|
7月前
|
数据挖掘 API 开发者
京东商品详情 API 接口全攻略:从入门到精通
京东商品详情API接口是京东开放平台为开发者提供的服务,用于获取商品详细信息。通过调用接口,开发者可获得商品属性、价格、库存、促销信息等数据,适用于电商应用、价格比较工具及数据分析平台等场景。支持GET/POST请求方式,参数包括API版本、密钥等。示例代码展示了如何使用Python的requests库调用该接口,并获取JSON格式的返回数据,包含商品基本信息、价格、库存和用户评价等内容。
300 16
|
开发框架 .NET API
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
538 7
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
291 2