一、✌ Problem Requirements
record table:
ID | City code | Air quality index |
001 | 03 | 245 |
002 | 02 | 655 |
003 | 05 | 743 |
004 | 04 | 246 |
005 | 02 | 956 |
006 | 01 | 637 |
007 | 05 | 831 |
008 | 03 | 683 |
009 | 02 | 349 |
city table:
City code | City name |
01 | 长沙 |
02 | 株洲 |
03 | 湘潭 |
04 | 怀化 |
05 | 岳阳 |
Target table:
ID | City name | Air quality index |
001 | 湘潭 | 245 |
002 | 株洲 | 655 |
003 | 岳阳 | 743 |
004 | 怀化 | 246 |
005 | 株洲 | 956 |
006 | 长沙 | 637 |
007 | 岳阳 | 831 |
008 | 湘潭 | 683 |
009 | 株洲 | 349 |
二、✌ Approach
We use the shared field (the city code) as the key of the Map output and wrap the remaining attributes in a Bean as the value.
After the Map phase, the data is grouped by key as follows:
City code | ID | City name | Air quality index | File type |
1 | 006 |  | 637 | record |
1 |  | 长沙 |  | city |
2 | 002 |  | 655 | record |
2 | 005 |  | 956 | record |
2 | 009 |  | 349 | record |
2 |  | 株洲 |  | city |
3 | 001 |  | 245 | record |
3 | 008 |  | 683 | record |
3 |  | 湘潭 |  | city |
4 | 004 |  | 246 | record |
4 |  | 怀化 |  | city |
5 | 003 |  | 743 | record |
5 | 007 |  | 831 | record |
5 |  | 岳阳 |  | city |
1. Encapsulate all attributes in a single Bean object and implement Writable serialization.
2. The Map input types are LongWritable and Text.
3. The Map output types are Text and Bean.
4. The Reduce output types are Bean and NullWritable.
三、✌ Code Implementation
1. ✌ Bean class
```java
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Bean implements Writable {

    // Fields: id = record ID, pid = city code, amount = air quality index,
    // pname = city name, type = source table ("record" or "city")
    private String id;
    private String pid;
    private int amount;
    private String pname;
    private String type;

    // No-arg constructor, required so Hadoop can instantiate the class via reflection
    public Bean() {
        super();
    }

    // All-args constructor
    public Bean(String id, String pid, int amount, String pname, String type) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.type = type;
    }

    // toString defines the final output line: ID, city name, air quality index
    @Override
    public String toString() {
        return id + "\t" + pname + "\t\t" + amount;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }

    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(type);
    }

    // Deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        type = in.readUTF();
    }
}
```
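As a quick sanity check of the Writable contract outside of a running job, the Bean can be round-tripped through an in-memory byte stream. This is only a small sketch; the test values are taken from the sample data above, and nothing here is required by the job itself.

```java
import java.io.*;

public class BeanRoundTrip {
    public static void main(String[] args) throws IOException {
        Bean original = new Bean("006", "01", 637, "", "record");

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields() into a fresh instance
        Bean copy = new Bean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Expected: 006 <tab> <tab><tab> 637 (pname is still empty at this point)
        System.out.println(copy);
    }
}
```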
2. ✌ Map class
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class Map extends Mapper<LongWritable, Text, Text, Bean> {

    String fileName;

    // Both tables sit in the same input directory, so we grab the file name of the
    // current split in setup(); it tells map() which table a line belongs to.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    // Wrap each tab-separated line in a Bean, keyed by the city code (the join field)
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (fileName.startsWith("record")) {
            // record line: ID, city code, air quality index
            String[] words = line.split("\t");
            context.write(new Text(words[1]), new Bean(words[0], words[1], Integer.parseInt(words[2]), "", "record"));
        } else {
            // city line: city code, city name
            String[] words = line.split("\t");
            context.write(new Text(words[0]), new Bean("", words[0], 0, words[1], "city"));
        }
    }
}
```
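The mapper assumes tab-separated input files in a single directory, where the measurement file's name starts with record and every other file is treated as the city table. A small helper like the one below could generate matching test data; the file names record.txt and city.txt and the D:/input directory are assumptions chosen to line up with the hard-coded paths in the Driver further down, not something the job itself requires.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

public class MakeTestInput {
    public static void main(String[] args) throws IOException {
        Files.createDirectories(Paths.get("D:/input"));
        // record table: ID <tab> city code <tab> air quality index
        Files.write(Paths.get("D:/input/record.txt"), Arrays.asList(
                "001\t03\t245", "002\t02\t655", "003\t05\t743",
                "004\t04\t246", "005\t02\t956", "006\t01\t637",
                "007\t05\t831", "008\t03\t683", "009\t02\t349"));
        // city table: city code <tab> city name
        Files.write(Paths.get("D:/input/city.txt"), Arrays.asList(
                "01\t长沙", "02\t株洲", "03\t湘潭", "04\t怀化", "05\t岳阳"));
    }
}
```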
3. ✌ Reduce class
```java
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class Reduce extends Reducer<Text, Bean, Bean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {

        // All record-side Beans for this city code
        ArrayList<Bean> list = new ArrayList<>();
        // The single city-side Bean, which carries the city name
        Bean pd = new Bean();

        // Hadoop reuses the same Bean instance while iterating over values,
        // so every element we want to keep must be deep-copied first.
        for (Bean value : values) {
            if ("record".equals(value.getType())) {
                Bean temp = new Bean();
                try {
                    BeanUtils.copyProperties(temp, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                list.add(temp);
            } else {
                try {
                    BeanUtils.copyProperties(pd, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // Fill in the city name on every record row, then emit it
        for (Bean bean : list) {
            bean.setPname(pd.getPname());
            context.write(bean, NullWritable.get());
        }
    }
}
```
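To make the merge step concrete, here is a standalone sketch of what reduce() does for a single key, using the grouped rows for city code 02 from the table in section 二: cache the record-side rows, remember the city-side row, then stamp the city name onto each record row. It reuses the Bean class above and is purely illustrative; in the real reducer the deep copy via BeanUtils is still needed because Hadoop reuses the value object while iterating.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinOneKeyDemo {
    public static void main(String[] args) {
        // What the reducer receives for key "02" after the shuffle
        List<Bean> grouped = new ArrayList<>();
        grouped.add(new Bean("002", "02", 655, "", "record"));
        grouped.add(new Bean("005", "02", 956, "", "record"));
        grouped.add(new Bean("009", "02", 349, "", "record"));
        grouped.add(new Bean("", "02", 0, "株洲", "city"));

        // Split the two sides
        List<Bean> records = new ArrayList<>();
        Bean city = new Bean();
        for (Bean b : grouped) {
            if ("record".equals(b.getType())) {
                records.add(b);
            } else {
                city = b;
            }
        }

        // Join: copy the city name onto every record row
        for (Bean b : records) {
            b.setPname(city.getPname());
            System.out.println(b);   // e.g. 002  株洲  655
        }
    }
}
```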
4. ✌ Driver class
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Local input/output paths (hard-coded for a local run)
        args = new String[]{"D:/input", "D:/output"};

        // Minimal log4j setup so job progress is printed to the console
        BasicConfigurator.configure();

        // Configuration and job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Wire up the driver, mapper and reducer classes
        job.setJarByClass(Driver.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);

        // Final output types
        job.setOutputKeyClass(Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
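One practical note for reruns: FileOutputFormat fails the job if the output directory already exists. The sketch below shows how the directory could be cleaned up beforehand; the D:/output path mirrors the Driver's hard-coded argument, and in the actual Driver the same few lines would go right before FileOutputFormat.setOutputPath. This is an optional convenience, not part of the original code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanOutputDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path output = new Path("D:/output");   // same path the Driver uses
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);           // true = recursive delete
        }
    }
}
```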