大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节我们完成了如下的内容:


Spark 学习 WordCount 程序

Scala & Java 的方式分别编写 WordCount 程序

计算圆周率

需求背景

我们要实现一个程序来实现圆周率的计算,将利用下面的公式:

编写代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random


object SparkPi {

  def main(args: Array[String]): Unit = {
    var conf = new SparkConf()
      .setAppName("ScalaSparkPi")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val slices = if (args.length > 0) {
      args(0).toInt
    } else {
      0
    }
    val N =  100000000
    val count = sc.makeRDD(1 to N, slices)
      .map(idx => {
        val (x, y) = (random, random)
        if (x*x + y*y <= 1) {
          1
        } else {
          0
        }
      }).reduce(_ + _)
    println(s"Pi is ${4.0 * count / N}")
  }

}

代码部分截图如下所示:

代码解释

object SparkPi { … }

这个对象定义了一个 Scala 应用程序的入口。Scala 的 object 关键字用于定义一个单例对象,这意味着 SparkPi 只能有一个实例。


def main(args: Array[String]): Unit = { … }

main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表示该方法没有返回值。


var conf = new SparkConf().setAppName(“ScalaSparkPi”)

SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。

setMaster("local[]") 表示 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表示本地模式下使用所有的 CPU 核心。

val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群进行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。


sc.setLogLevel(“WARN”)

设置日志的级别为 “WARN”。这意味着只会记录警告级别及以上的日志信息,减少不必要的日志输出。


val slices = if (args.length > 0) { … }

这段代码用来处理传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。


val N = 100000000

定义一个常量 N,表示将进行一亿次随机点的生成,以此来估算 \pi 值。


val count = sc.makeRDD(1 to N, slices)

sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片进行并行计算。

map(idx => { … }) 是对 RDD 中的每个元素进行映射操作。对于每个 idx,生成两个随机数 x 和 y,分别表示点的 x 和 y 坐标。

if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。

reduce(_ + _)

reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。

println(s"Pi is ${4.0 * count / N}")

计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。

输出计算结果。

打包上传

mvn clean package
• 1

打包完成上传Jar包:

运行项目

spark-submit --master local[*] --class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15
• 1

运行等待结果

运行完毕的结果如下:

找共同好友

需求背景

目前有一组数据

100, 200 300 400 500 600
200, 100 300 400
300, 100 200 400 500
400, 100 200 300
500, 100 300
600, 100

第一列表示用户,后边的数字表示该用户的好友,我们要对上面的这几列进行分析计算,得出共同的好友。

编写代码

方法一

核心思想利用笛卡尔积求两两之间的好友 然后去除多余的数据

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val lines: RDD[String] = sc.textFile(args(0))
    val friendsRDD: RDD[(String, Array[String])] = lines.map{
      line =>
        val fields: Array[String] = line.split(",")
        val userId = fields(0).trim
        val friends:  Array[String] = fields(1).trim.split("\\s+")
        (userId, friends)
    }
    friendsRDD
      .cartesian(friendsRDD)
      .filter({
        case ((id1, _), (id2, _)) => id1 < id2
      })
      .map{
        case ((id1, friends1), (id2, friends2)) => ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
      }
      .sortByKey()
      .collect()
      .foreach(println)
    sc.stop()
  }

}

方法二

消除笛卡尔积 核心思想是:将数据变形,找到两两的好友,再执行数据的合并

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val lines: RDD[String] = sc.textFile(args(0))
    val friendsRDD: RDD[(String, Array[String])] = lines.map{
      line =>
        val fields: Array[String] = line.split(",")
        val userId = fields(0).trim
        val friends:  Array[String] = fields(1).trim.split("\\s+")
        (userId, friends)
    }
    friendsRDD
      .flatMapValues(friends => friends.combinations(2))
      .map{
        case (k, v) => (v.mkString(" & "), Set(k))
      }
      .reduceByKey(_ | _)
      .sortByKey()
      .collect()
      .foreach(println)
    sc.stop()
  }

}

打包上传

运行项目

方法一

spark-submit --master local[*] --class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSH

运行结果如下图:

方法二

spark-submit --master local[*] --class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt

运行结果如下图所示:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
69 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
44 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
100 0
|
2月前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
33 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
46 3
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
43 1
|
2月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
33 0
|
2月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
31 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
31 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
45 0