Spark综合练习——电影评分数据分析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Spark综合练习——电影评分数据分析

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,一个平凡而不平庸的人。

ChatGPT生成测试数据:

请根据如下格式生成100行数据集:

1::1287::5::978302039

1::2804::5::978300719

1::594::4::978302268

chatpgt注册:https://github.com/xianyu110/awesome-chatgpt-project

今天给大家带来一个Spark综合练习案例–电影评分

老师:给定需求统计评分次数>200的电影平均分Top10,并写入Mysql数据库中

我:所有字我都认识,怎么连在一起我就不认识了

不管了先new个实例对象,总没错吧

val sparkSession = SparkSession
      .builder()
      .config("spark.sql.shuffle.partitions", "4")
      .appName("电影数据分析")
      .master("local[2]")
      .getOrCreate()

然后大数据无非输入,转换,输出,我再弄个spark读取文件?

val lines: RDD[String] = sparkSession.read.textFile("E:\\xx\\SparkDemo\\input\\ratings.dat").rdd

再然后RDD转换成DF

val rdd: RDD[(Int, Int, Int, Long)] = lines.mapPartitions { item => {
      item.map { line => {
        val strings: Array[String] = line.trim.split("::")
        (strings(0).toInt, strings(1).toInt, strings(2).toInt, strings(3).toLong)
        }
      }
    }
    }
    import sparkSession.implicits._
    val reusltDF: DataFrame = rdd.toDF("user_id", "item_id", "rating", "timestamp")

测试一下行不行

//    查看约束
    reusltDF.printSchema()
    //查看数据
    reusltDF.show()

好像跑通了!!笑容逐渐放肆~什么SQL不整了,上来直接DSL

val resultDS: Dataset[Row] = reusltDF
          //a.对数据按电影id进行分组
          .groupBy($"item_id")
          //b.对聚合数据求平均值和评分次数
          .agg(
            round(avg($"rating"), 2).as("avg_rating"),
            count($"user_id").as("cnt_rating")
          )
          //c.过滤出评分大于2000的
          .filter($"cnt_rating" > 2000)
          //d.按照评分的平均值进行降序排序
          .orderBy($"avg_rating".desc)
          //e.取前十条数据
          .limit(10)

最后最后保存到Mysql

SaveToMysql(resultDF);

/**
    * 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入
    */
    def saveToMySQL(dataFrame: DataFrame): Unit = {
      dataFrame.rdd.coalesce(1).foreachPartition{ iter =>
      // a. 加载驱动类
        Class.forName("com.mysql.cj.jdbc.Driver")
      // 声明变量
      var conn: Connection = null
      var pstmt: PreparedStatement = null
      try{
        // b. 获取连接
        conn = DriverManager.getConnection(
          "jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
          "root", //
          "123456"
        )
        // c. 获取PreparedStatement对象
        val insertSql ="""
                |INSERT
                |O
                | db_test.demo
                | (item_id, avg_rating, cnt_rating)
                |VALUES (?, ?, ?)
                |""".stripMargin
        pstmt = conn.prepareStatement(insertSql)
        conn.setAutoCommit(false)
        // d. 将分区中数据插入到表中,批量插入
        iter.foreach{ row =>
          pstmt.setInt(1, row.getAs[Int]("item_id"))
          pstmt.setInt(2, row.getAs[Int]("avg_rating"))
          pstmt.setInt(3, row.getAs[Int]("cnt_rating"))
          // 加入批次
          pstmt.addBatch()
        }
        // TODO: 批量插入
        pstmt.executeBatch()
        conn.commit()
      }catch {
        case e: Exception => e.printStackTrace()
      }finally {
        if(null != pstmt) pstmt.close()
        if(null != conn) conn.close()
      } }

大功告成了!

补充: 采用DSL编程的详尽注释版

package cn.itcast.spark.metrics
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
/**
 * 电影评分数据分析,需求如下:
 *      需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
 *          电影ID    评分个数     电影名称 平均评分   更新时间
 *          movie_id、rating_num、title、rating_avg、update_time
 *      需求2:查找每个电影类别及其对应的平均评分
 *          电影类别  电影类别平均评分     更新时间
 *          genre、 rating_avg       、update_time
 *      需求3:查找被评分次数较多的前十部电影
 *          电影ID   电影名称  电影被评分的次数   更新时间
 *          movie_id、title、rating_num、      update_time
*/
object MetricsAppMain {
  // 文件路径
  private val RATINGS_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\ratings.csv"
  private val MOVIES_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\movies.csv"
  def main(args: Array[String]): Unit = {
    // step1、创建SparkSession实例对象
    val spark: SparkSession = createSparkSession(this.getClass)
    import spark.implicits._
    /*
      分析需求可知,三个需求最终结果,需要使用事实表数据和维度表数据关联,所以先数据拉宽,再指标计算
        TODO: 按照数据仓库分层理论管理数据和开发指标
        - 第一层(最底层):ODS层
          直接加CSV文件数据为DataFrame
        - 第二层(中间层):DW层
          将加载业务数据(电影评分数据)和维度数据(电影基本信息数据)进行Join关联,拉宽操作
        - 第三层(最上层):DA层/APP层
          依据需求开发程序,计算指标,进行存储到MySQL表
     */
    // step2、【ODS层】:加载数据,CSV格式数据,文件首行为列名称
    val ratingDF: DataFrame = readCsvFile(spark, RATINGS_CSV_FILE_PATH, verbose = false)
    val movieDF: DataFrame = readCsvFile(spark, MOVIES_CSV_FILE_PATH, verbose = false)
    // step3、【DW层】:将电影评分数据与电影信息数据进行关联,数据拉宽操作
    val detailDF: DataFrame = joinDetail(ratingDF, movieDF)
    //printConsole(detailDF)
    // step4、【DA层】:按照业务需求,进行指标统计分析
    computeMetric(detailDF)
    Thread.sleep(1000000)
    // 应用结束,关闭资源
    spark.stop()
  }
  /**
   * 构建SparkSession实例对象,默认情况下本地模式运行
   */
  def createSparkSession(clazz: Class[_], master: String = "local[2]"): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
  }
  /**
   * 读取CSV格式文本文件数据,封装到DataFrame数据集
   */
  def readCsvFile(spark: SparkSession, path: String, verbose: Boolean = true): DataFrame = {
    val dataframe: DataFrame = spark.read
      // 设置分隔符为逗号
      .option("sep", ",")
      // 文件首行为列名称
      .option("header", "true")
      // 依据数值自动推断数据类型
      .option("inferSchema", "true")
      .csv(path)
    if(verbose){
      printConsole(dataframe)
    }
    // 返回数据集
    dataframe
  }
  /**
   * 将事实表数据与维度表数据进行Join关联
   */
  def joinDetail(df1: DataFrame, df2: DataFrame, joinType: String = "left_outer"): DataFrame = {
    df1
      // 采用leftJoin关联数据
      .join(df2, df1("movieId") === df2("movieId"), joinType)
      // 选取字段
        .select(
          df1("userId").as("user_id"), //
          df1("movieId").as("movie_id"), //
          df1("rating"), //
          df1("timestamp"), //
          df2("title"), //
          df2("genres") //
        )
  }
  /**
   * 按照业务需求,进行指标统计,默认情况下,结果数据打印控制台
   */
  def computeMetric(dataframe: DataFrame): Unit = {
    // TODO: 缓存数据
    dataframe.persist(StorageLevel.MEMORY_AND_DISK)
    // 需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
    val top10FilesDF: DataFrame = top10Films(dataframe)
    //printConsole(top10FilesDF)
    upsertToMySQL(
      top10FilesDF, //
      "replace into db_metrics.top_10_files (id, movie_id, rating_num, title, rating_avg) values (null, ?, ?, ?, ?)", //
      (pstmt: PreparedStatement, row: Row) => {
        pstmt.setInt(1, row.getAs[Int]("movie_id"))
        pstmt.setLong(2, row.getAs[Long]("rating_num"))
        pstmt.setString(3, row.getAs[String]("title"))
        pstmt.setDouble(4, row.getAs[Double]("rating_avg"))
      }
    )
    // 需求2:查找每个电影类别及其对应的平均评分
    val genresRatingDF: DataFrame = genresRating(dataframe)
    //printConsole(genresRatingDF)
//    upsertToMySQL(
//      genresRatingDF, //
//      "replace into db_metrics.genres_rating (id, genre, rating_avg) values (null, ?, ?)", //
//      (pstmt: PreparedStatement, row: Row) => {
//        pstmt.setString(1, row.getAs[String]("genre"))
//        pstmt.setDouble(2, row.getAs[Double]("rating_avg"))
//      }
//    )
    // 需求3:查找被评分次数较多的前十部电影
    val best10FilesDF: DataFrame = best10Files(dataframe)
    //printConsole(best10FilesDF)
//    upsertToMySQL(
//      best10FilesDF, //
//      "replace into db_metrics.best_10_films (id, movie_id, title, rating_num) values (null, ?, ?, ?)", //
//      (pstmt: PreparedStatement, row: Row) => {
//        pstmt.setInt(1, row.getAs[Int]("movie_id"))
//        pstmt.setString(2, row.getAs[String]("title"))
//        pstmt.setLong(3, row.getAs[Long]("rating_num"))
//      }
//    )
    // 释放资源
    dataframe.unpersist()
  }
  /**
   * 需求:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
   *    电影ID    评分个数     电影名称 平均评分   更新时间
   *    movie_id、rating_num、title、rating_avg、update_time
   */
  def top10Films(dataframe: DataFrame): DataFrame = {
    import dataframe.sparkSession.implicits._
    dataframe
      .groupBy($"movie_id", $"title")
      .agg(
        count($"movie_id").as("rating_num"), // 统计电影被评分的个数
        round(avg($"rating"), 2).as("rating_avg") // 统计电影被评分的平均分
      )
      // 过滤评分个数大于50
      .where($"rating_num" > 50)
      // 降序排序,按照平均分
      .orderBy($"rating_avg".desc)
      // 获取前10电影
      .limit(10)
      // 添加日期字段
      .withColumn("update_time", current_timestamp())
  }
  /**
   * 需求:查找每个电影类别及其对应的平均评分
   *     电影类别  电影类别平均评分     更新时间
   *     genre、 rating_avg       、update_time
   */
  def genresRating(dataframe: DataFrame): DataFrame = {
    import dataframe.sparkSession.implicits._
    dataframe
      // 将每个电影类别字段:genres,按照|划分,使用爆炸函数进行行转列
        .select(
          explode(split($"genres", "\\|")).as("genre"), //
          $"rating" //
        )
      // 按照类别分组,计算平均评分
        .groupBy($"genre")
        .agg(
            round(avg($"rating"), 2).as("rating_avg")
        )
      // 对统计值降序排序
        .orderBy($"rating_avg".desc)
      // 添加日期字段
      .withColumn("update_time", current_timestamp())
  }
  /**
   * 需求:查找被评分次数较多的前十部电影
   *     电影ID   电影名称  电影被评分的次数   更新时间
   *     movie_id、title、rating_num、      update_time
   */
  def best10Files(dataframe: DataFrame): DataFrame = {
    import dataframe.sparkSession.implicits._
    dataframe
      .groupBy($"movie_id", $"title")
      .agg(
        count($"movie_id").as("rating_num") // 统计电影被评分的个数
      )
      // 降序排序,按照平均分
      .orderBy($"rating_num".desc)
      // 获取前10电影
      .limit(10)
      // 添加日期字段
      .withColumn("update_time", current_timestamp())
  }
  /**
   * 将DataFrame数据集打印控制台,显示Schema信息和前10条数据
   */
  def printConsole(dataframe: DataFrame): Unit = {
    // 显示Schema信息
    dataframe.printSchema()
    // 显示前10条数据
    dataframe.show(10, truncate = false)
  }
  /**
   * 将数据保存至MySQL表中,采用replace方式,当主键存在时,更新数据;不存在时,插入数据
   * @param dataframe 数据集
   * @param sql 插入数据SQL语句
   * @param accept 函数,如何设置Row中每列数据到SQL语句中占位符值
   */
  def upsertToMySQL(dataframe: DataFrame, sql: String,
                    accept: (PreparedStatement, Row) => Unit): Unit = {
    // 降低分区数目,对每个分区进行操作
    dataframe.coalesce(1).foreachPartition{iter =>
      // step1. 加载驱动类
      Class.forName("com.mysql.cj.jdbc.Driver")
      // 声明变量
      var conn: Connection = null
      var pstmt: PreparedStatement = null
      try{
        // step2. 创建连接
        conn = DriverManager.getConnection(
          "jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
          "root",
          "123456"
        )
        pstmt = conn.prepareStatement(sql)
        // step3. 插入数据
        iter.foreach{row =>
          // 设置SQL语句中占位符的值
          accept(pstmt, row)
          // 加入批次中
          pstmt.addBatch()
        }
        // 批量执行批次
        pstmt.executeBatch()
      }catch {
        case e: Exception => e.printStackTrace()
      }finally {
        // step4. 关闭连接
        if(null != pstmt) pstmt.close()
        if(null != conn) conn.close()
      }
    }
  }
}

总结

以上便是电影评分数据分析spark版,愿你读过之后有自己的收获,如果有收获不妨一键三连一下~


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
数据采集 存储 数据挖掘
TMDB电影数据分析(下)
TMDB电影数据分析(下)
|
5月前
|
存储 数据采集 数据挖掘
TMDB电影数据分析(上)
TMDB电影数据分析(上)
|
2月前
|
算法 数据挖掘 数据处理
豆瓣评分8.7!Python pandas创始人亲码的数据分析入门手册!
在众多解释型语言中,Python最大的特点是拥有一个巨大而活跃的科学计算社区。进入21世纪以来,在行业应用和学术研究中采用python进行科学计算的势头越来越猛。 近年来,由于Python有不断改良的库(主要是pandas),使其成为数据处理任务的一大代替方案,结合其在通用编程方面的强大实力,完全可以只使用Python这一种语言去构建以数据为中心的应用程序。 作为一个科学计算平台,Python的成功源于能够轻松的集成C、C++以及Fortran代码。大部分现代计算机环境都利用了一些Fortran和C库来是西安线性代数、优选、积分、快速傅里叶变换以及其他诸如此类的算法。
|
2月前
|
算法 数据挖掘 数据处理
豆瓣评分8.7!Python pandas创始人亲码的数据分析入门手册!
在众多解释型语言中,Python最大的特点是拥有一个巨大而活跃的科学计算社区。进入21世纪以来,在行业应用和学术研究中采用python进行科学计算的势头越来越猛。 近年来,由于Python有不断改良的库(主要是pandas),使其成为数据处理任务的一大代替方案,结合其在通用编程方面的强大实力,完全可以只使用Python这一种语言去构建以数据为中心的应用程序。 作为一个科学计算平台,Python的成功源于能够轻松的集成C、C++以及Fortran代码。大部分现代计算机环境都利用了一些Fortran和C库来是西安线性代数、优选、积分、快速傅里叶变换以及其他诸如此类的算法。
|
2月前
|
存储 数据采集 数据可视化
基于Python flask+MySQL+echart的电影数据分析可视化系统
该博客文章介绍了一个基于Python Flask框架、MySQL数据库和ECharts库构建的电影数据分析可视化系统,系统功能包括猫眼电影数据的爬取、存储、展示以及电影评价词云图的生成。
|
2月前
|
数据采集 存储 数据可视化
基于Python flask的猫眼电影票房数据分析可视化系统,可以定制可视化
本文介绍了一个基于Python Flask框架开发的猫眼电影票房数据分析可视化系统,该系统集成了数据爬取、存储处理、可视化展示和用户交互功能,使用户能够直观地分析和展示电影票房数据,具有高度定制性。
122 0
基于Python flask的猫眼电影票房数据分析可视化系统,可以定制可视化
|
2月前
|
机器学习/深度学习 算法 数据可视化
基于Python flask的豆瓣电影数据分析可视化系统,功能多,LSTM算法+注意力机制实现情感分析,准确率高达85%
本文介绍了一个基于Python Flask框架的豆瓣电影数据分析可视化系统,该系统集成了LSTM算法和注意力机制进行情感分析,准确率高达85%,提供了多样化的数据分析和情感识别功能,旨在帮助用户深入理解电影市场和观众喜好。
|
4月前
|
分布式计算 数据挖掘 数据处理
Spark如何支持实时数据分析?
【6月更文挑战第16天】Spark如何支持实时数据分析?
50 3
|
3月前
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp的豆瓣电影数据分析可视化系统附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp的豆瓣电影数据分析可视化系统附带文章源码部署视频讲解等
46 0
|
2月前
|
数据采集 数据可视化 数据挖掘
数据分析大神养成记:Python+Pandas+Matplotlib助你飞跃!
在数字化时代,数据分析至关重要,而Python凭借其强大的数据处理能力和丰富的库支持,已成为该领域的首选工具。Python作为基石,提供简洁语法和全面功能,适用于从数据预处理到高级分析的各种任务。Pandas库则像是神兵利器,其DataFrame结构让表格型数据的处理变得简单高效,支持数据的增删改查及复杂变换。配合Matplotlib这一数据可视化的魔法棒,能以直观图表展现数据分析结果。掌握这三大神器,你也能成为数据分析领域的高手!
59 2

热门文章

最新文章