大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友

简介: 大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节我们完成了如下的内容:


Spark 学习 WordCount 程序

Scala & Java 的方式分别编写 WordCount 程序

计算圆周率

需求背景

我们要实现一个程序来实现圆周率的计算,将利用下面的公式:

编写代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random


object SparkPi {

  def main(args: Array[String]): Unit = {
    var conf = new SparkConf()
      .setAppName("ScalaSparkPi")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val slices = if (args.length > 0) {
      args(0).toInt
    } else {
      0
    }
    val N =  100000000
    val count = sc.makeRDD(1 to N, slices)
      .map(idx => {
        val (x, y) = (random, random)
        if (x*x + y*y <= 1) {
          1
        } else {
          0
        }
      }).reduce(_ + _)
    println(s"Pi is ${4.0 * count / N}")
  }

}

代码部分截图如下所示:

代码解释

object SparkPi { … }

这个对象定义了一个 Scala 应用程序的入口。Scala 的 object 关键字用于定义一个单例对象,这意味着 SparkPi 只能有一个实例。


def main(args: Array[String]): Unit = { … }

main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表示该方法没有返回值。


var conf = new SparkConf().setAppName(“ScalaSparkPi”)

SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。

setMaster("local[]") 表示 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表示本地模式下使用所有的 CPU 核心。

val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群进行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。


sc.setLogLevel(“WARN”)

设置日志的级别为 “WARN”。这意味着只会记录警告级别及以上的日志信息,减少不必要的日志输出。


val slices = if (args.length > 0) { … }

这段代码用来处理传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。


val N = 100000000

定义一个常量 N,表示将进行一亿次随机点的生成,以此来估算 \pi 值。


val count = sc.makeRDD(1 to N, slices)

sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片进行并行计算。

map(idx => { … }) 是对 RDD 中的每个元素进行映射操作。对于每个 idx,生成两个随机数 x 和 y,分别表示点的 x 和 y 坐标。

if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。

reduce(_ + _)

reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。

println(s"Pi is ${4.0 * count / N}")

计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。

输出计算结果。

打包上传

mvn clean package
• 1

打包完成上传Jar包:

运行项目

spark-submit --master local[*] --class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15
• 1

运行等待结果

运行完毕的结果如下:

找共同好友

需求背景

目前有一组数据

100, 200 300 400 500 600
200, 100 300 400
300, 100 200 400 500
400, 100 200 300
500, 100 300
600, 100

第一列表示用户,后边的数字表示该用户的好友,我们要对上面的这几列进行分析计算,得出共同的好友。

编写代码

方法一

核心思想利用笛卡尔积求两两之间的好友 然后去除多余的数据

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val lines: RDD[String] = sc.textFile(args(0))
    val friendsRDD: RDD[(String, Array[String])] = lines.map{
      line =>
        val fields: Array[String] = line.split(",")
        val userId = fields(0).trim
        val friends:  Array[String] = fields(1).trim.split("\\s+")
        (userId, friends)
    }
    friendsRDD
      .cartesian(friendsRDD)
      .filter({
        case ((id1, _), (id2, _)) => id1 < id2
      })
      .map{
        case ((id1, friends1), (id2, friends2)) => ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
      }
      .sortByKey()
      .collect()
      .foreach(println)
    sc.stop()
  }

}

方法二

消除笛卡尔积 核心思想是:将数据变形,找到两两的好友,再执行数据的合并

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val lines: RDD[String] = sc.textFile(args(0))
    val friendsRDD: RDD[(String, Array[String])] = lines.map{
      line =>
        val fields: Array[String] = line.split(",")
        val userId = fields(0).trim
        val friends:  Array[String] = fields(1).trim.split("\\s+")
        (userId, friends)
    }
    friendsRDD
      .flatMapValues(friends => friends.combinations(2))
      .map{
        case (k, v) => (v.mkString(" & "), Set(k))
      }
      .reduceByKey(_ | _)
      .sortByKey()
      .collect()
      .foreach(println)
    sc.stop()
  }

}

打包上传

运行项目

方法一

spark-submit --master local[*] --class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSH

运行结果如下图:

方法二

spark-submit --master local[*] --class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt

运行结果如下图所示:

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
8月前
|
存储 SQL 分布式计算
大数据之路:阿里巴巴大数据实践——元数据与计算管理
本内容系统讲解了大数据体系中的元数据管理与计算优化。元数据部分涵盖技术、业务与管理元数据的分类及平台工具,并介绍血缘捕获、智能推荐与冷热分级等技术创新。元数据应用于数据标签、门户管理与建模分析。计算管理方面,深入探讨资源调度失衡、数据倾斜、小文件及长尾任务等问题,提出HBO与CBO优化策略及任务治理方案,全面提升资源利用率与任务执行效率。
638 0
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
293 4
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
存储 分布式计算 调度
Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?
Spark Master 的高可用性(HA)机制确保主节点故障时,备用主节点能无缝接管集群管理,保障稳定运行。关键在于: 1. **Driver 和 Executor 独立**:任务执行不依赖 Master。 2. **应用状态保持**:备用 Master 通过 ZooKeeper 恢复集群状态。 3. **ZooKeeper 协调**:快速选举新 Master 并同步状态。 4. **容错机制**:任务可在其他 Executor 上重新调度。 这些特性保证了集群在 Master 故障时仍能正常运行。
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
253 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
303 1
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
286 1
下一篇
开通oss服务