Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

 

复制代码
  1 package zhouls.bigdata.myMapReduce.ScoreCount;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import org.apache.hadoop.io.WritableComparable;
  7 /**
  8 * 学习成绩读写类
  9 * 数据格式参考:19020090017 小讲 90 99 100 89 95
 10 * @author Bertron
 11 * 需要自定义一个 ScoreWritable 类实现 WritableComparable 接口,将学生各门成绩封装起来。
 12 */
 13 public class ScoreWritable implements WritableComparable< Object > {//其实这里,跟TVPlayData一样的
 14 //  注意:    Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
 15 //         Writable接口提供两个方法(write和readFields)。
 16 
 17     
 18     private float Chinese;
 19     private float Math;
 20     private float English;
 21     private float Physics;
 22     private float Chemistry;
 23     
 24     
 25 //    问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
 26 //    答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。
 27     
 28     public ScoreWritable(){}//java里的无参构造函数,是用来在创建对象时初始化对象  
 29     //在hadoop的每个自定义类型代码里,好比,现在的ScoreWritable,都必须要写无参构造函数。
 30     
 31     
 32     //问:为什么我们在编程的时候,需要创建一个带有参的构造方法?
 33     //答:就是能让赋值更灵活。构造一般就是初始化数值,你不想别人用你这个类的时候每次实例化都能用另一个构造动态初始化一些信息么(当然没有需要额外赋值就用默认的)。
 34     
 35     public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry){//java里的有参构造函数,是用来在创建对象时初始化对象  
 36         this.Chinese = Chinese;
 37         this.Math = Math;
 38         this.English = English;
 39         this.Physics = Physics;
 40         this.Chemistry = Chemistry;
 41     }
 42     
 43     //问:其实set和get方法,这两个方法只是类中的setxxx和getxxx方法的总称,
 44     //    那么,为什么在编程时,有set和set***两个,只有get***一个呢?
 45     
 46     public void set(float Chinese,float Math,float English,float Physics,float Chemistry){
 47         this.Chinese = Chinese;//即float Chinese赋值给private float Chinese;
 48         this.Math = Math;
 49         this.English = English;
 50         this.Physics = Physics;
 51         this.Chemistry = Chemistry;
 52     }
 53 //    public float get(float Chinese,float Math,float English,float Physics,float Chemistry){因为这是错误的,所以对于set可以分开,get只能是get***
 54 //        return Chinese;
 55 //        return Math;
 56 //        return English;
 57 //        return Physics;
 58 //        return Chemistry;
 59 //    }
 60     
 61     
 62     public float getChinese() {//拿值,得返回,所以需有返回类型float
 63         return Chinese;
 64     }
 65     public void setChinese(float Chinese){//设值,不需,所以空返回类型
 66         this.Chinese = Chinese;
 67     }
 68     public float getMath() {//拿值
 69         return Math;
 70     }
 71     public void setMath(float Math){//设值
 72         this.Math = Math;
 73     }
 74     public float getEnglish() {//拿值
 75         return English;
 76     }
 77     public void setEnglish(float English){//设值
 78         this.English = English;
 79     }
 80     public float getPhysics() {//拿值
 81         return Physics;
 82     }
 83     public void setPhysics(float Physics){//设值
 84         this.Physics = Physics;
 85     }
 86     public float getChemistry() {//拿值
 87         return Chemistry;
 88     }
 89     public void setChemistry(float Chemistry) {//拿值
 90         this.Chemistry = Chemistry;
 91     }
 92     
 93     // 实现WritableComparable的readFields()方法
 94 //    对象不能传输的,需要转化成字节流!
 95 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
 96 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
 97     public void readFields(DataInput in) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
 98         Chinese = in.readFloat();//因为,我们这里的对象是float类型,所以是readFloat()
 99         Math = in.readFloat();
100         English = in.readFloat();//注意:反序列化里,需要生成对象对吧,所以,是用到的是get那边对象
101         Physics = in.readFloat();
102         Chemistry = in.readFloat();
103 //        in.readByte()
104 //        in.readChar()
105 //        in.readDouble()
106 //        in.readLine() 
107 //        in.readFloat()
108 //        in.readLong()
109 //        in.readShort()
110     }
111     
112     // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出 
113 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
114 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
115     public void write(DataOutput out) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
116         out.writeFloat(Chinese);//因为,我们这里的对象是float类型,所以是writeFloat()
117         out.writeFloat(Math);
118         out.writeFloat(English);//注意:序列化里,需要对象对吧,所以,用到的是set那边的对象
119         out.writeFloat(Physics);
120         out.writeFloat(Chemistry);
121 //        out.writeByte()
122 //        out.writeChar()
123 //        out.writeDouble()
124 //        out.writeFloat()
125 //        out.writeLong()
126 //        out.writeShort()
127 //        out.writeUTF()
128     }
129     
130     public int compareTo(Object o) {//java里的比较,Java String.compareTo()
131         return 0;
132     }
133     
134     
135 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
136 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
137 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
138     
139     
140 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
141 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
142 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
143     
144     
145 //  源码是
146 //    package java.lang;  
147 //    import java.util.*;      
148 //    public interface Comparable {  
149 //        /** 
150 //        * 将this对象和对象o进行比较,约定:返回负数为小于,零为大于,整数为大于 
151 //        */  
152 //        public int compareTo(T o);  
153 //    }
154     
155 }
复制代码

 

 

 

 

 

 

 

 

 

复制代码
  1 package zhouls.bigdata.myMapReduce.ScoreCount;
  2 
  3 import java.io.IOException;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.FSDataInputStream;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.InputSplit;
 10 import org.apache.hadoop.mapreduce.JobContext;
 11 import org.apache.hadoop.mapreduce.RecordReader;
 12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 15 import org.apache.hadoop.util.LineReader;
 16 /**
 17 * 自定义学生成绩读写InputFormat
 18 * 数据格式参考:19020090017 小讲 90 99 100 89 95
 19 * @author Bertron
 20 */
 21 
 22             //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
 23             //比如   ScoreInputFormat  extends FileInputFormat implements InputFormat。
 24 
 25             //问:自定义输入格式 ScoreInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
 26 
 27 public class ScoreInputFormat extends FileInputFormat<Text,ScoreWritable > {//自定义数据输入格式,其实这都是模仿源码的!可以去看
 28 
 29 //    线路是: boolean  isSplitable()   ->   RecordReader<Text,ScoreWritable> createRecordReader()   ->   ScoreRecordReader extends RecordReader<Text, ScoreWritable > 
 30     
 31     @Override
 32     protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
 33             //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
 34 //        如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
 35 //        如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
 36         return false;    //整个文件封装到一个InputSplit
 37         //要么就是return true;        //切分64MB大小的一块一块,再封装到InputSplit
 38     }
 39     
 40     @Override
 41     public RecordReader<Text,ScoreWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
 42 //        RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
 43 //        createRecordReader是方法,在这里是,ScoreInputFormat.createRecordReader。ScoreInputFormat是InputFormat类的实例
 44 //        InputSplit input和TaskAttemptContext context是传入参数
 45         
 46 //        isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
 47 //        isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
 48         
 49         //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类ScoreRecordReader。
 50         //类似与Excel、WeiBo、TVPlayData代码写法
 51         return new ScoreRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
 52     }
 53     
 54     
 55     //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为ScoreWritable类型封装学生所有成绩
 56     public static class ScoreRecordReader extends RecordReader<Text, ScoreWritable > {//RecordReader<k1, v1>是一个整体
 57         public LineReader in;//行读取器
 58         public Text line;//每行数据类型
 59         public Text lineKey;//自定义key类型,即k1
 60         public ScoreWritable lineValue;//自定义value类型,即v1
 61         
 62         @Override
 63         public void close() throws IOException {//关闭输入流
 64             if(in !=null){
 65                 in.close();
 66             }
 67         }
 68         @Override
 69         public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
 70             return lineKey;//返回类型是Text,即Text lineKey
 71         }
 72         @Override
 73         public ScoreWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
 74             return lineValue;//返回类型是ScoreWritable,即ScoreWritable lineValue
 75         }
 76         @Override
 77         public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
 78             return 0;//返回类型是float,即float 0
 79         }
 80         @Override
 81         public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
 82             FileSplit split=(FileSplit)input;
 83             Configuration job=context.getConfiguration();
 84             Path file=split.getPath();
 85             FileSystem fs=file.getFileSystem(job);
 86             
 87             FSDataInputStream filein=fs.open(file);
 88             in=new LineReader(filein,job);//输入流in
 89             line=new Text();//每行数据类型
 90             lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
 91             lineValue = new ScoreWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
 92         }
 93         
 94         //此方法读取每行数据,完成自定义的key和value
 95         @Override
 96         public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
 97             int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
 98             
 99 //            是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader
100             
101 //            in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
102 //            in.readLine(str, maxLineLength)//只读到maxLineLength行
103 //            in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
104 
105             if(linesize==0) return false;
106             
107             
108             String[] pieces = line.toString().split("\\s+");//解析每行数据
109                     //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
110             
111             if(pieces.length != 7){
112                 throw new IOException("Invalid record received");
113             }
114             //将学生的每门成绩转换为 float 类型
115             float a,b,c,d,e;
116             try{
117                 a = Float.parseFloat(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
118                 b = Float.parseFloat(pieces[3].trim());
119                 c = Float.parseFloat(pieces[4].trim());
120                 d = Float.parseFloat(pieces[5].trim());
121                 e = Float.parseFloat(pieces[6].trim());
122             }catch(NumberFormatException nfe){
123                 throw new IOException("Error parsing floating poing value in record");
124             }
125             lineKey.set(pieces[0]+"\t"+pieces[1]);//完成自定义key数据
126             lineValue.set(a, b, c, d, e);//封装自定义value数据
127 //            或者写
128 //            lineValue.set(Float.parseFloat(pieces[2].trim()),Float.parseFloat(pieces[3].trim()),Float.parseFloat(pieces[4].trim()),
129 //                    Float.parseFloat(pieces[5].trim()),Float.parseFloat(pieces[6].trim()));
130             
131 //            pieces[0]   pieces[1] pieces[2]  ... pieces[6]
132 //            19020090040 秦心芯 123 131 100 95 100
133 //            19020090006 李磊 99 92 100 90 100
134 //            19020090017 唐一建 90 99 100 89 95
135 //            19020090031 曾丽丽 100 99 97 79 96
136 //            19020090013 罗开俊 105 115 94 45 100
137 //            19020090039 周世海 114 116 93 31 97
138 //            19020090020 王正伟 109 98 88 47 99
139 //            19020090025 谢瑞彬 94 120 100 50 73
140 //            19020090007 于微 89 78 100 66 99
141 //            19020090012 刘小利 87 82 89 71 99
142             
143             
144             
145             return true;
146         }        
147     }
148 }
复制代码

 

 

 

 

 

 

复制代码
 1 package zhouls.bigdata.myMapReduce.ScoreCount;
 2 
 3 
 4 import java.io.IOException;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.util.Tool;
16 import org.apache.hadoop.util.ToolRunner;
17 /**
18 * 学生成绩统计Hadoop程序
19 * 数据格式参考:19020090017 小讲 90 99 100 89 95
20 * @author HuangBQ
21 */
22 public class ScoreCount extends Configured implements Tool{
23     public static class ScoreMapper extends Mapper<Text,ScoreWritable,Text,ScoreWritable>{
24         @Override
25         protected void map(Text key, ScoreWritable value, Context context)throws IOException, InterruptedException{
26             context.write(key, value);//写入key是k2,value是v2
27 //            context.write(new Text(key), new ScoreWritable(value));等价           
28         }
29     }
30     
31     public static class ScoreReducer extends Reducer<Text,ScoreWritable,Text,Text>{
32         private Text text = new Text();
33         protected void reduce(Text Key, Iterable< ScoreWritable > Values, Context context)throws IOException, InterruptedException{
34             float totalScore=0.0f;
35             float averageScore = 0.0f;
36             for(ScoreWritable ss:Values){
37                 totalScore +=ss.getChinese()+ss.getMath()+ss.getEnglish()+ss.getPhysics()+ss.getChemistry();
38                 averageScore +=totalScore/5;
39             }
40             text.set(totalScore+"\t"+averageScore);
41             context.write(Key, text);//写入Key是k3,text是v3
42 //            context.write(new Text(Key),new Text(text));等价            
43         }
44     }
45     
46 
47     public int run(String[] args) throws Exception{
48         Configuration conf = new Configuration();//读取配置文件
49         
50         Path mypath = new Path(args[1]);
51         FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
52         if (hdfs.isDirectory(mypath)) 
53         {
54             hdfs.delete(mypath, true);
55         }
56         
57         Job job = new Job(conf, "ScoreCount");//新建任务
58         job.setJarByClass(ScoreCount.class);//设置主类
59         
60         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
61         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
62         
63         job.setMapperClass(ScoreMapper.class);// Mapper
64         job.setReducerClass(ScoreReducer.class);// Reducer
65         
66         job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
67         job.setMapOutputValueClass(ScoreWritable.class);// Mapper value输出类型
68                 
69         job.setInputFormatClass(ScoreInputFormat.class);//设置自定义输入格式
70         
71         job.waitForCompletion(true);        
72         return 0;
73     }
74     
75     
76     
77     public static void main(String[] args) throws Exception{
78 //        String[] args0 = 
79 //                { 
80 //                "hdfs://HadoopMaster:9000/score/score.txt",
81 //                "hdfs://HadoopMaster:9000/out/score/" 
82 //                };
83         
84         String[] args0 = 
85             { 
86             "./data/score/score.txt",
87             "./out/score/" 
88             };
89         
90         int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);
91         System.exit(ec);
92     }
93 }
复制代码

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6165667.html,如需转载请自行联系原作者

相关文章
|
2月前
|
开发框架 .NET API
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
75 7
|
4月前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
128 0
|
2月前
|
机器学习/深度学习 算法 API
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
|
3月前
|
网络协议 API Windows
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
|
2月前
|
IDE API 定位技术
Python--API编程:IP地址翻译成实际的物理地址
Python--API编程:IP地址翻译成实际的物理地址
63 0
|
4月前
|
开发者
告别繁琐代码,JSF标签库带你走进高效开发的新时代!
【8月更文挑战第31天】JSF(JavaServer Faces)标准标签库为页面开发提供了大量组件标签,如`&lt;h:inputText&gt;`、`&lt;h:dataTable&gt;`等,简化代码、提升效率并确保稳定性。本文通过示例展示如何使用这些标签实现常见功能,如创建登录表单和展示数据列表,帮助开发者更高效地进行Web应用开发。
47 0
|
4月前
|
前端开发 API 开发者
【React状态管理新思路】Context API入门:从零开始摆脱props钻孔的优雅之道,全面解析与实战案例分享!
【8月更文挑战第31天】React 的 Context API 有效解决了多级组件间状态传递的 &quot;props 钻孔&quot; 问题,使代码更简洁、易维护。本文通过电子商务网站登录状态管理案例,详细介绍了 Context API 的使用方法,包括创建、提供及消费 Context,以及处理多个 Context 的场景,适合各水平开发者学习与应用,提高开发效率和代码质量。
43 0
|
4月前
|
JSON API 数据库
神秘编程力量来袭!Rails 究竟隐藏着怎样的魔力,能构建出强大的 RESTful API?快来一探究竟!
【8月更文挑战第31天】《构建 RESTful API:使用 Rails 进行服务端开发》介绍了如何利用 Ruby on Rails 框架高效构建可扩展的 RESTful API。Rails 采用“约定优于配置”,简化开发流程,通过示例展示了路由定义、控制器设计及模型层交互等内容,帮助开发者快速搭建稳定可靠的服务端。无论小型项目还是大型应用,Rails 均能提供强大支持,提升开发效率。
31 0
|
分布式计算 Hadoop 测试技术
hadoop: hdfs API示例
利用hdfs的api,可以实现向hdfs的文件、目录读写,利用这一套API可以设计一个简易的山寨版云盘,见下图: 为了方便操作,将常用的文件读写操作封装了一个工具类: 1 import org.
1018 0
|
4天前
|
人工智能 自然语言处理 API
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
谷歌推出的Multimodal Live API是一个支持多模态交互、低延迟实时互动的AI接口,能够处理文本、音频和视频输入,提供自然流畅的对话体验,适用于多种应用场景。
34 3
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动