Task 1: Compute the average hotel price for each city
Programming requirements
Complete the code in the code window on the right:
1: The MapReduce job is already configured; you only need to complete the data-analysis part.
2: In the Map task class, use the city ID as the Mapper output key and the hotel price as the Mapper output value.
3: In the Reduce task class, compute the average hotel price per city ID and save it to HBase. The ROWKEY must be the city ID, the column family average_infos, the column qualifier price, and the computed average price the cell value (a sketch of creating this output table with the plain HBase Admin API follows below).
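Requirement 3 assumes that the output table average_table with column family average_infos exists. The scaffold creates it through the course helper HBaseUtil.createTable; below is a minimal sketch of what an equivalent looks like with the plain HBase 1.x Admin API. The class name CreateAverageTable is made up for illustration, and the connection settings are the same local ones used in the exercise.

package com.processdata;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical standalone helper, roughly what HBaseUtil.createTable is assumed to do.
public class CreateAverageTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");          // same local quorum as the exercise
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("average_table");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("average_infos")); // column family from requirement 3
                admin.createTable(desc);
            }
        }
    }
}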
The structure of the t_city_hotels_info table is as follows:
Column family | Column | Source file field | Description | ROWKEY (format: cityID_hotelID)
cityInfo | cityId | city_id | City ID | city_id + "_" + id
cityInfo | cityName | city_name | City name | city_id + "_" + id
cityInfo | pinyin | pinyin | City name in pinyin | city_id + "_" + id
hotel_info | id | id | Hotel ID | city_id + "_" + id
hotel_info | name | name | Hotel name | city_id + "_" + id
hotel_info | price | price | Hotel price | city_id + "_" + id
hotel_info | lon | lon | Longitude | city_id + "_" + id
hotel_info | url | url | Hotel URL | city_id + "_" + id
hotel_info | img | img | Image | city_id + "_" + id
hotel_info | address | address | Address | city_id + "_" + id
hotel_info | score | score | Score | city_id + "_" + id
hotel_info | dpscore | dpscore | User rating | city_id + "_" + id
hotel_info | dpcount | dpcount | Number of ratings | city_id + "_" + id
hotel_info | star | star | Star rating | city_id + "_" + id
hotel_info | stardesc | stardesc | Comfort level | city_id + "_" + id
hotel_info | shortName | shortName | Hotel short name | city_id + "_" + id
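Because the ROWKEY has the fixed shape city_id + "_" + id, the city ID can also be recovered from the row key itself, instead of (or as a sanity check against) reading the cityInfo:cityId cell. A small sketch follows; the helper class and the sample key are made up for illustration.

package com.processdata;

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper class, not part of the exercise scaffold.
public class RowKeyParser {
    /** Returns the city ID portion of a row key shaped city_id + "_" + id, e.g. "58_123456" -> "58". */
    public static String cityIdOf(byte[] rowKey) {
        String key = Bytes.toString(rowKey);
        int sep = key.indexOf('_');
        return sep < 0 ? key : key.substring(0, sep);
    }

    public static void main(String[] args) {
        System.out.println(cityIdOf(Bytes.toBytes("58_123456"))); // sample key, prints 58
    }
}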
Testing notes
The platform will test the code you write:
Test input:
t_city_hotels_info, average_table
Expected output:
row:58 average_infos:price 1145.6170212765958
row:59 average_infos:price 797.2197802197802
Code
package com.processdata;

import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.util.HBaseUtil;

/**
 * Process the data stored in HBase with a MapReduce job and store the final result in another table.
 */
public class HBaseMapReduce extends Configured implements Tool {

    public static class MyMapper extends TableMapper<Text, DoubleWritable> {
        public static final byte[] column = "price".getBytes();
        public static final byte[] family = "hotel_info".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Read the city ID and the hotel price from the current row,
            // then emit (city ID, price) so the reducer can average per city.
            String cityId = Bytes.toString(result.getValue("cityInfo".getBytes(), "cityId".getBytes()));
            byte[] value = result.getValue(family, column);
            double price = Double.parseDouble(Bytes.toString(value));
            context.write(new Text(cityId), new DoubleWritable(price));
            /********** End *********/
        }
    }

    public static class MyTableReducer extends TableReducer<Text, DoubleWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Average all prices of one city and write them to average_infos:price,
            // using the city ID as the row key.
            double sum = 0;
            int len = 0;
            for (DoubleWritable price : values) {
                len++;
                sum += price.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn("average_infos".getBytes(), "price".getBytes(),
                    Bytes.toBytes(String.valueOf(sum / len)));
            context.write(null, put);
            /********** End *********/
        }
    }

    public int run(String[] args) throws Exception {
        // Configure the job
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");          // HBase service address
        conf.set("hbase.zookeeper.property.clientPort", "2181");  // port
        Scanner sc = new Scanner(System.in);
        String arg1 = sc.next();
        String arg2 = sc.next();
        //String arg1 = "t_city_hotels_info";
        //String arg2 = "average_table";
        try {
            HBaseUtil.createTable("average_table", new String[] {"average_infos"});
        } catch (Exception e) {
            // Table creation failed
            e.printStackTrace();
        }
        Job job = configureJob(conf, new String[]{arg1, arg2});
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private Job configureJob(Configuration conf, String[] args) throws IOException {
        String tablename = args[0];
        String targetTable = args[1];
        Job job = new Job(conf, tablename);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false);  // never enable block caching for a MapReduce scan
        // Initialize the map side
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class,
                Text.class, DoubleWritable.class, job);
        // Initialize the reduce side
        TableMapReduceUtil.initTableReducerJob(
                targetTable,           // output table
                MyTableReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);
        return job;
    }
}
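The reducer above stores the average as a UTF-8 string (Bytes.toBytes(String.valueOf(sum / len))), so reading it back means decoding with Bytes.toString rather than Bytes.toDouble. Below is a minimal read-back sketch that prints the table in the same shape as the expected output, assuming the same local HBase instance; the class name VerifyAverages is made up.

package com.processdata;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical verification class, not part of the exercise scaffold.
public class VerifyAverages {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("average_table"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String cityId = Bytes.toString(result.getRow());
                // The reducer wrote the average as a string, so read it back the same way.
                String avg = Bytes.toString(result.getValue(
                        Bytes.toBytes("average_infos"), Bytes.toBytes("price")));
                System.out.println("row:" + cityId + " average_infos:price " + avg);
            }
        }
    }
}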
Task 2: Count the most frequent words in hotel reviews
Programming requirements
Complete the code in the code window on the right. The MapReduce job is already configured; you only need to complete the data analysis and store all of the segmented words in a new table. The platform will then print the words whose count is greater than 100:
1: In the Map task class, segment each review into words and use each word as the output key; the Mapper output value is the fixed value 1 (see the segmentation sketch after this list).
2: In the Reduce task class, count the occurrences of each segmented word and save the result to HBase. The ROWKEY must be the word itself, the column family word_info, and the column qualifier count.
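Compared with the first task, the only new ingredient is turning a review into words. The scaffold already ships the word segmentation library (org.apdplat.word) and emoji-java, which the solution below uses; here is a minimal standalone sketch of just that step, run on a made-up review string.

package com.processdata;

import java.util.List;
import org.apdplat.word.WordSegmenter;
import org.apdplat.word.segmentation.Word;
import com.vdurmont.emoji.EmojiParser;

// Hypothetical demo class, not part of the exercise scaffold.
public class SegmentDemo {
    public static void main(String[] args) {
        String comment = "酒店位置不错,房间干净";               // made-up review text
        String filtered = EmojiParser.removeAllEmojis(comment);  // strip emojis before segmenting
        List<Word> words = WordSegmenter.seg(filtered);          // split the review into words
        for (Word word : words) {
            System.out.println(word.getText());                  // each word becomes a map output key
        }
    }
}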
The structure of the t_hotel_comment table is as follows:
Column family | Column | Source file field | Description | ROWKEY (format: hotelID_reviewID)
hotel_info | hotel_name | hotel_name | Hotel name | hotel_id + "_" + id
hotel_info | hotel_id | hotel_id | Hotel ID | hotel_id + "_" + id
comment_info | id | id | Review ID | hotel_id + "_" + id
comment_info | baseRoomId | baseRoomId | Room type | hotel_id + "_" + id
comment_info | content | content | Review content | hotel_id + "_" + id
comment_info | checkInDate | checkInDate | Check-in date | hotel_id + "_" + id
comment_info | postDate | postDate | Check-out date | hotel_id + "_" + id
comment_info | userNickName | userNickName | User nickname | hotel_id + "_" + id
Testing notes
The platform will test the code you write:
Test input:
t_hotel_comment, comment_word_count
Expected output:
word:不错 word_info:count 344
word:位置 word_info:count 159
word:住 word_info:count 150
word:免费 word_info:count 110
word:入住 word_info:count 112
word:卫生 word_info:count 106
word:地铁站 word_info:count 144
word:巴士 word_info:count 174
word:干净 word_info:count 211
word:很好 word_info:count 200
word:性价比 word_info:count 123
word:房间 word_info:count 449
word:早餐 word_info:count 116
word:环境 word_info:count 166
word:葵 word_info:count 112
word:酒店 word_info:count 970
word:香港 word_info:count 224
Code
package com.processdata;

import java.io.IOException;
import java.util.List;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.word.WordSegmenter;
import org.apdplat.word.segmentation.Word;

import com.util.HBaseUtil;
import com.vdurmont.emoji.EmojiParser;

/**
 * Word-frequency statistics over hotel review text.
 */
public class WorldCountMapReduce extends Configured implements Tool {

    public static class MyMapper extends TableMapper<Text, IntWritable> {
        private static byte[] family = "comment_info".getBytes();
        private static byte[] column = "content".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Read the review text, strip emojis, segment it into words,
            // and emit (word, 1) for every word.
            byte[] value = result.getValue(family, column);
            String content = new String(value, "utf-8");
            if (!content.isEmpty()) {
                String filtered = EmojiParser.removeAllEmojis(content);
                List<Word> words = WordSegmenter.seg(filtered);
                for (Word word : words) {
                    context.write(new Text(word.getText()), new IntWritable(1));
                }
            }
            /********** End *********/
        }
    }

    public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Sum the occurrences of one word and write them to word_info:count,
            // using the word itself as the row key.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
            context.write(null, put);
            /********** End *********/
        }
    }

    public int run(String[] args) throws Exception {
        // Configure the job
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");          // HBase service address
        conf.set("hbase.zookeeper.property.clientPort", "2181");  // port
        Scanner sc = new Scanner(System.in);
        String arg1 = sc.next();
        String arg2 = sc.next();
        try {
            HBaseUtil.createTable("comment_word_count", new String[] {"word_info"});
        } catch (Exception e) {
            // Table creation failed
            e.printStackTrace();
        }
        Job job = configureJob(conf, new String[]{arg1, arg2});
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private Job configureJob(Configuration conf, String[] args) throws IOException {
        String tablename = args[0];
        String targetTable = args[1];
        Job job = new Job(conf, tablename);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false);  // never enable block caching for a MapReduce scan
        // Initialize the map and reduce sides
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class,
                Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        job.setNumReduceTasks(1);
        return job;
    }
}
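Note a small difference from the first task: here the count is written with Bytes.toBytes(sum), i.e. as a 4-byte int, so reading it back uses Bytes.toInt rather than parsing a string. Below is a minimal read-back sketch that reproduces the platform's count > 100 filter, assuming the same local HBase instance; the class name VerifyWordCounts is made up.

package com.processdata;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical verification class, not part of the exercise scaffold.
public class VerifyWordCounts {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("comment_word_count"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                // The reducer wrote the count as a 4-byte int, so decode it with Bytes.toInt.
                int count = Bytes.toInt(result.getValue(
                        Bytes.toBytes("word_info"), Bytes.toBytes("count")));
                if (count > 100) {
                    System.out.println("word:" + Bytes.toString(result.getRow())
                            + " word_info:count " + count);
                }
            }
        }
    }
}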