使用Spark实现微博内容解析-每月前十最热词汇

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 简单spark+scala的demo,具体为map,flatmap,reducebykey等算子的运用

使用公开数据集,2015-02-01至2015-07-31微博博文数据,具体下载位置为https://tianchi.aliyun.com/dataset/51,第三列为发博时间,第六列为博文内容,分割键位为tab键

思路如下:

  1. 先将需要的数据分割拿出来,然后对博文进行分词,但是目前scala版本的jieba分词中没有词性,原因github上解释是处于性能的考虑,目前是根据停用词+长度来规避了不需要的词汇,稍微麻烦了一些
  2. 然后每一个词汇出现一次,记作量级1,根据词汇和日期对量级聚合
  3. 根据日期进行分组,同时对分组内聚合后的量级进行排序,同时截取前十条
packagecom.daishuimportcom.huaban.analysis.jieba.JiebaSegmenterimportorg.apache.spark.{SparkConf, SparkContext}
objectLy003 {
//分词defgetData(data: String): List[String] = {
valjieba=newJiebaSegmentervaljieba_data=jieba.sentenceProcess(data).toArray()
varrel_data: List[String] =List()
valblack_data=List("微博", "新浪", "账号", "客户端", "凤凰", "分享", "礼物", "你", "我", "它", "的", "了"      , "在", "吧", "一个")
//因为jieba分词处于性能考虑,java版本去除了词性判断,所以这里采用正则+长度的方式判定valp_t="[\u4e00-\u9fa5]*"for (x<-jieba_data         ) {
if ((!black_data.contains(x)) &&x.toString.matches(p_t) &&x.toString.length>1) {
rel_data=List.concat(rel_data, List(x.toString))
      }
    }
rel_data  }
/*主方法*/defmain(args: Array[String]): Unit= {
valconf=newSparkConf().setMaster("local[2]").setAppName("weibo")
valsc=newSparkContext(conf)
valr1=sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
//   按照tab键分割,并且将博文内容进行分词valr2=r1.map(row=>Tuple2(row.split("\t")(2).split(" ")(0).split("-")(1) +"月",
getData(row.split("\t")(6))))
//    将分词内容日期变为一个数组valr3=r2.map(v=> {
v._2.map(x=> (v._1, x))
    })
/*将一行拆分成多行*/valr4=r3.flatMap(x=>x)
//    每个单词+日期出现了一次valr5=r4.map(x=>Tuple2(Tuple2(x._1, x._2), 1))
//    按照key分组聚合valr6=r5.reduceByKey(_+_)
/*将词汇名称从key中抽离出来,并将日期+出现次数组合成新的key*/valr7=r6.map(v=>Tuple2(Tuple2(v._1._1, v._2), v._1._2))
//    根据聚合值进行降序排序valr8=r7.sortBy(_._1._2, false)
//    调整输出格式valr9=r8.map(v=>Tuple2(v._1._1, Tuple2(v._2, v._1._2)))
//    按照key分组排序并只要前十个valr10=r9.groupByKey().map(
item=> {
valindex=item._1valrow=item._2.toList.sortWith(_._2>_._2).take(10)
        (index, row)
      }
    )
valr100=r10.map(v=> {
v._2.map(x=> (v._1, x))
    })
      .flatMap(x=>x)
      .map(item=> (item._1, item._2._1, item._2._2))
r100.repartition(1).saveAsTextFile(
"/Users/mac/IdeaProjects/ly04/src/main/resources/"+System.currentTimeMillis()
    )
  }
}

如果优化代码,并增加排名,结果可以如下:

packagecom.daishuimportcom.huaban.analysis.jieba.JiebaSegmenterimportorg.apache.spark.{SparkConf, SparkContext}
objectLy003 {
//分词defgetData(data: String): List[String] = {
valjieba=newJiebaSegmentervaljieba_data=jieba.sentenceProcess(data).toArray()
varrel_data: List[String] =List()
valblack_data=List("微博", "新浪", "账号", "客户端",
"凤凰", "分享", "礼物", "你", "我", "它", "的", "了"      , "在", "吧", "一个", "我们", "自己", "这个", "今天"    )
//因为jieba分词处于性能考虑,java版本去除了词性判断,所以这里采用正则+长度的方式判定valp_t="[\u4e00-\u9fa5]*"for (x<-jieba_data         ) {
if ((!black_data.contains(x)) &&x.toString.matches(p_t) &&x.toString.length>1) {
rel_data=List.concat(rel_data, List(x.toString))
      }
    }
rel_data  }
/*主方法*/defmain(args: Array[String]): Unit= {
valconf=newSparkConf().setMaster("local").setAppName("weibo")
valsc=newSparkContext(conf)
varidx=0valr1=sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
      .map(row=> (row.split("\t")(2).split(" ")(0).split("-")(1) +"月",
getData(row.split("\t")(6))))
      .map(v=> {
v._2.map(vi=> (v._1, vi))
      })
      .flatMap(x=>x)
      .map(v=> ((v._1, v._2), 1))
      .reduceByKey(_+_)
      .map(v=> (v._1._1, (v._1._2, v._2)))
      .groupByKey()
      .map(
item=> {
valindex=item._1valrow=item._2.toList.sortWith(_._2>_._2).take(10)
          (index, row)
        }
      )
      .map(v=> {
v._2.map(x=> (v._1, x))
      })
      .flatMap(x=>x)
      .map(v=> {
if (idx<=10) {
idx=idx+1          (idx, v._1, v._2._1, v._2._2)
        } else {
idx=1          (idx, v._1, v._2._1, v._2._2)
        }
      })
      .groupBy(_._1)
      .sortBy(_._1)
      .map(v=> (v._1, v._2.toList))
r1.repartition(1).saveAsTextFile(
"/Users/mac/IdeaProjects/ly04/src/main/resources/"+System.currentTimeMillis()
    )
  }
}

最后呈现结果会如下图:

实际操作中,发现有时候预期结果无法正确呈现,怀疑是跟破坏了函数返回类型有关

目录
相关文章
|
6月前
|
机器学习/深度学习 SQL 分布式计算
Spark核心原理与应用场景解析:面试经验与必备知识点解析
本文深入探讨Spark核心原理(RDD、DAG、内存计算、容错机制)和生态系统(Spark SQL、MLlib、Streaming),并分析其在大规模数据处理、机器学习及实时流处理中的应用。通过代码示例展示DataFrame操作,帮助读者准备面试,同时强调结合个人经验、行业趋势和技术发展以展现全面的技术实力。
536 0
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
78 0
|
3月前
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
2月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
70 0
|
6月前
|
分布式计算 Scala Spark
Spark参数解析之MasterArguments
Spark参数解析之MasterArguments
40 0
|
6月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0

推荐镜像

更多