I have a large number of URLs stored as plain text, one URL per line, like this:
```
http://beijing.baixing.com/zhengzu/a232119437.html
http://mall.cnki.net/magazine/Article/JCTD199507000.htm
http://meishi.qq.com/shops/2847395840683387518
http://beijing.baixing.com/zhengzu/a233512411.html
http://meishi.qq.com/shops/2710663397051226108
```
Now I want to group them by domain into the following format:
```
beijing.baixing.com
http://beijing.baixing.com/zhengzu/a232119437.html
http://beijing.baixing.com/zhengzu/a233512411.html
mall.cnki.net
http://mall.cnki.net/magazine/Article/JCTD199507000.htm
meishi.qq.com
http://meishi.qq.com/shops/2847395840683387518
http://meishi.qq.com/shops/2710663397051226108
```
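For a small file this grouping is of course doable in plain Java. The sketch below is only an illustration of the target transformation (the class name `UrlGroup` and the hard-coded `url.txt` are placeholders, not part of the original setup): it reads everything into a `TreeMap` keyed by domain and prints each domain followed by its URLs.

```java
import java.io.*;
import java.util.*;

public class UrlGroup {
    public static void main(String[] args) throws Exception {
        // domain -> URLs under it; TreeMap keeps the domains sorted
        Map<String, List<String>> byDomain = new TreeMap<String, List<String>>();
        BufferedReader in = new BufferedReader(new FileReader("url.txt"));
        String line;
        while ((line = in.readLine()) != null) {
            line = line.trim();
            if (!line.startsWith("http://")) {
                continue; // skip blank or malformed lines
            }
            // domain = text between "http://" and the next "/"
            int pos = line.indexOf("/", 7);
            String domain = (pos > 0) ? line.substring(7, pos) : line.substring(7);
            List<String> urls = byDomain.get(domain);
            if (urls == null) {
                urls = new ArrayList<String>();
                byDomain.put(domain, urls);
            }
            urls.add(line);
        }
        in.close();
        // print each domain followed by its URLs, matching the target format above
        for (Map.Entry<String, List<String>> e : byDomain.entrySet()) {
            System.out.println(e.getKey());
            for (String url : e.getValue()) {
                System.out.println(url);
            }
        }
    }
}
```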
I happened to want an excuse to try Hadoop, though, so I wrote a Map/Reduce program for it instead. It is simple enough that I'll just record it here without much explanation.
```java
package com.rs;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class Url {

    /**
     * Mapper: extracts the domain from each URL.
     */
    @SuppressWarnings("deprecation")
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        /**
         * value is one line of input; output collects the map results.
         */
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter report) throws IOException {
            String line = value.toString().trim();
            // skip blank lines and anything that does not look like an http:// URL
            if (!line.startsWith("http://")) {
                return;
            }
            // the domain is everything between "http://" and the next "/" (or the end of the line)
            int pos = line.indexOf("/", 7);
            String domain = (pos > 0) ? line.substring(7, pos) : line.substring(7);
            // the map result is (domain, url)
            output.collect(new Text(domain), new Text(line));
        }
    }

    /**
     * Reducer: aggregates the map results.
     */
    @SuppressWarnings("deprecation")
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        /**
         * key is the domain produced by map, values are the URLs collected under it,
         * output collects the reduce results.
         */
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter report) throws IOException {
            // join the URLs with "\n" so each one ends up on its own line under the domain
            StringBuilder urls = new StringBuilder();
            urls.append("\n");
            while (values.hasNext()) {
                urls.append(values.next().toString());
                urls.append("\n");
            }
            // the reduce result is (domain, urls)
            output.collect(key, new Text(urls.toString()));
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Url.class);
        conf.setJobName("urlanalyse"); // job name

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // set the mapper and reducer classes
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        // input and output are both plain text, so use TextInputFormat and TextOutputFormat;
        // other formats work too, e.g. DBOutputFormat to write to a database, or a custom format
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // input and output paths are taken from the command line
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // run the job
        JobClient.runJob(conf);
    }
}
```
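One caveat about the mapper: the indexOf-based extraction assumes every line starts with `http://` and is otherwise well formed. If the real data may contain https URLs or junk lines, `java.net.URI` is a more tolerant way to pull out the host. The class below is only a minimal sketch of that idea (the name `DomainExtractor` is hypothetical, not part of the job above):

```java
import java.net.URI;
import java.net.URISyntaxException;

public class DomainExtractor {
    /** Returns the host part of a URL line, or null if the line cannot be parsed. */
    public static String extract(String line) {
        try {
            return new URI(line.trim()).getHost(); // e.g. "beijing.baixing.com"
        } catch (URISyntaxException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(extract("http://beijing.baixing.com/zhengzu/a232119437.html"));
    }
}
```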
Export it as a jar file, drop it into the Hadoop directory, and run it from the command line:
```
# clear the input and output directories on DFS
hadoop@rs:/usr/local/hadoop$ bin/hadoop fs -rmr input
hadoop@rs:/usr/local/hadoop$ bin/hadoop fs -rmr output
# put url.txt into the input directory on DFS
hadoop@rs:/usr/local/hadoop$ bin/hadoop fs -put url.txt input/url.txt
# run map/reduce over the files in the input directory on DFS, writing the results to output
hadoop@rs:/usr/local/hadoop$ bin/hadoop jar url.jar input output
# check the contents of the output directory on DFS
hadoop@rs:/usr/local/hadoop$ bin/hadoop fs -cat output/part*
```
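The result ends up split across `part-*` files in the output directory. If a single local file is handier, `fs -getmerge` can concatenate them (the local name `result.txt` is arbitrary):

```
hadoop@rs:/usr/local/hadoop$ bin/hadoop fs -getmerge output result.txt
```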
Done.
PS: In single-machine pseudo-distributed mode, a test run over 5 million URLs took 110 seconds, which is a bit slow. I'll try it on a cluster once the real data arrives.
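When that cluster run happens, one knob worth knowing: with this old mapred API the job defaults to a single reduce task, so all URLs funnel through one reducer. A line like the following in `main()` spreads that work out (the count 8 is just an illustration, not a recommendation):

```java
// illustration only: let several reduce tasks share the aggregation on a cluster
// (the default number of reduce tasks is 1; 8 is an arbitrary example value)
conf.setNumReduceTasks(8);
```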
PPS: Environment: Ubuntu 10.10 + Sun JDK 1.6.38 + Hadoop 0.20.2
This article was reposted from BoyTNT's 51CTO blog. Original link: http://blog.51cto.com/boytnt/1089195. To republish, please contact the original author.