点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节完成的内容如下:
RDD容错机制
RDD分区机制
RDD分区器
RDD自定义分区器
广播变量
基本介绍
有时候需要在多个任务之间共享变量,或者在任务(Task)和 Driver Program 之间共享变量。
为了满足这个需求,Spark提供了两种类型的变量。
广播变量(broadcast variable)
累加器(accumulators)
广播变量、累加器的主要作用是为了优化Spark程序。
广播变量将变量在节点的Executor之间进行共享(由Driver广播),广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。
使用广播变量的过程如下:
对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何可序列化的类型都可以这么实现(在Driver端)
通过Value属性访问该对象的值(Executor中)
变量只会被分到各个Executor一次,作为只读值处理
广播变量的相关参数:
- spark.broadcast.blockSize(缺省值: 4m)
- spark.broadcast.checksum(缺省值:true)
- spark.broadcast.compree(缺省值:true)
变量应用
普通JOIN
MapSideJoin
生成数据 test_spark_01.txt
1000;商品1 1001;商品2 1002;商品3 1003;商品4 1004;商品5 1005;商品6 1006;商品7 1007;商品8 1008;商品9
生成数据格式如下:
生成数据 test_spark_02.txt
10000;订单1;1000 10001;订单2;1001 10002;订单3;1002 10003;订单4;1003 10004;订单5;1004 10005;订单6;1005 10006;订单7;1006 10007;订单8;1007 10008;订单9;1008
生成的数据格式如下:
编写代码1
我们编写代码进行测试
package icu.wzk import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object JoinDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("JoinDemo") .setMaster("local[*]") val sc = new SparkContext(conf) sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024) val productRDD: RDD[(String, String)] = sc .textFile("data/test_spark_01.txt") .map { line => val fields = line.split(";") (fields(0), line) } val orderRDD: RDD[(String, String)] = sc .textFile("data/test_spark_02.txt", 8) .map { line => val fields = line.split(";") (fields(2), line) } val resultRDD = productRDD.join(orderRDD) println(resultRDD.count()) Thread.sleep(100000) sc.stop() } }
编译打包1
mvn clean package • 1
并上传到服务器,准备运行
运行测试1
spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar • 1
提交任务并执行,注意数据的路径,查看下图:
运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)
运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)
2024-07-19 10:35:08,808 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s 200
编写代码2
接下来,我们对比使用 MapSideJoin 的方式
package icu.wzk import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapSideJoin { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("MapSideJoin") .setMaster("local[*]") val sc = new SparkContext(conf) sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024) val productRDD: RDD[(String, String)] = sc .textFile("data/test_spark_01.txt") .map { line => val fields = line.split(";") (fields(0), line) } val productBC = sc.broadcast(productRDD.collectAsMap()) val orderRDD: RDD[(String, String)] = sc .textFile("data/test_spark_02.txt") .map { line => val fields = line.split(";") (fields(2), line) } val resultRDD = orderRDD .map { case (pid, orderInfo) => val productInfo = productBC.value (pid, (orderInfo, productInfo.getOrElse(pid, null))) } println(resultRDD.count()) sc.stop() } }
编译打包2
mvn clean package • 1
编译后上传到服务器准备执行:
运行测试2
spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar
启动我们的程序,并观察结果
我们可以观察到,这次只用了 0.10078 秒就完成了任务:
累加器
基本介绍
累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加。
累加器在Driver端定义、读取,在Executor中完成累加。
累加器也是Lazy的,需要Action触发:Action触发一次,执行一次;触发多次,执行多次。
Spark内置了三种类型的累加器,分别是:
LongAccumulator 用来累加整数型
DoubleAccumulator 用来累加浮点型
CollectionAccumulator 用来累加集合元素
运行测试
我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动SparkShell
spark-shell --master local[*]
启动的主界面如下:
写入如下的内容进行测试:
val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+")) val acc1 = sc.longAccumulator("totalNum1") val acc2 = sc.doubleAccumulator("totalNum2") val acc3 = sc.collectionAccumulator[String]("allwords")
我们进行测试的结果如下图所示:
继续编写一段进行测试:
val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word} rdd.count rdd.collect println(acc1.value) println(acc2.value) println(acc3.value)
我们进行测试的结果如下: