共享变量
广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力;
累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);
需求说明:以词频统计WordCount程序为例,处理特殊数据,包括非单词符号,做WordCount的同时统计出特殊字符的数量
原数据展示
注:原数据杂乱无章,与所需单词混淆,间隔且不等
业务逻辑
1、创建本地环境,并设置日志提示级别
val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN")
2、创建累加器
val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
3、定义特殊字符集合
val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
4、将集合作为广播变量广播到各个节点
val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
5、加载数据,创建RDD
val lines: RDD[String] = sc.textFile("data/input/words2.txt")
6、过滤筛选
val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_)) .flatMap(_.split("\\s+")) .filter(ch => { //获取广播数据 val list: List[String] = broadcast.value if (list.contains(ch)) { //特殊字符 mycounter.add(1) false } else { //单词 true } }).map((_, 1)) .reduceByKey(_ + _)
7、输出单词统计及特殊字符
wordcountResult.foreach(println) val chResult: lang.Long = mycounter.value println("特殊字符的数量:" + chResult)
完整代码
package org.example.spark import java.lang import org.apache.commons.lang3.StringUtils import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} /** * Author tuomasi * Desc 演示RDD的共享变量 */ object RDD_ShareVariable { def main(args: Array[String]): Unit = { //TODO 0.env/创建环境 val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //创建计数器/累加器 val mycounter: LongAccumulator = sc.longAccumulator("mycounter") //定义一个特殊字符集合 val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^") //将集合作为广播变量广播到各个节点 val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList) //TODO 1.source/加载数据/创建RDD val lines: RDD[String] = sc.textFile("data/input/words2.txt") //TODO 2.transformation val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_)) .flatMap(_.split("\\s+")) .filter(ch => { //获取广播数据 val list: List[String] = broadcast.value if (list.contains(ch)) { //特殊字符 mycounter.add(1) false } else { //单词 true } }).map((_, 1)) .reduceByKey(_ + _) //TODO 3.sink/输出 wordcountResult.foreach(println) val chResult: lang.Long = mycounter.value println("特殊字符的数量:" + chResult) } }
程序运行
原数据:
控制台打印:
注:通过对比,该程序实现了单词与特殊字符的分别统计
项目总结
使用广播变量的好处:
1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的后果。
2、使用广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少;