大数据Spark电影评分数据分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 大数据Spark电影评分数据分析

1 数据 ETL

使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:对电影评分数据进行统分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。数据集ratings.dat总共100万条数据,数据格式如下每行数据各个字段之间使用双冒号分开:

数据集地址:https://grouplens.org/datasets/movielens/

数据处理分析步骤如下:

1. 第一步、读取电影评分数据,从本地文件系统读取
2. 第二步、转换数据,指定Schema信息,封装到DataFrame
3. 第三步、基于SQL方式分析
4. 第四步、基于DSL方式分析

读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:

// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .appName(this.getClass.getSimpleName.stripSuffix("$"))
  .getOrCreate()
// 导入隐式转换
import spark.implicits._
// 1. 读取电影评分数据,从本地文件系统读取
val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
  // 过滤数据.
  .filter(line => null != line && line.trim.split("::").length == 4)
  // 提取转换数据
  .mapPartitions { iter =>
    iter.map { line =>
      // 按照分割符分割,拆箱到变量中
      val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
      // 返回四元组
      (userId, movieId, rating.toDouble, timestamp.toLong)
    }
  }
  // 指定列名添加Schema
  .toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)

2 使用 SQL 分析

首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:

// TODO: 基于SQL方式分析
// 第一步、注册DataFrame为临时视图
ratingsDF.createOrReplaceTempView("view_temp_ratings")
// 第二步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
  """
    |SELECT
    | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
    |FROM
    | view_temp_ratings
    |GROUP BY
    | movieId
    |HAVING
    | cnt_rating > 2000
    |ORDER BY
    | avg_rating DESC, cnt_rating DESC
    |LIMIT
    | 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)

应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符,在多行换行的行头前面加一个“|”符号即可。


代码实例:


val speech = “”"abc


|def"“”.stripMargin


运行的结果为:


abc


ldef

运行程序结果如下:

3 使用 DSL 分析

调用Dataset中函数,采用链式编程分析数据,核心代码如下:

// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
  // 选取字段
  .select($"movieId", $"rating")
  // 分组:按照电影ID,获取平均评分和评分次数
  .groupBy($"movieId")
  .agg( //
    round(avg($"rating"), 2).as("avg_rating"), //
    count($"movieId").as("cnt_rating") //
  )
  // 过滤:评分次数大于2000
  .filter($"cnt_rating" > 2000)
  // 排序:先按照评分降序,再按照次数降序
  .orderBy($"avg_rating".desc, $"cnt_rating".desc)
  // 获取前10
  .limit(10)
//resultDF.printSchema()
resultDF.show(10)

Round函数返回一个数值,该数值是按照指定的小数位数进行四舍五入运算的结果。除数值外,也可对日期进行舍入运算。

round(3.19, 1) 将 3.19 四舍五入到一个小数位 (3.2)

round(2.649, 1) 将 2.649 四舍五入到一个小数位 (2.6)

round(-5.574, 2) 将 -5.574 四舍五入到两小数位 (-5.57)其中使用SparkSQL中自带函数库functions,在org.apache.spark.sql.functions中,包含常用函

数,有些与Hive中函数库类似,但是名称不一样。

使用需要导入函数库:import org.apache.spark.sql.functions._

4 保存结果数据

将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。

// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
  .coalesce(1) // 考虑降低分区数目
  .write
  .mode("overwrite")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  .jdbc(
    "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
      ode = true",
      "db_test.tb_top10_movies",
      new Properties ()
      )
      // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
      resultDF
      .coalesce (1)
      .write.mode ("overwrite")
      .csv ("datas/top10-movies")
      // 释放缓存数据
      resultDF.unpersist ()

查看数据库中结果表的数据:

5 案例完整代码

电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套

数据处理分析流程,其中涉及到很多数据细节,完整代码如下

import java.util.Properties
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
 * 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
 */
object SparkTop10Movie {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      // TODO: 设置shuffle时分区数目
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 1. 读取电影评分数据,从本地文件系统读取
    val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
    // 2. 转换数据
    val ratingsDF: DataFrame = rawRatingsDS
      // 过滤数据
      .filter(line => null != line && line.trim.split("::").length == 4)
      // 提取转换数据
      .mapPartitions { iter =>
        iter.map { line =>
          // 按照分割符分割,拆箱到变量中
          val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
          // 返回四元组
          (userId, movieId, rating.toDouble, timestamp.toLong)
        }
      }
      // 指定列名添加Schema
      .toDF("userId", "movieId", "rating", "timestamp")
    /*
    root
    |-- userId: string (nullable = true)
    |-- movieId: string (nullable = true)
    |-- rating: double (nullable = false)
    |-- timestamp: long (nullable = false)
    */
    //ratingsDF.printSchema()
    /*
    +------+-------+------+---------+
    |userId|movieId|rating|timestamp|
    +------+-------+------+---------+
    | 1| 1193| 5.0|978300760|
    | 1| 661| 3.0|978302109|
    | 1| 594| 4.0|978302268|
    | 1| 919| 4.0|978301368|
    +------+-------+------+---------+
    */
    //ratingsDF.show(4)
    // TODO: 基于SQL方式分析
    // 第一步、注册DataFrame为临时视图
    ratingsDF.createOrReplaceTempView("view_temp_ratings")
    // 第二步、编写SQL
    val top10MovieDF: DataFrame = spark.sql(
      """
        |SELECT
        | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
        |FROM
        | view_temp_ratings
        |GROUP BY
        | movieId
        |HAVING
        | cnt_rating > 2000
        |ORDER BY
        | avg_rating DESC, cnt_rating DESC
        |LIMIT
        | 10
""".stripMargin)
    //top10MovieDF.printSchema()
    top10MovieDF.show(10, truncate = false)
    println("===============================================================")
    // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
    import org.apache.spark.sql.functions._
    val resultDF: DataFrame = ratingsDF
      // 选取字段
      .select($"movieId", $"rating")
      // 分组:按照电影ID,获取平均评分和评分次数
      .groupBy($"movieId")
      .agg( //
        round(avg($"rating"), 2).as("avg_rating"), //
        count($"movieId").as("cnt_rating") //
      )
      // 过滤:评分次数大于2000
      .filter($"cnt_rating" > 2000)
      // 排序:先按照评分降序,再按照次数降序
      .orderBy($"avg_rating".desc, $"cnt_rating".desc)
      // 获取前10
      .limit(10)
    //resultDF.printSchema()
    resultDF.show(10)
    // TODO: 将分析的结果数据保存MySQL数据库和CSV文件
    // 结果DataFrame被使用多次,缓存
    resultDF.persist(StorageLevel.MEMORY_AND_DISK)
    // 1. 保存MySQL数据库表汇总
    resultDF
      .coalesce(1) // 考虑降低分区数目
      .write
      .mode("overwrite")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .jdbc(
        "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
          ode = true",
          "db_test.tb_top10_movies",
          new Properties ()
          )
          // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
          resultDF
          .coalesce (1)
          .write.mode ("overwrite")
          .csv ("datas/top10-movies")
          // 释放缓存数据
          resultDF.unpersist ()
          // 应用结束,关闭资源
          Thread.sleep (10000000)
          spark.stop ()
          }
          }

6 Shuffle 分区数目问题

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为

200,在实际项目中要合理的设置。在构建SparkSession实例对象时,设置参数的值:

// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导入隐式转换
import spark.implicits._


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
机器学习/深度学习 算法 数据挖掘
2023 年第二届钉钉杯大学生大数据挑战赛初赛 初赛 A:智能手机用户监测数据分析 问题二分类与回归问题Python代码分析
本文介绍了2023年第二届钉钉杯大学生大数据挑战赛初赛A题的Python代码分析,涉及智能手机用户监测数据分析中的聚类分析和APP使用情况的分类与回归问题。
87 0
2023 年第二届钉钉杯大学生大数据挑战赛初赛 初赛 A:智能手机用户监测数据分析 问题二分类与回归问题Python代码分析
|
15天前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
2月前
|
机器学习/深度学习 数据挖掘 大数据
大数据时代的“淘金术”:Python数据分析+深度学习框架实战指南
在大数据时代,数据被视为新财富源泉,而从海量信息中提取价值成为企业竞争的核心。本文通过对比方式探讨如何运用Python数据分析与深度学习框架实现这一目标。Python凭借其强大的数据处理能力及丰富库支持,已成为数据科学家首选工具;而TensorFlow和PyTorch等深度学习框架则为复杂模型构建提供强有力的技术支撑。通过融合Python数据分析与深度学习技术,我们能在各领域中发掘数据的无限潜力。无论是商业分析还是医疗健康,掌握这些技能都将为企业和社会带来巨大价值。
96 6
|
3月前
|
存储 数据可视化 数据挖掘
大数据环境下的房地产数据分析与预测研究的设计与实现
本文介绍了一个基于Python大数据环境下的昆明房地产市场分析与预测系统,通过数据采集、清洗、分析、机器学习建模和数据可视化技术,为房地产行业提供决策支持和市场洞察,探讨了模型的可行性、功能需求、数据库设计及实现过程,并展望了未来研究方向。
184 4
大数据环境下的房地产数据分析与预测研究的设计与实现
|
3月前
|
存储 数据可视化 大数据
基于Python Django的大数据招聘数据分析系统,包括数据大屏和后台管理
本文介绍了一个基于Python Django框架开发的大数据招聘数据分析系统,该系统具备后台管理功能和数据大屏展示,利用大数据技术收集和分析招聘市场趋势,帮助企业和招聘机构提高招聘效率和质量。
157 3
|
3月前
|
存储 数据采集 数据可视化
基于Python flask+MySQL+echart的电影数据分析可视化系统
该博客文章介绍了一个基于Python Flask框架、MySQL数据库和ECharts库构建的电影数据分析可视化系统,系统功能包括猫眼电影数据的爬取、存储、展示以及电影评价词云图的生成。
125 1
|
3月前
|
机器学习/深度学习 算法 数据挖掘
【2023 年第二届钉钉杯大学生大数据挑战赛初赛】 初赛 A:智能手机用户监测数据分析 问题一Python代码分析
本文提供了2023年第二届钉钉杯大学生大数据挑战赛初赛A题"智能手机用户监测数据分析"的Python代码分析,包括数据预处理、特征工程、聚类分析等步骤,以及如何使用不同聚类算法进行用户行为分析。
73 0
【2023 年第二届钉钉杯大学生大数据挑战赛初赛】 初赛 A:智能手机用户监测数据分析 问题一Python代码分析
|
3月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
55 0
|
3月前
|
消息中间件 分布式计算 Kafka
MaxCompute 在实时数据分析中的角色
【8月更文第31天】随着大数据应用场景的不断扩展,对数据处理速度的要求越来越高,传统的批处理模式已经难以满足某些业务对实时性的需求。在这种背景下,实时数据处理成为了大数据领域的研究热点之一。阿里云的 MaxCompute 虽然主要用于离线数据处理,但通过与其他实时流处理系统(如 Apache Flink 或 Kafka Streams)的集成,也可以参与到实时数据分析中。本文将探讨 MaxCompute 在实时数据分析中的角色,并介绍如何将 MaxCompute 与 Flink 结合使用。
92 0
|
3月前
|
数据采集 存储 数据可视化
基于Python flask的猫眼电影票房数据分析可视化系统,可以定制可视化
本文介绍了一个基于Python Flask框架开发的猫眼电影票房数据分析可视化系统,该系统集成了数据爬取、存储处理、可视化展示和用户交互功能,使用户能够直观地分析和展示电影票房数据,具有高度定制性。
152 0
基于Python flask的猫眼电影票房数据分析可视化系统,可以定制可视化

热门文章

最新文章

下一篇
无影云桌面