SparkDSL修改版之从csv文件读取数据并写入Mysql

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: SparkDSL修改版之从csv文件读取数据并写入Mysql
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
/**
 * 电影评分数据分析,需求如下:
 *      需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
 *          电影ID    评分个数     电影名称 平均评分   更新时间
 *          movie_id、rating_num、title、rating_avg、update_time
 *      需求2:查找每个电影类别及其对应的平均评分
 *          电影类别  电影类别平均评分     更新时间
 *          genre、 rating_avg       、update_time
 *      需求3:查找被评分次数较多的前十部电影
 *          电影ID   电影名称  电影被评分的次数   更新时间
 *          movie_id、title、rating_num、      update_time
*/
object MetricsAppMain {
  // 文件路径
  private val RATINGS_CSV_FILE_PATH = "J:\\t4\\FlinkCommodityRecommendationSystem-main\\FlinkCommodityRecommendationSystem-main\\recommendation\\src\\main\\resources\\ratings.csv"
//  private val MOVIES_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\movies.csv"
  def main(args: Array[String]): Unit = {
    // step1、创建SparkSession实例对象
    val spark: SparkSession = createSparkSession(this.getClass)
    import spark.implicits._
    /*
      分析需求可知,三个需求最终结果,需要使用事实表数据和维度表数据关联,所以先数据拉宽,再指标计算
        TODO: 按照数据仓库分层理论管理数据和开发指标
        - 第一层(最底层):ODS层
          直接加CSV文件数据为DataFrame
        - 第二层(中间层):DW层
          将加载业务数据(电影评分数据)和维度数据(电影基本信息数据)进行Join关联,拉宽操作
        - 第三层(最上层):DA层/APP层
          依据需求开发程序,计算指标,进行存储到MySQL表
     */
    // step2、【ODS层】:加载数据,CSV格式数据,文件首行为列名称
    val ratingDF: DataFrame = readCsvFile(spark, RATINGS_CSV_FILE_PATH, verbose = false)
//    val movieDF: DataFrame = readCsvFile(spark, MOVIES_CSV_FILE_PATH, verbose = false)
    // step3、【DW层】:将电影评分数据与电影信息数据进行关联,数据拉宽操作
//    val detailDF: DataFrame = joinDetail(ratingDF, movieDF)
    printConsole(ratingDF)
    // step4、【DA层】:按照业务需求,进行指标统计分析
    computeMetric(ratingDF)
    Thread.sleep(1000000)
    // 应用结束,关闭资源
    spark.stop()
  }
  /**
   * 构建SparkSession实例对象,默认情况下本地模式运行
   */
  def createSparkSession(clazz: Class[_], master: String = "local[2]"): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
  }
  /**
   * 读取CSV格式文本文件数据,封装到DataFrame数据集
   */
  def readCsvFile(spark: SparkSession, path: String, verbose: Boolean = true): DataFrame = {
    val dataframe: DataFrame = spark.read
      // 设置分隔符为逗号
      .option("sep", ",")
      // 文件首行为列名称
      .option("header", "true")
      // 依据数值自动推断数据类型
      .option("inferSchema", "true")
      .csv(path)
    if(verbose){
      printConsole(dataframe)
    }
    // 返回数据集
    dataframe
  }
  /**
   * 按照业务需求,进行指标统计,默认情况下,结果数据打印控制台
   */
  def computeMetric(dataframe: DataFrame): Unit = {
    // TODO: 缓存数据
    dataframe.persist(StorageLevel.MEMORY_AND_DISK)
    // 需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
    val top10FilesDF: DataFrame = top10Films(dataframe)
    //printConsole(top10FilesDF)
    upsertToMySQL(
      top10FilesDF, //
      "replace into test.rating (id, userId, productId, score, timestamp) values (null, ?, ?, ?, ?)", //
      (pstmt: PreparedStatement, row: Row) => {
        pstmt.setInt(1, row.getAs[Int]("userId"))
        pstmt.setInt(2, row.getAs[Int]("productId"))
        pstmt.setDouble(3, row.getAs[Double]("score"))
        pstmt.setInt(4, row.getAs[Int]("timestamp"))
      }
    )
    // 释放资源
    dataframe.unpersist()
  }
  /**
   * 需求:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
   *    电影ID    评分个数     电影名称 平均评分   更新时间
   *    movie_id、rating_num、title、rating_avg、update_time
   */
  def top10Films(dataframe: DataFrame): DataFrame = {
    import dataframe.sparkSession.implicits._
    dataframe
      // 添加日期字段
//      .withColumn("update_time", current_timestamp())
  }
  /**
   * 将DataFrame数据集打印控制台,显示Schema信息和前10条数据
   */
  def printConsole(dataframe: DataFrame): Unit = {
    // 显示Schema信息
    dataframe.printSchema()
    // 显示前10条数据
    dataframe.show(10, truncate = false)
  }
  /**
   * 将数据保存至MySQL表中,采用replace方式,当主键存在时,更新数据;不存在时,插入数据
   * @param dataframe 数据集
   * @param sql 插入数据SQL语句
   * @param accept 函数,如何设置Row中每列数据到SQL语句中占位符值
   */
  def upsertToMySQL(dataframe: DataFrame, sql: String,
                    accept: (PreparedStatement, Row) => Unit): Unit = {
    // 降低分区数目,对每个分区进行操作
    dataframe.coalesce(1).foreachPartition{iter =>
      // step1. 加载驱动类
      Class.forName("com.mysql.cj.jdbc.Driver")
      // 声明变量
      var conn: Connection = null
      var pstmt: PreparedStatement = null
      try{
        // step2. 创建连接
        conn = DriverManager.getConnection(
          "jdbc:mysql://120.26.162.238:33306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
          "root",
          "123456"
        )
        pstmt = conn.prepareStatement(sql)
        // step3. 插入数据
        iter.foreach{row =>
          // 设置SQL语句中占位符的值
          accept(pstmt, row)
          // 加入批次中
          pstmt.addBatch()
        }
        // 批量执行批次
        pstmt.executeBatch()
      }catch {
        case e: Exception => e.printStackTrace()
      }finally {
        // step4. 关闭连接
        if(null != pstmt) pstmt.close()
        if(null != conn) conn.close()
      }
    }
  }
}


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
9天前
|
缓存 NoSQL 关系型数据库
13- Redis和Mysql如何保证数据⼀致?
该内容讨论了保证Redis和MySQL数据一致性的几种策略。首先提到的两种方法存在不一致风险:先更新MySQL再更新Redis,或先删Redis再更新MySQL。第三种方案是通过MQ异步同步以达到最终一致性,适用于一致性要求较高的场景。项目中根据不同业务需求选择不同方案,如对一致性要求不高的情况不做处理,时效性数据设置过期时间,高一致性需求则使用MQ确保同步,最严格的情况可能涉及分布式事务(如Seata的TCC模式)。
35 6
|
16天前
|
SQL 关系型数据库 MySQL
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
|
23天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
20 0
|
26天前
Mybatis+mysql动态分页查询数据案例——测试类HouseDaoMybatisImplTest)
Mybatis+mysql动态分页查询数据案例——测试类HouseDaoMybatisImplTest)
20 1
|
26天前
|
Java 关系型数据库 数据库连接
Mybatis+MySQL动态分页查询数据经典案例(含代码以及测试)
Mybatis+MySQL动态分页查询数据经典案例(含代码以及测试)
24 1
|
26天前
Mybatis+mysql动态分页查询数据案例——条件类(HouseCondition)
Mybatis+mysql动态分页查询数据案例——条件类(HouseCondition)
15 1
|
26天前
Mybatis+mysql动态分页查询数据案例——分页工具类(Page.java)
Mybatis+mysql动态分页查询数据案例——分页工具类(Page.java)
21 1
|
26天前
Mybatis+mysql动态分页查询数据案例——房屋信息的实现类(HouseDaoMybatisImpl)
Mybatis+mysql动态分页查询数据案例——房屋信息的实现类(HouseDaoMybatisImpl)
21 2
|
26天前
Mybatis+mysql动态分页查询数据案例——工具类(MybatisUtil.java)
Mybatis+mysql动态分页查询数据案例——工具类(MybatisUtil.java)
15 1
|
28天前
|
SQL 安全 关系型数据库
MySQL安全性:防止攻击和保护数据
MySQL安全性:防止攻击和保护数据
34 1