Hadoop MapReduce Programming: Computing a Maximum

Introduction

Computing a maximum with MapReduce is essentially no different from Hadoop's built-in WordCount example: the only change is that the Reducer computes a maximum instead of a running sum. The structure is the same and quite simple, so let's work through a concrete example.

Test Data

Using a small simulation program of our own, we generated a set of simple test samples. A snippet of the input data looks like this:

SG 253654006139495 253654006164392 619850464
KG 253654006225166 253654006252433 743485698
UZ 253654006248058 253654006271941 570409379
TT 253654006282019 253654006286839 23236775
BE 253654006276984 253654006301435 597874033
BO 253654006293624 253654006315946 498265375
SR 253654006308428 253654006330442 484613339
SV 253654006320312 253654006345405 629640166
LV 253654006330384 253654006359891 870680704
FJ 253654006351709 253654006374468 517965666

The text data above is stored one record per line; each line contains four fields, representing:

  1. Country code
  2. Start time
  3. End time
  4. Random cost / weight estimate

The fields are separated by whitespace. What we want to compute is, for each country (identified by its country code), the maximum of the cost estimates.
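
The simulation program that produced the data is not shown in this article. The following is only a minimal sketch of what such a generator might look like; the output file name (data_10m), the timestamp base, and the value ranges are assumptions chosen to mimic the shape of the sample above:

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Locale;
import java.util.Random;

public class CostDataGenerator {

    public static void main(String[] args) throws IOException {
        String[] codes = Locale.getISOCountries(); // two-letter ISO country codes
        Random random = new Random();
        long baseTime = 253654000000000L;          // arbitrary base, same magnitude as the sample
        PrintWriter out = new PrintWriter(new FileWriter("data_10m"));
        try {
            for (int i = 0; i < 10000000; i++) {
                String code = codes[random.nextInt(codes.length)];
                long start = baseTime + random.nextInt(1000000000);
                long end = start + random.nextInt(100000);
                long cost = 1 + random.nextInt(999999999); // cost below 10^9, as in the sample
                out.println(code + " " + start + " " + end + " " + cost);
            }
        } finally {
            out.close();
        }
    }
}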

Implementation

Since the problem is simple, let's go straight to the code. It consists of the usual three parts: the Mapper, the Reducer, and the Driver. The Mapper implementation class is GlobalCostMapper:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GlobalCostMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    private final static LongWritable costValue = new LongWritable(0);
    private Text code = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // a line, such as 'SG 253654006139495 253654006164392 619850464'
        String line = value.toString();
        String[] array = line.split("\\s");
        if (array.length == 4) {
            String countryCode = array[0];
            String strCost = array[3];
            long cost = 0L;
            try {
                cost = Long.parseLong(strCost);
            } catch (NumberFormatException e) {
                cost = 0L;
            }
            // skip records whose cost is missing or unparsable
            if (cost != 0) {
                code.set(countryCode);
                costValue.set(cost);
                context.write(code, costValue);
            }
        }
    }
}

The logic above is very simple: split each line on whitespace, pull out the individual fields, and emit a (country code, cost) key/value pair.
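
As a side note, this mapping logic can be exercised outside the cluster with a unit test. The sketch below is not part of the original code and assumes the MRUnit library (plus JUnit) is available on the test classpath:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class GlobalCostMapperTest {

    @Test
    public void emitsCountryCodeAndCost() throws Exception {
        // feed one sample line in, expect a single (country code, cost) pair out
        MapDriver.newMapDriver(new GlobalCostMapper())
                .withInput(new LongWritable(0),
                        new Text("SG 253654006139495 253654006164392 619850464"))
                .withOutput(new Text("SG"), new LongWritable(619850464L))
                .runTest();
    }
}
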
Next, the list of key/value pairs emitted by the Mapper needs to be merged and reduced in the Reducer. The Reducer implementation class is GlobalCostReducer:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GlobalCostReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        // cost values are positive, so 0 is a safe initial maximum
        long max = 0L;
        Iterator<LongWritable> iter = values.iterator();
        while (iter.hasNext()) {
            LongWritable current = iter.next();
            if (current.get() > max) {
                max = current.get();
            }
        }
        context.write(key, new LongWritable(max));
    }
}

The code above computes the maximum cost estimate for each group of key/value pairs, which is straightforward. As an optimization, the same Reducer can also be applied to the map output as a Combiner: because taking a maximum is commutative and associative, combining partial maxima on the map side does not change the final result, while it significantly reduces the amount of data transferred from the Mappers to the Reducers. The Combiner is specified when configuring the Job.
Next, let's look at how to configure and run a Job. The implementation class is GlobalMaxCostDriver:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobalMaxCostDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: maxcost <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "max cost");

        job.setJarByClass(GlobalMaxCostDriver.class);
        job.setMapperClass(GlobalCostMapper.class);
        job.setCombinerClass(GlobalCostReducer.class);
        job.setReducerClass(GlobalCostReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
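
The Driver above targets the Hadoop 1.x API used throughout this article (the cluster below runs Hadoop 1.0.3). On Hadoop 2.x and later the Job constructor is deprecated; a sketch of the equivalent construction on newer releases, with everything else unchanged, would be:

// On Hadoop 2.x+, the deprecated constructor is replaced by a static factory method
Job job = Job.getInstance(conf, "max cost");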

Running the Program

First, make sure the Hadoop cluster is running normally; in my setup the NameNode is the host ubuntu3. The steps to run the program are as follows:

  • Compile the code (I do this directly with Maven) and package it into a jar file
shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-max-cost.jar -C ./ org
  • Copy the generated jar file to the NameNode host
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ scp shirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-max-cost.jar ./
global-max-cost.jar
  • Upload the data file to be processed
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/
  • Run the MapReduce job we wrote to compute the maximum values
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-max-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.max.GlobalMaxCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/cost

The console output during the run looks roughly like this:

13/03/22 16:30:16 INFO input.FileInputFormat: Total input paths to process : 1
13/03/22 16:30:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/22 16:30:16 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/22 16:30:16 INFO mapred.JobClient: Running job: job_201303111631_0004
13/03/22 16:30:17 INFO mapred.JobClient: map 0% reduce 0%
13/03/22 16:30:33 INFO mapred.JobClient: map 22% reduce 0%
13/03/22 16:30:36 INFO mapred.JobClient: map 28% reduce 0%
13/03/22 16:30:45 INFO mapred.JobClient: map 52% reduce 9%
13/03/22 16:30:48 INFO mapred.JobClient: map 57% reduce 9%
13/03/22 16:30:57 INFO mapred.JobClient: map 80% reduce 9%
13/03/22 16:31:00 INFO mapred.JobClient: map 85% reduce 19%
13/03/22 16:31:10 INFO mapred.JobClient: map 100% reduce 28%
13/03/22 16:31:19 INFO mapred.JobClient: map 100% reduce 100%
13/03/22 16:31:24 INFO mapred.JobClient: Job complete: job_201303111631_0004
13/03/22 16:31:24 INFO mapred.JobClient: Counters: 29
13/03/22 16:31:24 INFO mapred.JobClient: Job Counters
13/03/22 16:31:24 INFO mapred.JobClient: Launched reduce tasks=1
13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=76773
13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient: Launched map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient: Data-local map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=40497
13/03/22 16:31:24 INFO mapred.JobClient: File Output Format Counters
13/03/22 16:31:24 INFO mapred.JobClient: Bytes Written=3029
13/03/22 16:31:24 INFO mapred.JobClient: FileSystemCounters
13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_READ=142609
13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_READ=448913653
13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_WRITTEN=338151
13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=3029
13/03/22 16:31:24 INFO mapred.JobClient: File Input Format Counters
13/03/22 16:31:24 INFO mapred.JobClient: Bytes Read=448912799
13/03/22 16:31:24 INFO mapred.JobClient: Map-Reduce Framework
13/03/22 16:31:24 INFO mapred.JobClient: Map output materialized bytes=21245
13/03/22 16:31:24 INFO mapred.JobClient: Map input records=10000000
13/03/22 16:31:24 INFO mapred.JobClient: Reduce shuffle bytes=18210
13/03/22 16:31:24 INFO mapred.JobClient: Spilled Records=12582
13/03/22 16:31:24 INFO mapred.JobClient: Map output bytes=110000000
13/03/22 16:31:24 INFO mapred.JobClient: CPU time spent (ms)=80320
13/03/22 16:31:24 INFO mapred.JobClient: Total committed heap usage (bytes)=1535639552
13/03/22 16:31:24 INFO mapred.JobClient: Combine input records=10009320
13/03/22 16:31:24 INFO mapred.JobClient: SPLIT_RAW_BYTES=854
13/03/22 16:31:24 INFO mapred.JobClient: Reduce input records=1631
13/03/22 16:31:24 INFO mapred.JobClient: Reduce input groups=233
13/03/22 16:31:24 INFO mapred.JobClient: Combine output records=10951
13/03/22 16:31:24 INFO mapred.JobClient: Physical memory (bytes) snapshot=1706708992
13/03/22 16:31:24 INFO mapred.JobClient: Reduce output records=233
13/03/22 16:31:24 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4316872704
13/03/22 16:31:24 INFO mapred.JobClient: Map output records=10000000
  • Verify the Job's output
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000
AD 999974516
AE 999938630
AF 999996180
AG 999991085
AI 999989595
AL 999998489
AM 999976746
AO 999989628
AQ 999995031
AR 999953989
AS 999935982
AT 999999909
AU 999937089
AW 999965784
AZ 999996557
BA 999949773
BB 999987345
BD 999992272
BE 999925057
BF 999999220
BG 999971528
BH 999994900
BI 999978516
BJ 999977886
BM 999991925
BN 999986630
BO 999995482
BR 999989947
BS 999980931
BT 999977488
BW 999935985
BY 999998496
BZ 999975972
CA 999978275
CC 999968311
CD 999978139
CF 999995342
CG 999788112
CH 999997524
CI 999998864
CK 999968719
CL 999967083
CM 999998369
CN 999975367
CO 999999167
CR 999971685
CU 999976352
CV 999990543
CW 999987713
CX 999987579
CY 999982925
CZ 999993908
DE 999985416
DJ 999997438
DK 999963312
DM 999941706
DO 999945597
DZ 999973610
EC 999920447
EE 999949534
EG 999980522
ER 999980425
ES 999949155
ET 999987033
FI 999966243
FJ 999990686
FK 999966573
FM 999972146
FO 999988472
FR 999988342
GA 999982099
GB 999970658
GD 999996318
GE 999991970
GF 999982024
GH 999941039
GI 999995295
GL 999948726
GM 999967823
GN 999951804
GP 999904645
GQ 999988635
GR 999999672
GT 999972984
GU 999919056
GW 999962551
GY 999999881
HK 999970084
HN 999972628
HR 999986688
HT 999970913
HU 999997568
ID 999994762
IE 999996686
IL 999982184
IM 999987831
IN 999914991
IO 999968575
IQ 999990126
IR 999986780
IS 999973585
IT 999997239
JM 999982209
JO 999977276
JP 999983684
KE 999996012
KG 999991556
KH 999975644
KI 999994328
KM 999989895
KN 999991068
KP 999967939
KR 999992162
KW 999924295
KY 999977105
KZ 999992835
LA 999989151
LB 999963014
LC 999962233
LI 999986863
LK 999989876
LR 999897202
LS 999957706
LT 999999688
LU 999999823
LV 999945411
LY 999992365
MA 999922726
MC 999978886
MD 999996042
MG 999996602
MH 999989668
MK 999968900
ML 999990079
MM 999987977
MN 999969051
MO 999977975
MP 999995234
MQ 999913110
MR 999982303
MS 999974690
MT 999982604
MU 999988632
MV 999961206
MW 999991903
MX 999978066
MY 999995010
MZ 999981189
NA 999961177
NC 999961053
NE 999990091
NF 999989399
NG 999985037
NI 999965733
NL 999949789
NO 999993122
NP 999972410
NR 999956464
NU 999987046
NZ 999998214
OM 999967428
PA 999924435
PE 999981176
PF 999959978
PG 999987347
PH 999981534
PK 999954268
PL 999996619
PM 999998975
PR 999906386
PT 999993404
PW 999991278
PY 999985509
QA 999995061
RE 999952291
RO 999994148
RS 999999923
RU 999894985
RW 999980184
SA 999973822
SB 999972832
SC 999973271
SD 999963744
SE 999972256
SG 999977637
SH 999983638
SI 999980580
SK 999998152
SL 999999269
SM 999941188
SN 999990278
SO 999973175
SR 999975964
ST 999980447
SV 999999945
SX 999903445
SY 999988858
SZ 999992537
TC 999969540
TD 999999303
TG 999977640
TH 999968746
TJ 999983666
TK 999971131
TM 999958998
TN 999963035
TO 999947915
TP 999986796
TR 999995112
TT 999984435
TV 999971989
TW 999975092
TZ 999992734
UA 999970993
UG 999976267
UM 999998377
US 999912229
UY 999989662
UZ 999982762
VA 999975548
VC 999991495
VE 999997971
VG 999949690
VI 999990063
VN 999974393
VU 999953162
WF 999947666
WS 999970242
YE 999984650
YT 999994707
ZA 999998692
ZM 999973392
ZW 999928087

As you can see, the results are what we expected.
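
For completeness, the same check can be done programmatically instead of with hadoop fs -cat. The following is only a sketch that reads the result file through the HDFS FileSystem API; it assumes the client configuration points at the cluster and reuses the output path from above:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintMaxCostResult {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path result = new Path("/user/xiaoxiang/output/cost/part-r-00000");
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // each line is "<country code>\t<maximum cost>", e.g. "AD\t999974516"
                System.out.println(line);
            }
        } finally {
            reader.close();
        }
    }
}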
