使用Spark实现微博内容解析-每月前十最热词汇

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 简单spark+scala的demo,具体为map,flatmap,reducebykey等算子的运用

使用公开数据集,2015-02-01至2015-07-31微博博文数据,具体下载位置为https://tianchi.aliyun.com/dataset/51,第三列为发博时间,第六列为博文内容,分割键位为tab键

思路如下:

  1. 先将需要的数据分割拿出来,然后对博文进行分词,但是目前scala版本的jieba分词中没有词性,原因github上解释是处于性能的考虑,目前是根据停用词+长度来规避了不需要的词汇,稍微麻烦了一些
  2. 然后每一个词汇出现一次,记作量级1,根据词汇和日期对量级聚合
  3. 根据日期进行分组,同时对分组内聚合后的量级进行排序,同时截取前十条
packagecom.daishuimportcom.huaban.analysis.jieba.JiebaSegmenterimportorg.apache.spark.{SparkConf, SparkContext}
objectLy003 {
//分词defgetData(data: String): List[String] = {
valjieba=newJiebaSegmentervaljieba_data=jieba.sentenceProcess(data).toArray()
varrel_data: List[String] =List()
valblack_data=List("微博", "新浪", "账号", "客户端", "凤凰", "分享", "礼物", "你", "我", "它", "的", "了"      , "在", "吧", "一个")
//因为jieba分词处于性能考虑,java版本去除了词性判断,所以这里采用正则+长度的方式判定valp_t="[\u4e00-\u9fa5]*"for (x<-jieba_data         ) {
if ((!black_data.contains(x)) &&x.toString.matches(p_t) &&x.toString.length>1) {
rel_data=List.concat(rel_data, List(x.toString))
      }
    }
rel_data  }
/*主方法*/defmain(args: Array[String]): Unit= {
valconf=newSparkConf().setMaster("local[2]").setAppName("weibo")
valsc=newSparkContext(conf)
valr1=sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
//   按照tab键分割,并且将博文内容进行分词valr2=r1.map(row=>Tuple2(row.split("\t")(2).split(" ")(0).split("-")(1) +"月",
getData(row.split("\t")(6))))
//    将分词内容日期变为一个数组valr3=r2.map(v=> {
v._2.map(x=> (v._1, x))
    })
/*将一行拆分成多行*/valr4=r3.flatMap(x=>x)
//    每个单词+日期出现了一次valr5=r4.map(x=>Tuple2(Tuple2(x._1, x._2), 1))
//    按照key分组聚合valr6=r5.reduceByKey(_+_)
/*将词汇名称从key中抽离出来,并将日期+出现次数组合成新的key*/valr7=r6.map(v=>Tuple2(Tuple2(v._1._1, v._2), v._1._2))
//    根据聚合值进行降序排序valr8=r7.sortBy(_._1._2, false)
//    调整输出格式valr9=r8.map(v=>Tuple2(v._1._1, Tuple2(v._2, v._1._2)))
//    按照key分组排序并只要前十个valr10=r9.groupByKey().map(
item=> {
valindex=item._1valrow=item._2.toList.sortWith(_._2>_._2).take(10)
        (index, row)
      }
    )
valr100=r10.map(v=> {
v._2.map(x=> (v._1, x))
    })
      .flatMap(x=>x)
      .map(item=> (item._1, item._2._1, item._2._2))
r100.repartition(1).saveAsTextFile(
"/Users/mac/IdeaProjects/ly04/src/main/resources/"+System.currentTimeMillis()
    )
  }
}

如果优化代码,并增加排名,结果可以如下:

packagecom.daishuimportcom.huaban.analysis.jieba.JiebaSegmenterimportorg.apache.spark.{SparkConf, SparkContext}
objectLy003 {
//分词defgetData(data: String): List[String] = {
valjieba=newJiebaSegmentervaljieba_data=jieba.sentenceProcess(data).toArray()
varrel_data: List[String] =List()
valblack_data=List("微博", "新浪", "账号", "客户端",
"凤凰", "分享", "礼物", "你", "我", "它", "的", "了"      , "在", "吧", "一个", "我们", "自己", "这个", "今天"    )
//因为jieba分词处于性能考虑,java版本去除了词性判断,所以这里采用正则+长度的方式判定valp_t="[\u4e00-\u9fa5]*"for (x<-jieba_data         ) {
if ((!black_data.contains(x)) &&x.toString.matches(p_t) &&x.toString.length>1) {
rel_data=List.concat(rel_data, List(x.toString))
      }
    }
rel_data  }
/*主方法*/defmain(args: Array[String]): Unit= {
valconf=newSparkConf().setMaster("local").setAppName("weibo")
valsc=newSparkContext(conf)
varidx=0valr1=sc.textFile("/Users/mac/IdeaProjects/ly04/src/main/resources/weibo_data.txt")
      .map(row=> (row.split("\t")(2).split(" ")(0).split("-")(1) +"月",
getData(row.split("\t")(6))))
      .map(v=> {
v._2.map(vi=> (v._1, vi))
      })
      .flatMap(x=>x)
      .map(v=> ((v._1, v._2), 1))
      .reduceByKey(_+_)
      .map(v=> (v._1._1, (v._1._2, v._2)))
      .groupByKey()
      .map(
item=> {
valindex=item._1valrow=item._2.toList.sortWith(_._2>_._2).take(10)
          (index, row)
        }
      )
      .map(v=> {
v._2.map(x=> (v._1, x))
      })
      .flatMap(x=>x)
      .map(v=> {
if (idx<=10) {
idx=idx+1          (idx, v._1, v._2._1, v._2._2)
        } else {
idx=1          (idx, v._1, v._2._1, v._2._2)
        }
      })
      .groupBy(_._1)
      .sortBy(_._1)
      .map(v=> (v._1, v._2.toList))
r1.repartition(1).saveAsTextFile(
"/Users/mac/IdeaProjects/ly04/src/main/resources/"+System.currentTimeMillis()
    )
  }
}

最后呈现结果会如下图:

实际操作中,发现有时候预期结果无法正确呈现,怀疑是跟破坏了函数返回类型有关

目录
相关文章
|
8月前
|
机器学习/深度学习 SQL 分布式计算
Spark核心原理与应用场景解析:面试经验与必备知识点解析
本文深入探讨Spark核心原理(RDD、DAG、内存计算、容错机制)和生态系统(Spark SQL、MLlib、Streaming),并分析其在大规模数据处理、机器学习及实时流处理中的应用。通过代码示例展示DataFrame操作,帮助读者准备面试,同时强调结合个人经验、行业趋势和技术发展以展现全面的技术实力。
755 0
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
98 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
115 0
|
5月前
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
4月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
86 0
|
8月前
|
分布式计算 Scala Spark
Spark参数解析之MasterArguments
Spark参数解析之MasterArguments
52 0
|
8月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
186 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
82 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
56 0

推荐镜像

更多