Hadoop MapReduce编程 API入门系列之分区和合并(十四)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

代码

复制代码
  1 package zhouls.bigdata.myMapReduce.Star;
  2 
  3 
  4 import java.io.IOException;
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.FileSystem;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.util.Tool;
 17 import org.apache.hadoop.util.ToolRunner;
 18 /**
 19  * 
 20  * @function 统计分别统计出男女明星最大搜索指数
 21  * @author 小讲
 22  */
 23  
 24  /*
 25 姓名    性别    搜索指数
 26 李易峰    male    32670
 27 朴信惠    female    13309
 28 林心如    female    5242
 29 黄海波    male    5505
 30 成龙    male    7757
 31 刘亦菲    female    14830
 32 angelababy    female    55083
 33 王宝强    male    9472
 34 郑爽    female    9279
 35 周杰伦    male    42020
 36 莫小棋    female    13978
 37 朱一龙    male    10524
 38 宋智孝    female    12494
 39 吴京    male    6684
 40 赵丽颖    female    24174
 41 尹恩惠    female    5985
 42 李金铭    female    5925
 43 关之琳    female    7668
 44 邓超    male    11532
 45 钟汉良    male    8289
 46 周润发    male    4808
 47 甄子丹    male    5479
 48 林妙可    female    5306
 49 柳岩    female    8221
 50 蔡琳    female    7320
 51 张佳宁    female    6628
 52 裴涩琪    female    5658
 53 李晨    male    9559
 54 周星驰    male    11483
 55 杨紫    female    11094
 56 全智贤    female    5336
 57 张柏芝    female    9337
 58 孙俪    female    7295
 59 鲍蕾    female    5375
 60 杨幂    female    20238
 61 刘德华    male    19786
 62 柯震东    male    6398
 63 张国荣    male    5013
 64 王阳    male    5169
 65 李小龙    male    6859
 66 林志颖    male    4512
 67 林正英    male    5832
 68 吴秀波    male    5668
 69 陈伟霆    male    12817
 70 陈奕迅    male    10472
 71 赵又廷    male    5190
 72 张馨予    female    35062
 73 陈晓    male    17901
 74 赵韩樱子    female    7077
 75 乔振宇    male    8877
 76 宋慧乔    female    5708
 77 韩艺瑟    female    5426
 78 张翰    male    7012
 79 谢霆锋    male    6654
 80 刘晓庆    female    5553
 81 陈翔    male    7999
 82 陈学冬    male    8829
 83 秋瓷炫    female    6504
 84 王祖蓝    male    6662
 85 吴亦凡    male    16472
 86 陈妍希    female    32590
 87 倪妮    female    9278
 88 高梓淇    male    7101
 89 赵奕欢    female    7197
 90 赵本山    male    12655
 91 高圆圆    female    13688
 92 陈赫    male    6820
 93 鹿晗    male    32492
 94 贾玲    female    5304
 95 宋佳    female    6202
 96 郭碧婷    female    5295
 97 唐嫣    female    12055
 98 杨蓉    female    10512
 99 李钟硕    male    26278
100 郑秀晶    female    10479
101 熊黛林    female    26732
102 金秀贤    male    11370
103 古天乐    male    4954
104 黄晓明    male    10964
105 李敏镐    male    10512
106 王丽坤    female    5501
107 谢依霖    female    7000
108 陈冠希    male    9135
109 范冰冰    female    13734
110 姚笛    female    6953
111 彭于晏    male    14136
112 张学友    male    4578
113 谢娜    female    6886
114 胡歌    male    8015
115 古力娜扎    female    8858
116 黄渤    male    7825
117 周韦彤    female    7677
118 刘诗诗    female    16548
119 郭德纲    male    10307
120 郑恺    male    21145
121 赵薇    female    5339
122 李连杰    male    4621
123 宋茜    female    11164
124 任重    male    8383
125 李若彤    female    9968
126 
127 
128 得到:
129 angelababy    female    55083
130 周杰伦    male    42020
131 */
132 public class Star extends Configured implements Tool{
133     /**
134      * @function Mapper 解析明星数据
135      * @input key=偏移量  value=明星数据
136      * @output key=gender value=name+hotIndex
137      */
138     public static class ActorMapper extends Mapper<Object,Text,Text,Text>{
139                     //在这个例子里,第一个参数Object是Hadoop根据默认值生成的,一般是文件块里的一行文字的行偏移数,这些偏移数不重要,在处理时候一般用不上
140         public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
141         //拿:周杰伦    male    42020
142             //value=name+gender+hotIndex
143             String[] tokens = value.toString().split("\t");//使用分隔符\t,将数据解析为数组 tokens
144             String gender = tokens[1].trim();//性别,trim()是去除两边空格的方法
145                         //tokens[0]        tokens[1]        tokens[2]        
146                         //周杰伦        male            42020
147             String nameHotIndex = tokens[0] + "\t" + tokens[2];//名称和关注指数
148             //输出key=gender value=name+hotIndex
149             context.write(new Text(gender), new Text(nameHotIndex));//写入gender是k2,nameHotIndex是v2
150 //            context.write(gender,nameHotIndex);等价        
151             //将gender和nameHotIndex写入到context中
152         }
153     }
154 
155     
156     
157     /**
158      * @function Partitioner 根据sex选择分区
159      */
160     public static class ActorPartitioner extends Partitioner<Text, Text>{     
161         @Override
162         public int getPartition(Text key, Text value, int numReduceTasks){
163             String sex = key.toString();//按性别分区
164             
165             // 默认指定分区 0
166             if(numReduceTasks==0)
167                 return 0;
168             
169             //性别为male 选择分区0
170             if(sex.equals("male"))             
171                 return 0;
172             //性别为female 选择分区1
173             if(sex.equals("female"))
174                 return 1 % numReduceTasks;
175             //其他性别 选择分区2
176             else
177                 return 2 % numReduceTasks;
178            
179         }
180     }
181 
182     
183     
184     /**
185      * @function 定义Combiner 合并 Mapper 输出结果
186      */
187     public static class ActorCombiner extends Reducer<Text, Text, Text, Text>{
188         private Text text = new Text();
189         @Override
190         public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
191             int maxHotIndex = Integer.MIN_VALUE;
192             int hotIndex = 0;
193             String name="";
194             for (Text val : values){//星型for循环,即把values的值传给Text val
195                 String[] valTokens = val.toString().split("\\t");
196                 hotIndex = Integer.parseInt(valTokens[1]);
197                 if(hotIndex>maxHotIndex){
198                     name = valTokens[0];
199                     maxHotIndex = hotIndex;
200                 }
201             }
202             text.set(name+"\t"+maxHotIndex);
203             context.write(key, text);
204         }
205     }
206     
207     
208     
209     /**
210      * @function Reducer 统计男、女明星最高搜索指数
211      * @input key=gender  value=name+hotIndex
212      * @output key=name value=gender+hotIndex(max)
213      */
214     public static class ActorReducer extends Reducer<Text,Text,Text,Text>{
215         @Override
216         public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
217             int maxHotIndex = Integer.MIN_VALUE;
218 
219             String name = " ";
220             int hotIndex = 0;
221             // 根据key,迭代 values 集合,求出最高搜索指数
222             for (Text val : values){//星型for循环,即把values的值传给Text val
223                 String[] valTokens = val.toString().split("\\t");
224                 hotIndex = Integer.parseInt(valTokens[1]);
225                 if (hotIndex > maxHotIndex){
226                     name = valTokens[0];
227                     maxHotIndex = hotIndex;
228                 }
229             }
230             context.write(new Text(name), new Text(key + "\t"+ maxHotIndex));//写入name是k3,key + "\t"+ maxHotIndex是v3
231 //            context.write(name,key + "\t"+ maxHotIndex);//等价            
232         }
233     }
234 
235     /**
236      * @function 任务驱动方法
237      * @param args
238      * @return
239      * @throws Exception
240      */
241 
242     public int run(String[] args) throws Exception{
243         // TODO Auto-generated method stub
244         
245         Configuration conf = new Configuration();//读取配置文件,比如core-site.xml等等
246         Path mypath = new Path(args[1]);//Path对象mypath
247         FileSystem hdfs = mypath.getFileSystem(conf);//FileSystem对象hdfs
248         if (hdfs.isDirectory(mypath)){    
249             hdfs.delete(mypath, true);
250         }
251 
252         Job job = new Job(conf, "star");//新建一个任务
253         job.setJarByClass(Star.class);//主类
254         
255         job.setNumReduceTasks(2);//reduce的个数设置为2
256         job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner类
257         
258         job.setMapperClass(ActorMapper.class);//Mapper
259         job.setMapOutputKeyClass(Text.class);//map 输出key类型
260         job.setMapOutputValueClass(Text.class);//map 输出value类型
261                 
262         job.setCombinerClass(ActorCombiner.class);//设置Combiner类
263         
264         job.setReducerClass(ActorReducer.class);//Reducer
265         job.setOutputKeyClass(Text.class);//输出结果 key类型
266         job.setOutputValueClass(Text.class);//输出结果 value类型
267         
268         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
269         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
270         job.waitForCompletion(true);//提交任务
271         return 0;
272     }
273     
274     
275     /**
276      * @function main 方法
277      * @param args
278      * @throws Exception
279      */
280     public static void main(String[] args) throws Exception{
281 //        String[] args0 = { "hdfs://HadoopMaster:9000/star/star.txt",
282 //                            "hdfs://HadoopMaster:9000/out/star/" };
283         String[] args0 = { "./data/star/star.txt",
284                             "./out/star" };
285         
286         int ec = ToolRunner.run(new Configuration(), new Star(), args0);
287         System.exit(ec);
288     }
289 }
复制代码

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6165047.html,如需转载请自行联系原作者

相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
62 2
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
84 3
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
30 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
45 1
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
79 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
35 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
44 0
|
3月前
|
分布式计算 负载均衡 Hadoop
MapReduce 分区器的作用与重要性
【8月更文挑战第31天】
52 1
|
3月前
|
分布式计算 Hadoop Java
面向开发者的Hadoop编程指南
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由Hadoop分布式文件系统(HDFS)和MapReduce编程模型组成。本指南旨在帮助初学者和中级开发者快速掌握Hadoop的基本概念和编程技巧,并通过一些简单的示例来加深理解。
98 0
|
3月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
455 0
下一篇
无影云桌面