使用Spark实现微博内容解析-每月前十最热词汇

简介: 简单spark+scala的demo,具体为map,flatmap,reducebykey等算子的运用

使用公开数据集,2015-02-01至2015-07-31微博博文数据,具体下载位置为https://tianchi.aliyun.com/dataset/51,第三列为发博时间,第六列为博文内容,分割键位为tab键

思路如下:

  1. 先将需要的数据分割拿出来,然后对博文进行分词,但是目前scala版本的jieba分词中没有词性,原因github上解释是处于性能的考虑,目前是根据停用词+长度来规避了不需要的词汇,稍微麻烦了一些
  2. 然后每一个词汇出现一次,记作量级1,根据词汇和日期对量级聚合
  3. 根据日期进行分组,同时对分组内聚合后的量级进行排序,同时截取前十条
packagecom.daishuimportcom.huaban.analysis.jieba.JiebaSegmenterimportorg.apache.spark.{SparkConf, SparkContext}
objectLy003 {
//分词defgetData(data: String): List[String] = {
valjieba=newJiebaSegmentervaljieba_data=jieba.sentenceProcess(data).toArray()
varrel_data: List[String] =List()
valblack_data=List("微博", "新浪", "账号", "客户端", "凤凰", "分享", "礼物", "你", "我", "它", "的", "了"      , "在", "吧", "一个")
//因为jieba分词处于性能考虑,java版本去除了词性判断,所以这里采用正则+长度的方式判定valp_t="[\u4e00-\u9fa5]*"for (x<-jieba_data         ) {
if ((!black_data.contains(x)) &&x.toString.matches(p_t) &&x.toString.length>1) {
rel_data=List.concat(rel_data, List(x.toString))
      }
    }
rel_data  }
/*主方法*/defmain(args: Array[String]): Unit= {
valconf=newSparkConf().setMaster("local[2]").setAppName("weibo")
valsc=newSparkContext(conf)
valr1=sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
//   按照tab键分割,并且将博文内容进行分词valr2=r1.map(row=>Tuple2(row.split("\t")(2).split(" ")(0).split("-")(1) +"月",
getData(row.split("\t")(6))))
//    将分词内容日期变为一个数组valr3=r2.map(v=> {
v._2.map(x=> (v._1, x))
    })
/*将一行拆分成多行*/valr4=r3.flatMap(x=>x)
//    每个单词+日期出现了一次valr5=r4.map(x=>Tuple2(Tuple2(x._1, x._2), 1))
//    按照key分组聚合valr6=r5.reduceByKey(_+_)
/*将词汇名称从key中抽离出来,并将日期+出现次数组合成新的key*/valr7=r6.map(v=>Tuple2(Tuple2(v._1._1, v._2), v._1._2))
//    根据聚合值进行降序排序valr8=r7.sortBy(_._1._2, false)
//    调整输出格式valr9=r8.map(v=>Tuple2(v._1._1, Tuple2(v._2, v._1._2)))
//    按照key分组排序并只要前十个valr10=r9.groupByKey().map(
item=> {
valindex=item._1valrow=item._2.toList.sortWith(_._2>_._2).take(10)
        (index, row)
      }
    )
valr100=r10.map(v=> {
v._2.map(x=> (v._1, x))
    })
      .flatMap(x=>x)
      .map(item=> (item._1, item._2._1, item._2._2))
r100.repartition(1).saveAsTextFile(
"/Users/mac/IdeaProjects/ly04/src/main/resources/"+System.currentTimeMillis()
    )
  }
}

如果优化代码,并增加排名,结果可以如下:

packagecom.daishuimportcom.huaban.analysis.jieba.JiebaSegmenterimportorg.apache.spark.{SparkConf, SparkContext}
objectLy003 {
//分词defgetData(data: String): List[String] = {
valjieba=newJiebaSegmentervaljieba_data=jieba.sentenceProcess(data).toArray()
varrel_data: List[String] =List()
valblack_data=List("微博", "新浪", "账号", "客户端",
"凤凰", "分享", "礼物", "你", "我", "它", "的", "了"      , "在", "吧", "一个", "我们", "自己", "这个", "今天"    )
//因为jieba分词处于性能考虑,java版本去除了词性判断,所以这里采用正则+长度的方式判定valp_t="[\u4e00-\u9fa5]*"for (x<-jieba_data         ) {
if ((!black_data.contains(x)) &&x.toString.matches(p_t) &&x.toString.length>1) {
rel_data=List.concat(rel_data, List(x.toString))
      }
    }
rel_data  }
/*主方法*/defmain(args: Array[String]): Unit= {
valconf=newSparkConf().setMaster("local").setAppName("weibo")
valsc=newSparkContext(conf)
varidx=0valr1=sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
      .map(row=> (row.split("\t")(2).split(" ")(0).split("-")(1) +"月",
getData(row.split("\t")(6))))
      .map(v=> {
v._2.map(vi=> (v._1, vi))
      })
      .flatMap(x=>x)
      .map(v=> ((v._1, v._2), 1))
      .reduceByKey(_+_)
      .map(v=> (v._1._1, (v._1._2, v._2)))
      .groupByKey()
      .map(
item=> {
valindex=item._1valrow=item._2.toList.sortWith(_._2>_._2).take(10)
          (index, row)
        }
      )
      .map(v=> {
v._2.map(x=> (v._1, x))
      })
      .flatMap(x=>x)
      .map(v=> {
if (idx<=10) {
idx=idx+1          (idx, v._1, v._2._1, v._2._2)
        } else {
idx=1          (idx, v._1, v._2._1, v._2._2)
        }
      })
      .groupBy(_._1)
      .sortBy(_._1)
      .map(v=> (v._1, v._2.toList))
r1.repartition(1).saveAsTextFile(
"/Users/mac/IdeaProjects/ly04/src/main/resources/"+System.currentTimeMillis()
    )
  }
}

最后呈现结果会如下图:

实际操作中,发现有时候预期结果无法正确呈现,怀疑是跟破坏了函数返回类型有关

目录
相关文章
|
3月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
4月前
|
存储 缓存 分布式计算
【云计算与大数据技术】Spark的解析(图文解释 超详细必看)
【云计算与大数据技术】Spark的解析(图文解释 超详细必看)
91 0
|
8月前
|
分布式计算 Hadoop 大数据
大数据技术解析:Hadoop、Spark、Flink和数据湖的对比
Hadoop、Spark、Flink 和数据湖都在大数据处理领域有着重要的地位,但它们各自的优势和劣势也需考虑实际应用场景。Hadoop 适用于批处理任务,Spark 更适合实时分析,而 Flink 则强调低延迟的流式处理。数据湖则是存储和管理大规模多样性数据的选择。
391 1
大数据技术解析:Hadoop、Spark、Flink和数据湖的对比
|
消息中间件 分布式计算 关系型数据库
流式读取热搜词汇并解析,urllib+Kafka+Spark
紧接上文,本次对于上次的优化是增加了kafka的插件,用简单消费者和生产者在本地window系统完成模拟,每五分钟爬取一次百度热搜,条数基本为145条,然后消费者来消费数据,写入到spark,下次的优化应该就是从sparksql转化为sparkstreaming,并直接整合kafka,而不是中间转row再写入。
86 0
|
机器学习/深度学习 分布式计算 算法
【Spark MLlib】(一)架构解析(包含分类、回归、聚类和协同过滤)
【Spark MLlib】(一)架构解析(包含分类、回归、聚类和协同过滤)
438 0
【Spark MLlib】(一)架构解析(包含分类、回归、聚类和协同过滤)
|
分布式计算 API 图计算
【Spark】(九)Spark GraphX 图计算解析2
【Spark】(九)Spark GraphX 图计算解析2
134 0
【Spark】(九)Spark GraphX 图计算解析2
|
分布式计算 算法 大数据
【Spark】(九)Spark GraphX 图计算解析1
【Spark】(九)Spark GraphX 图计算解析1
549 0
【Spark】(九)Spark GraphX 图计算解析1
|
SQL 分布式计算 HIVE
【Spark】(八)Spark SQL 应用解析2
【Spark】(八)Spark SQL 应用解析2
173 0
【Spark】(八)Spark SQL 应用解析2
|
SQL 分布式计算 关系型数据库
【Spark】(八)Spark SQL 应用解析1
【Spark】(八)Spark SQL 应用解析1
109 0
【Spark】(八)Spark SQL 应用解析1
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
159 0

推荐镜像

更多