面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。


Spark综合练习——电影评分数据分析

这是我的上篇博文,当时仅是做了一个实现案例(demo级别 ),没想到居然让我押中了题,还让我稳稳的及格了(这次测试试卷难度极大,考60分都能在班上排进前10

不过我在复盘的时候,发现自己的致命弱点:写sql的能力太菜了。。

于是我重做了一遍,并满足了导师提的3个需求:

需求1: 查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分

需求2: 查找每个电影类别及其对应的平均评分

需求3: 查找被评分次数较多的前十部电影

数据介绍:使用的文件movies.csv和ratings.csv

movies.csv该文件是电影数据,对应的为维表数据,其数据格式为

movieId title genres

电影id 电影名称 电影所属分类

样例数据如下所示:逗号分隔

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy

ratings.csv该文件为定影评分数据,其数据格式为

userId movieId rating timestamp

电影id 电影名称 电影所属分类 时间戳

建表语句

CREATE DATABASE db_movies;
USE db_movies;
CREATE TABLE `ten_movies_avgrating` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `movieId` int(11) NOT NULL COMMENT '电影id',
  `ratingNum` int(11) NOT NULL COMMENT '评分个数',
  `title` varchar(100) NOT NULL COMMENT '电影名称',
  `avgRating` decimal(10,2) NOT NULL COMMENT '平均评分',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `genres_average_rating` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `genres` varchar(100) NOT NULL COMMENT '电影类别',
  `avgRating` decimal(10,2) NOT NULL COMMENT '电影类别平均评分',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `genres_UNIQUE` (`genres`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `ten_most_rated_films` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `movieId` int(11) NOT NULL COMMENT '电影Id',
  `title` varchar(100) NOT NULL COMMENT '电影名称',
  `ratingCnt` int(11) NOT NULL COMMENT '电影被评分的次数',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

项目结构一览图

由题意可知

先创建实体类,字段是从建表语句中得来的。


Entry.scala

package cn.movies.Packet
/**
  * @author ChinaManor
  *         #Description Entry
  *         #Date: 6/6/2021 17:23
  */
object Entry {
  case class Movies(
                     movieId: String, // 电影的id
                     title: String, // 电影的标题
                     genres: String // 电影类别
                   )
  case class Ratings(
                      userId: String, // 用户的id
                      movieId: String, // 电影的id
                      rating: String, // 用户评分
                      timestamp: String // 时间戳
                    )
  // 需求1MySQL结果表
  case class tenGreatestMoviesByAverageRating(
                                               movieId: String, // 电影的id
                                               ratingNum:String,
                                               title: String, // 电影的标题
                                               avgRating: String // 电影平均评分
                                             )
  // 需求2MySQL结果表
  case class topGenresByAverageRating(
                                       genres: String, //电影类别
                                       avgRating: String // 平均评分
                                     )
  // 需求3MySQL结果表
  case class tenMostRatedFilms(
                                movieId: String, // 电影的id
                                title: String, // 电影的标题
                                ratingCnt: String // 电影被评分的次数
                              )
}

再创建个表结构~~

Schema.scala

package cn.movies.Packet
import org.apache.spark.sql.types.{DataTypes, StructType}
/**
  * @author ChinaManor
  *         #Description Schema
  *         #Date: 6/6/2021 17:34
  */
object Schema {
  class SchemaLoader {
    // movies数据集schema信息
    private val movieSchema = new StructType()
      .add("movieId", DataTypes.StringType, false)
      .add("title", DataTypes.StringType, false)
      .add("genres", DataTypes.StringType, false)
    // ratings数据集schema信息
    private val ratingSchema = new StructType()
      .add("userId", DataTypes.StringType, false)
      .add("movieId", DataTypes.StringType, false)
      .add("rating", DataTypes.StringType, false)
      .add("timestamp", DataTypes.StringType, false)
    def getMovieSchema: StructType = movieSchema
    def getRatingSchema: StructType = ratingSchema
  }
}

然后开始写Main方法,其实只有区区八十行代码。。。

spark总要有实例对象吧。

// 创建spark session
    val spark = SparkSession
      .builder
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .getOrCreate

然后 new个schema信息

val schemaLoader = new SchemaLoader

然后尝试读取csv文件,

// 读取Movie数据集

val movieDF: DataFrame = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)

// 读取Rating数据集

val ratingDF: DataFrame = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)

发现读取方法和路径都没有,于是补救一下

// 文件路径
  private val MOVIES_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\movies.csv"
  private val RATINGS_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\ratings.csv"
  /**
    * 读取数据文件,转成DataFrame
    *
    * @param spark
    * @param path
    * @param schema
    * @return
    */
  def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
    val dataDF: DataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(path)
    dataDF
  }

紧接着重头戏来了。。

写sql语句,在大数据行业懂得写sql就等于会了80%


WITH ratings_filter_cnt AS (
SELECT
     movieId,
     count( * ) AS rating_cnt,
     Round(avg( rating ),2) AS avg_rating
FROM
     ratings
GROUP BY
     movieId
HAVING
     count( * ) >= 50
),
ratings_filter_score AS (
SELECT
     movieId, -- 电影id
     rating_cnt, -- 个数
     avg_rating -- 电影平均评分
FROM ratings_filter_cnt
ORDER BY avg_rating DESC -- 平均评分降序排序
LIMIT 10 -- 平均分较高的前十部电影
)
SELECT
    m.movieId,
    r.rating_cnt AS ratingNum,
    m.title,
    r.avg_rating AS avgRating
FROM
   ratings_filter_score r
JOIN movies m ON m.movieId = r.movieId ORDER BY r.avg_rating DESC

关键点在于

WITH XXX AS SELECT


最后保存写入mysql表中

def saveToMysql(reportDF: DataFrame) = {
    // TODO: 使用SparkSQL提供内置Jdbc数据源保存数据
    reportDF
      .coalesce(1)
      .write
      // 追加模式,将数据追加到MySQL表中,再次运行,主键存在,报错异常
      .mode(SaveMode.Append)
      // 覆盖模式,无需测试,直接将以前数据全部删除,再次重新重建表,肯定不行
      //.mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://192.168.88.100:3306/db_movies?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "db_movies.ten_most_rated_films")
      .save()
  }

另外两个需求的SQL:

// 需求2:查找每个电影类别及其对应的平均评分
WITH explode_movies AS (
SELECT
 movieId,
 title,
 category
FROM
 movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category  //爆炸函数拆一下| 
)
SELECT
 m.category AS genres,
 Round(avg( r.rating ),2) AS avgRating
FROM
 explode_movies m
 JOIN ratings r ON m.movieId = r.movieId
GROUP BY
 m.category
ORDER BY avgRating DESC
   // 需求3:查找被评分次数较多的前十部电影
WITH rating_group AS (
    SELECT
       movieId,
       count( * ) AS ratingCnt
    FROM ratings
    GROUP BY movieId
),
rating_filter AS (
    SELECT
       movieId,
       ratingCnt
    FROM rating_group
    ORDER BY ratingCnt DESC
    LIMIT 10
)
SELECT
    m.movieId,
    m.title,
    r.ratingCnt
FROM
    rating_filter r
JOIN movies m ON r.movieId = m.movieId ORDER BY r.ratingCnt DESC

总结

以上便是spark电影评分数据分析二次改写,比之前一篇sql更复杂,需求更多,

希望今晚的考试顺利通关@~@

如果需要完整版的代码可以私信我获取

愿你读过之后有自己的收获,如果有收获不妨一键三连一下~


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
Java
【Java基础面试十八】、说一说重写与重载的区别
这篇文章阐述了Java中重写与重载的区别:重载是同一个类中方法名相同但参数列表不同的方法之间的关系,而重写是子类中方法与父类中相同方法名和参数列表的方法之间的关系,且子类的返回值应小于等于父类,访问修饰符应大于等于父类。
【Java基础面试十八】、说一说重写与重载的区别
|
2月前
|
Java
【Java基础面试二十二】、为什么要重写hashCode()和equals()?
这篇文章解释了为什么需要重写`hashCode()`和`equals()`方法:因为Object类的`equals()`默认使用`==`比较,这在业务中通常是不够的,我们需要根据对象内容来比较相等性;同时,为了保持`hashCode()`与`equals()`的联动关系,一旦重写了`equals()`,通常也需要重写`hashCode()`。
【Java基础面试二十二】、为什么要重写hashCode()和equals()?
|
2月前
|
Java
【Java基础面试十九】、构造方法能不能重写?
这篇文章指出Java中的构造方法不能被重写,因为构造方法必须与类名相同,而重写要求子类方法与父类方法同名,允许构造方法重写将违背这一规则。
【Java基础面试十九】、构造方法能不能重写?
|
2月前
|
存储 数据采集 数据可视化
基于Python flask+MySQL+echart的电影数据分析可视化系统
该博客文章介绍了一个基于Python Flask框架、MySQL数据库和ECharts库构建的电影数据分析可视化系统,系统功能包括猫眼电影数据的爬取、存储、展示以及电影评价词云图的生成。
|
2月前
|
数据采集 存储 数据可视化
基于Python flask的猫眼电影票房数据分析可视化系统,可以定制可视化
本文介绍了一个基于Python Flask框架开发的猫眼电影票房数据分析可视化系统,该系统集成了数据爬取、存储处理、可视化展示和用户交互功能,使用户能够直观地分析和展示电影票房数据,具有高度定制性。
122 0
基于Python flask的猫眼电影票房数据分析可视化系统,可以定制可视化
|
3月前
|
存储 SQL 索引
面试题MySQL问题之使用SQL语句创建一个索引如何解决
面试题MySQL问题之使用SQL语句创建一个索引如何解决
48 1
|
3月前
|
SQL 监控 数据库
MSSQL性能调优实战:索引策略优化、SQL查询重写与高效并发管理的具体技巧
在Microsoft SQL Server(MSSQL)的性能调优过程中,索引策略的优化、SQL查询的重写以及高效并发管理是关键环节
|
2月前
|
机器学习/深度学习 算法 数据可视化
基于Python flask的豆瓣电影数据分析可视化系统,功能多,LSTM算法+注意力机制实现情感分析,准确率高达85%
本文介绍了一个基于Python Flask框架的豆瓣电影数据分析可视化系统,该系统集成了LSTM算法和注意力机制进行情感分析,准确率高达85%,提供了多样化的数据分析和情感识别功能,旨在帮助用户深入理解电影市场和观众喜好。
|
3月前
|
SQL 运维 监控
MSSQL性能调优实战:索引策略优化、SQL查询重写与智能锁管理
在Microsoft SQL Server(MSSQL)的运维中,性能调优是确保数据库高效运行、满足业务需求的关键环节
|
3月前
|
SQL 监控 数据库
MSSQL性能调优深度探索:索引精细调整、SQL重写优化与智能并发控制
在Microsoft SQL Server(MSSQL)的日常管理和维护中,性能调优是一项至关重要的任务