This example uses a public dataset of Weibo posts from 2015-02-01 to 2015-07-31, available at https://tianchi.aliyun.com/dataset/51. Fields are separated by tabs; the posting time is in the third column (index 2 after splitting) and the post content is in the column at index 6.
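For orientation, a single record can be pulled apart like this; the sample line below is made up, and the field positions (index 2 for the time, index 6 for the content) simply mirror the code further down:

```scala
// Hypothetical record in the weibo_data.txt layout assumed by the code below;
// only the time and content fields are used here.
val line    = "uid001\tmid001\t2015-02-01 10:00:00\t0\t0\t0\t今天的天气真好"
val fields  = line.split("\t")
val month   = fields(2).split(" ")(0).split("-")(1) + "月"   // "02月"
val content = fields(6)                                      // post text handed to jieba
```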
The approach is as follows:
- First extract the needed fields, then segment each post with jieba. The Scala/Java port of jieba currently has no part-of-speech tags (the project's GitHub page explains they were dropped for performance), so unwanted tokens are filtered out with a stop-word list plus a length check, which is a little more tedious.
- Each occurrence of a word counts as 1, and these counts are aggregated by (word, date).
- Group by date, sort each group by its aggregated count, and keep only the top ten entries (a small sketch of this counting and top-ten step follows the list).
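Before bringing in Spark, the counting and top-N logic of the last two steps can be tried out on plain Scala collections; the toy (month, word) pairs below are made up:

```scala
// Made-up (month, word) pairs standing in for the flattened, segmented posts
val pairs = List(("02月", "春节"), ("02月", "红包"), ("02月", "春节"), ("03月", "天气"), ("03月", "天气"))

// Count occurrences of each (month, word) pair
val counts = pairs.groupBy(identity).map { case (key, occ) => (key, occ.size) }

// Group by month and keep the ten most frequent words in each month
val topTen = counts.toList
  .groupBy { case ((month, _), _) => month }
  .map { case (month, rows) =>
    month -> rows.map { case ((_, word), cnt) => (word, cnt) }.sortBy(-_._2).take(10)
  }

topTen.foreach { case (month, words) => println(s"$month -> $words") }
```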
```scala
package com.daishu

import com.huaban.analysis.jieba.JiebaSegmenter
import org.apache.spark.{SparkConf, SparkContext}

object Ly003 {

  // Word segmentation
  def getData(data: String): List[String] = {
    val jieba = new JiebaSegmenter
    val jieba_data = jieba.sentenceProcess(data).toArray()
    var rel_data: List[String] = List()
    val black_data = List("微博", "新浪", "账号", "客户端", "凤凰", "分享", "礼物",
      "你", "我", "它", "的", "了", "在", "吧", "一个")
    // The Java port of jieba drops part-of-speech tagging for performance,
    // so unwanted tokens are filtered with a Chinese-character regex plus a length check
    val p_t = "[\u4e00-\u9fa5]*"
    for (x <- jieba_data) {
      if ((!black_data.contains(x)) && x.toString.matches(p_t) && x.toString.length > 1) {
        rel_data = List.concat(rel_data, List(x.toString))
      }
    }
    rel_data
  }

  /* Main method */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("weibo")
    val sc = new SparkContext(conf)
    val r1 = sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
    // Split on tab, extract the month, and segment the post content
    val r2 = r1.map(row => Tuple2(row.split("\t")(2).split(" ")(0).split("-")(1) + "月", getData(row.split("\t")(6))))
    // Turn each post's word list into (month, word) pairs
    val r3 = r2.map(v => { v._2.map(x => (v._1, x)) })
    /* Flatten one row into multiple rows */
    val r4 = r3.flatMap(x => x)
    // Each (month, word) occurrence counts once
    val r5 = r4.map(x => Tuple2(Tuple2(x._1, x._2), 1))
    // Aggregate by key
    val r6 = r5.reduceByKey(_ + _)
    /* Pull the word out of the key and make (month, count) the new key */
    val r7 = r6.map(v => Tuple2(Tuple2(v._1._1, v._2), v._1._2))
    // Sort by the aggregated count, descending
    val r8 = r7.sortBy(_._1._2, false)
    // Reshape the output to (month, (word, count))
    val r9 = r8.map(v => Tuple2(v._1._1, Tuple2(v._2, v._1._2)))
    // Group by month, sort within each group, and keep only the top ten
    val r10 = r9.groupByKey().map(item => {
      val index = item._1
      val row = item._2.toList.sortWith(_._2 > _._2).take(10)
      (index, row)
    })
    val r100 = r10.map(v => { v._2.map(x => (v._1, x)) })
      .flatMap(x => x)
      .map(item => (item._1, item._2._1, item._2._2))
    r100.repartition(1).saveAsTextFile(
      "/Users/mac/IdeaProjects/ly04/src/main/resources/" + System.currentTimeMillis()
    )
  }
}
```
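One design note on the listing above: `groupByKey` materialises every distinct word of a month in one list before `take(10)` runs. As a hedged alternative, not part of the original code, a running top-10 per key can be kept with `aggregateByKey`; the function and parameter names here are introduced for illustration, and `wordCounts` stands for the `((month, word), count)` RDD that `reduceByKey` produces (`r6` above):

```scala
import org.apache.spark.rdd.RDD

// Sketch only: keep a running top-10 list per month instead of grouping all words.
// `wordCounts` corresponds to the ((month, word), count) RDD (r6 in the listing).
def topTenPerMonth(wordCounts: RDD[((String, String), Int)]): RDD[(String, List[(String, Int)])] =
  wordCounts
    .map { case ((month, word), cnt) => (month, (word, cnt)) }
    .aggregateByKey(List.empty[(String, Int)])(
      (acc, wc) => (wc :: acc).sortBy(-_._2).take(10), // fold one (word, count) into the partial top-10
      (a, b)    => (a ++ b).sortBy(-_._2).take(10)     // merge partial top-10 lists across partitions
    )
```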
If the code is streamlined and a rank column is added, it can look like this:
```scala
package com.daishu

import com.huaban.analysis.jieba.JiebaSegmenter
import org.apache.spark.{SparkConf, SparkContext}

object Ly003 {

  // Word segmentation
  def getData(data: String): List[String] = {
    val jieba = new JiebaSegmenter
    val jieba_data = jieba.sentenceProcess(data).toArray()
    var rel_data: List[String] = List()
    val black_data = List("微博", "新浪", "账号", "客户端", "凤凰", "分享", "礼物",
      "你", "我", "它", "的", "了", "在", "吧", "一个", "我们", "自己", "这个", "今天")
    // The Java port of jieba drops part-of-speech tagging for performance,
    // so unwanted tokens are filtered with a Chinese-character regex plus a length check
    val p_t = "[\u4e00-\u9fa5]*"
    for (x <- jieba_data) {
      if ((!black_data.contains(x)) && x.toString.matches(p_t) && x.toString.length > 1) {
        rel_data = List.concat(rel_data, List(x.toString))
      }
    }
    rel_data
  }

  /* Main method */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("weibo")
    val sc = new SparkContext(conf)
    // Rank counter held in a plain driver-side var and mutated inside the closure below
    var idx = 0
    val r1 = sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
      // (month, segmented words) per post
      .map(row => (row.split("\t")(2).split(" ")(0).split("-")(1) + "月", getData(row.split("\t")(6))))
      .map(v => { v._2.map(vi => (v._1, vi)) })
      .flatMap(x => x)
      // count each (month, word) pair
      .map(v => ((v._1, v._2), 1))
      .reduceByKey(_ + _)
      // top ten words per month
      .map(v => (v._1._1, (v._1._2, v._2)))
      .groupByKey()
      .map(item => {
        val index = item._1
        val row = item._2.toList.sortWith(_._2 > _._2).take(10)
        (index, row)
      })
      .map(v => { v._2.map(x => (v._1, x)) })
      .flatMap(x => x)
      // assign a rank by cycling the counter; note it only resets after exceeding 10
      .map(v => {
        if (idx <= 10) {
          idx = idx + 1
          (idx, v._1, v._2._1, v._2._2)
        } else {
          idx = 1
          (idx, v._1, v._2._1, v._2._2)
        }
      })
      .groupBy(_._1)
      .sortBy(_._1)
      .map(v => (v._1, v._2.toList))
    r1.repartition(1).saveAsTextFile(
      "/Users/mac/IdeaProjects/ly04/src/main/resources/" + System.currentTimeMillis()
    )
  }
}
```
The final result then looks like the figure below:
In practice the expected result sometimes fails to come out correctly. I first suspected this was caused by breaking the function's return type somewhere; a likelier explanation is the ranking counter: `idx` is a mutable variable captured into the RDD closure, so each task mutates its own copy rather than one shared counter, and the reset condition `idx <= 10` lets the counter reach 11 before wrapping, so the rank can drift.
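One way to sidestep the counter issue entirely is to derive the rank from each month's already-sorted top-10 list with `zipWithIndex`, so no shared `var` is involved. This is only a sketch: `withRank` and `topPerMonth` are names introduced here, with `topPerMonth` standing for the `(month, top-10 list)` RDD produced by the `groupByKey` step above.

```scala
import org.apache.spark.rdd.RDD

// Sketch only: compute the rank inside each month's local list instead of using a shared counter.
// `topPerMonth` corresponds to the (month, List[(word, count)]) RDD built by groupByKey.
def withRank(topPerMonth: RDD[(String, List[(String, Int)])]): RDD[(Int, String, String, Int)] =
  topPerMonth.flatMap { case (month, words) =>
    // words is already sorted by count descending, so the list position is the rank
    words.zipWithIndex.map { case ((word, cnt), i) => (i + 1, month, word, cnt) }
  }
```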