Spark-编程进阶(Scala版)

简介: Spark-编程进阶(Scala版)

累加器

累加器提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用法是在调测时对作业执行过程中的时间进行计数。


例:累加空行

val sc = new SparkContext()
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0)//创建Accumulator[Int]并初始化为0
val callSigns = file.flatMap(line =>{
  if(line == ""){
   blankLines += 1    //累加器+1
  }
  line.split(" ")
})
callSigns.saveAsSequenceFile("output.txt")
println("blank Lines:" + blankLines.value)

广播变量

广播变量可以让程序高效的向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作应用


广播变量就是类型为spark.broadcast.Broadcast[T]的一个对象,其中存放着类型为T的值。可以在任务中通过对Broadcast对象调用value来获取该对象的值。这个值只会被发送到各个节点一次,使用的是一种高效的类似BitTorrent的机制。


例:在scala中使用广播变量查询国家

val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign,count) =>
  val country = lookupInArray(sign,signPrefixes.value)
  (country,count)
  }.reduceByKey((x,y) => x + y)
  countryContactCounts.saveAsSequenceFile(outputDir + "/countries.txt")

基于分区进行操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作,Spark提供基于分区的map和foreach,让你的部分代码只对RDD的每个分区运行一次,以用来降低操作代价。


例:使用共享连接池与JSON解释器

val contactsContactLists = validSigns.distinct().mapPartitions{
  signs => 
  val mapper = create Mapper()
  val client = new HttpClient()
  client.start()
  //创建http请求
  sign.map {sign =>
    createExchangeForSign(sign)
  //获取响应
  }.map{case (sign,exchange) =>
      (sign, readExchangeCallLog(mapper,exchange))
  }.filter(x => x._2 !=null) //删除空的呼叫日志
}

数值RDD的操作

spark的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型,这些统计数据都会在调用stats时通过一次遍历数据计算出来,并以StatsCounter对象返回,下面列出StatsCounter上可用的方法。

image.png


如果只想计算这些统计数据中的一个,可以直接对RDD调用对应方法:rdd.max() /rdd.min()

相关文章
|
2月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
3月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
|
2月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
5天前
|
安全 编译器 Scala
何时需要指定泛型:Scala编程指南
本文是Scala编程指南,介绍了何时需要指定泛型类型参数。泛型提供代码重用和类型安全性,但在编译器无法推断类型、需要提高代码清晰度、调用泛型方法或创建泛型集合时,应明确指定类型参数。通过示例展示了泛型在避免类型错误和增强编译时检查方面的作用,强调了理解泛型使用时机对编写高效Scala代码的重要性。
16 1
何时需要指定泛型:Scala编程指南
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
139 1
|
3月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
3月前
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
115 2
|
3月前
|
分布式计算 数据处理 Scala
Spark 集群和 Scala 编程语言的关系
Spark 集群和 Scala 编程语言的关系
32 0
|
4月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
SQL 消息中间件 分布式计算
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
689 0
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】