六十四、Spark-分别统计各个单词个数及特殊字符总个数

简介: 六十四、Spark-分别统计各个单词个数及特殊字符总个数

共享变量


广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力;


累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);


需求说明:以词频统计WordCount程序为例,处理特殊数据,包括非单词符号,做WordCount的同时统计出特殊字符的数量



原数据展示



33.png

        注:原数据杂乱无章,与所需单词混淆,间隔且不等


业务逻辑


1、创建本地环境,并设置日志提示级别


val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")

2、创建累加器


val mycounter: LongAccumulator = sc.longAccumulator("mycounter")

3、定义特殊字符集合


val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")

4、将集合作为广播变量广播到各个节点


val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)

5、加载数据,创建RDD


val lines: RDD[String] = sc.textFile("data/input/words2.txt")

6、过滤筛选


val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
      .flatMap(_.split("\\s+"))
      .filter(ch => {
        //获取广播数据
        val list: List[String] = broadcast.value
        if (list.contains(ch)) { //特殊字符
          mycounter.add(1)
          false
        } else { //单词
          true
        }
      }).map((_, 1))
      .reduceByKey(_ + _)

7、输出单词统计及特殊字符


wordcountResult.foreach(println)
val chResult: lang.Long = mycounter.value
println("特殊字符的数量:" + chResult)

完整代码


package org.example.spark
import java.lang
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Author tuomasi
 * Desc 演示RDD的共享变量
 */
object RDD_ShareVariable {
  def main(args: Array[String]): Unit = {
    //TODO 0.env/创建环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //创建计数器/累加器
    val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
    //定义一个特殊字符集合
    val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
    //将集合作为广播变量广播到各个节点
    val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
    //TODO 1.source/加载数据/创建RDD
    val lines: RDD[String] = sc.textFile("data/input/words2.txt")
    //TODO 2.transformation
    val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
      .flatMap(_.split("\\s+"))
      .filter(ch => {
        //获取广播数据
        val list: List[String] = broadcast.value
        if (list.contains(ch)) { //特殊字符
          mycounter.add(1)
          false
        } else { //单词
          true
        }
      }).map((_, 1))
      .reduceByKey(_ + _)
    //TODO 3.sink/输出
    wordcountResult.foreach(println)
    val chResult: lang.Long = mycounter.value
    println("特殊字符的数量:" + chResult)
  }
}

程序运行


原数据:


34.png


控制台打印:


35.png


        注:通过对比,该程序实现了单词与特殊字符的分别统计


项目总结


使用广播变量的好处:


1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的后果。

2、使用广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少;


36.png


 


相关文章
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
74 0
|
分布式计算 关系型数据库 MySQL
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
|
5月前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
58 0
|
5月前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
49 0
|
5月前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
110 0
|
6月前
|
机器学习/深度学习 分布式计算 大数据
【大数据技术】Spark MLlib机器学习特征抽取 TF-IDF统计词频实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习特征抽取 TF-IDF统计词频实战(附源码和数据集)
75 0
|
6月前
|
存储 分布式计算 搜索推荐
【大数据技术Hadoop+Spark】MapReduce之单词计数和倒排索引实战(附源码和数据集 超详细)
【大数据技术Hadoop+Spark】MapReduce之单词计数和倒排索引实战(附源码和数据集 超详细)
195 0
|
SQL JSON 分布式计算
【大数据学习篇10】Spark项目实战~网站转化率统计
【大数据学习篇10】Spark项目实战~网站转化率统计
491 0
【大数据学习篇10】Spark项目实战~网站转化率统计
|
存储 分布式计算 Java
JAVA Spark rdd使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3
JAVA Spark rdd使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3
143 0
|
分布式计算 大数据 Hadoop
大数据实验——用Spark实现wordcount单词统计
大数据实验——用Spark实现wordcount单词统计
大数据实验——用Spark实现wordcount单词统计

热门文章

最新文章