简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行

简介: 简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行

如果你想试着做一个mapreduce,下面刚好,阅读大约6分钟


简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行

程序源码

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Score {
    public static class Map extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        // 实现map函数
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 将输入的纯文本文件的数据转化成String
            String line = value.toString();
            // 将输入的数据首先按行进行分割
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
            // 分别对每一行进行处理
            while (tokenizerArticle.hasMoreElements()) {
                // 每行按空格划分
                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                String strName = tokenizerLine.nextToken();// 学生姓名部分
                String strScore = tokenizerLine.nextToken();// 成绩部分
                Text name = new Text(strName);
                int scoreInt = Integer.parseInt(strScore);
                // 输出姓名和成绩
                context.write(name, new IntWritable(scoreInt));
            }
        }
    }
 
 
 
    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        // 实现reduce函数
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next().get();// 计算总分
                count++;// 统计总的科目数
            }
            int average = (int) sum / count;// 计算平均成绩
            context.write(key, new IntWritable(average));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "localhost:9000" 需要根据实际情况设置一下
        conf.set("mapred.job.tracker", "localhost:9000");
        // 一个hdfs文件系统中的 输入目录 及 输出目录
        String[] ioArgs = new String[] { "input/score", "output" };
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Score Average <in> <out>");
            System.exit(2);
        }
 
        Job job = new Job(conf, "Score Average");
        job.setJarByClass(Score.class);
        // 设置Map、Combine和Reduce处理类
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一个RecordWriter的实现,负责数据输出
        job.setOutputFormatClass(TextOutputFormat.class);
        // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

编译

命令

javac Score.java

依赖错误

如果出现如下错误:

mint@lenovo ~/Desktop/hadoop $ javac Score.java 
Score.java:4: error: package org.apache.hadoop.conf does not exist
import org.apache.hadoop.conf.Configuration;
                             ^
Score.java:5: error: package org.apache.hadoop.fs does not exist
import org.apache.hadoop.fs.Path;
                           ^
Score.java:6: error: package org.apache.hadoop.io does not exist
import org.apache.hadoop.io.IntWritable;
                           ^
Score.java:7: error: package org.apache.hadoop.io does not exist
import org.apache.hadoop.io.LongWritable;
                           ^
Score.java:8: error: package org.apache.hadoop.io does not exist
import org.apache.hadoop.io.Text;

尝试修改环境变量CLASSPATH

sudo vim /etc/profile
# 添加如下内容
export HADOOP_HOME=/usr/local/hadoop    # 如果没设置的话, 路径是hadoop安装目录
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH    # 如果没设置的话
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH

source /etc/profile

然后重复上述编译命令.

打包

编译之后会生成三个class文件:

mint@lenovo ~/Desktop/hadoop $ ls | grep class
Score.class
Score$Map.class
Score$Reduce.class


使用tar程序打包class文件.

tar -cvf Score.jar ./Score*.class

会生成Score.jar文件.

提交运行

样例输入

mint@lenovo ~/Desktop/hadoop $ ls | grep txt
chinese.txt
english.txt
math.txt
mint@lenovo ~/Desktop/hadoop $ cat chinese.txt 
Zhao 98
Qian 9
Sun 67
Li 23
mint@lenovo ~/Desktop/hadoop $ cat english.txt 
Zhao 93
Qian 42
Sun 87
Li 54
mint@lenovo ~/Desktop/hadoop $ cat math.txt 
Zhao 38
Qian 45
Sun 23
Li 43

上传到HDFS

hdfs dfs -put ./*/txt input/score

mint@lenovo ~/Desktop/hadoop $ hdfs dfs -ls input/score
Found 3 items
-rw-r--r--   1 mint supergroup         28 2017-01-11 23:25 input/score/chinese.txt
-rw-r--r--   1 mint supergroup         29 2017-01-11 23:25 input/score/english.txt
-rw-r--r--   1 mint supergroup         29 2017-01-11 23:25 input/score/math.txt

运行

mint@lenovo ~/Desktop/hadoop $ hadoop jar Score.jar Score input/score output
17/01/11 23:26:26 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/01/11 23:26:27 INFO input.FileInputFormat: Total input paths to process : 3
17/01/11 23:26:27 INFO mapreduce.JobSubmitter: number of splits:3
17/01/11 23:26:27 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
17/01/11 23:26:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1484147224423_0006
17/01/11 23:26:27 INFO impl.YarnClientImpl: Submitted application application_1484147224423_0006
17/01/11 23:26:27 INFO mapreduce.Job: The url to track the job: http://lenovo:8088/proxy/application_1484147224423_0006/
17/01/11 23:26:27 INFO mapreduce.Job: Running job: job_1484147224423_0006
17/01/11 23:26:33 INFO mapreduce.Job: Job job_1484147224423_0006 running in uber mode : false
17/01/11 23:26:33 INFO mapreduce.Job:  map 0% reduce 0%
17/01/11 23:26:40 INFO mapreduce.Job:  map 67% reduce 0%
17/01/11 23:26:41 INFO mapreduce.Job:  map 100% reduce 0%
17/01/11 23:26:46 INFO mapreduce.Job:  map 100% reduce 100%
17/01/11 23:26:46 INFO mapreduce.Job: Job job_1484147224423_0006 completed successfully
17/01/11 23:26:47 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=129
        FILE: Number of bytes written=471147
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=443
        HDFS: Number of bytes written=29
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=15538
        Total time spent by all reduces in occupied slots (ms)=2551
        Total time spent by all map tasks (ms)=15538
        Total time spent by all reduce tasks (ms)=2551
        Total vcore-milliseconds taken by all map tasks=15538
        Total vcore-milliseconds taken by all reduce tasks=2551
        Total megabyte-milliseconds taken by all map tasks=15910912
        Total megabyte-milliseconds taken by all reduce tasks=2612224
    Map-Reduce Framework
        Map input records=12
        Map output records=12
        Map output bytes=99
        Map output materialized bytes=141
        Input split bytes=357
        Combine input records=12
        Combine output records=12
        Reduce input groups=4
        Reduce shuffle bytes=141
        Reduce input records=12
        Reduce output records=4
        Spilled Records=24
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=462
        CPU time spent (ms)=2940
        Physical memory (bytes) snapshot=992215040
        Virtual memory (bytes) snapshot=7659905024
        Total committed heap usage (bytes)=732430336
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=86
    File Output Format Counters 
        Bytes Written=29

输出

mint@lenovo ~/Desktop/hadoop $ hdfs dfs -ls output
Found 2 items
-rw-r--r--   1 mint supergroup          0 2017-01-11 23:26 output/_SUCCESS
-rw-r--r--   1 mint supergroup         29 2017-01-11 23:26 output/part-r-00000
mint@lenovo ~/Desktop/hadoop $ hdfs dfs -cat output/part-r-00000
Li  40
Qian    32
Sun 59
Zhao    76

目录
相关文章
|
3天前
|
Java 应用服务中间件 Apache
安装和配置Apache Tomcat是部署Java Web应用程序的常见任务
安装和配置Apache Tomcat是部署Java Web应用程序的常见任务
19 7
|
3天前
|
监控 算法 Java
Java虚拟机(JVM)使用多种垃圾回收算法来管理内存,以确保程序运行时不会因为内存不足而崩溃。
【6月更文挑战第20天】Java JVM运用多种GC算法,如标记-清除、复制、标记-压缩、分代收集、增量收集、并行收集和并发标记,以自动化内存管理,防止因内存耗尽导致的程序崩溃。这些算法各有优劣,适应不同的性能和资源需求。垃圾回收旨在避免手动内存管理,简化编程。当遇到内存泄漏,可以借助VisualVM、JConsole或MAT等工具监测内存、生成堆转储,分析引用链并定位泄漏源,从而解决问题。
12 4
|
4天前
|
Java
在Java中,你可以创建一个简单的四则运算程序来执行小学级别的加减乘除操作
【6月更文挑战第19天】Java程序实现简单四则运算,接收用户输入的两个数字和运算符,根据运算符调用相应函数进行计算。包含加、减、乘、除功能,其中除法操作检查了除数是否为零,避免运行时错误。
18 5
|
5天前
|
Java 程序员
Java多线程编程是指在一个进程中创建并运行多个线程,每个线程执行不同的任务,并行地工作,以达到提高效率的目的
【6月更文挑战第18天】Java多线程提升效率,通过synchronized关键字、Lock接口和原子变量实现同步互斥。synchronized控制共享资源访问,基于对象内置锁。Lock接口提供更灵活的锁管理,需手动解锁。原子变量类(如AtomicInteger)支持无锁的原子操作,减少性能影响。
18 3
|
5天前
|
算法 Java
Java垃圾回收(Garbage Collection,GC)是Java虚拟机(JVM)的一种自动内存管理机制,用于在运行时自动回收不再使用的对象所占的内存空间
【6月更文挑战第18天】Java的GC自动回收内存,包括标记清除(产生碎片)、复制(效率低)、标记整理(兼顾连续性与效率)和分代收集(区分新生代和老年代,用不同算法优化)等策略。现代JVM通常采用分代收集,以平衡性能和内存利用率。
31 3
|
4天前
|
Java 调度
【实战指南】Java多线程高手秘籍:线程生命周期管理,掌控程序命运的钥匙!
【6月更文挑战第19天】Java多线程涉及线程生命周期的五个阶段:新建、就绪、运行、阻塞和死亡。理解这些状态转换对性能优化至关重要。线程从新建到调用`start()`变为就绪,等待CPU执行。获得执行权后进入运行状态,执行`run()`。遇到阻塞如等待锁时,进入阻塞状态。完成后或被中断则死亡。管理线程包括合理使用锁、利用线程池、处理异常和优雅关闭线程。通过控制这些,能编写更高效稳定的多线程程序。
|
4天前
|
Java
【技术解码】Java线程的五味人生:新建、就绪、运行、阻塞与死亡的哲学解读!
【6月更文挑战第19天】Java线程生命周期如同人生旅程,经历新建、就绪、运行、阻塞至死亡五阶段。从`new Thread()`的诞生到`start()`的蓄势待发,再到`run()`的全力以赴,线程在代码中奔跑。阻塞时面临挑战,等待资源释放,最终通过`join()`或中断结束生命。线程的每个状态转变,都是编程世界与哲思的交汇点。
|
5天前
|
存储 安全 Java
告别低效!Java Queue与LinkedList的完美结合,让你的程序更高效!
【6月更文挑战第18天】Java的`LinkedList`作为`Queue`实现,提供高效并发队列。利用双向链表,它在头部和尾部操作有O(1)复杂度,适合大量数据和高并发。通过`Collections.synchronizedList`可使其线程安全,用于任务调度等场景,展现灵活性和高性能。
|
6天前
|
IDE Oracle Java
Java 是一种跨平台的编程语言,可以在各种操作系统上运行。
Java 是一种跨平台的编程语言,可以在各种操作系统上运行。
|
9天前
|
前端开发 JavaScript Java
计算Java项目|基于SpringBoot的协力服装厂服装生产管理系统的设计与实现
计算Java项目|基于SpringBoot的协力服装厂服装生产管理系统的设计与实现