【spark2.x】如何通过SparkSQL读取csv文件

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 【spark2.x】如何通过SparkSQL读取csv文件
package cn.itcast.spark.source
import java.util.Properties
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
object _03SparkSQLSourceTest {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    import spark.implicits._
    // TODO: 1. CSV 格式数据文本文件数据 -> 依据 CSV文件首行是否是列名称,决定读取数据方式不一样的
    /*
      CSV 格式数据:
        每行数据各个字段使用逗号隔开
        也可以指的是,每行数据各个字段使用 单一 分割符 隔开数据
     */
    // 方式一:首行是列名称,数据文件u.dat
    val dataframe: DataFrame = spark.read
      .format("csv")
      .option("sep", "\\t")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("datas/ml-100k/u.dat")
    dataframe.printSchema()
    dataframe.show(10, truncate = false)
    // 方式二:首行不是列名,需要自定义Schema信息,数据文件u.data
    // 自定义schema信息
    val schema: StructType = new StructType()
      .add("user_id", IntegerType, nullable = true)
      .add("iter_id", IntegerType, nullable = true)
      .add("rating", DoubleType, nullable = true)
      .add("timestamp", LongType, nullable = true)
    val df: DataFrame = spark.read
      .format("csv")
      .schema(schema)
      .option("sep", "\\t")
      .load("datas/ml-100k/u.data")
    df.printSchema()
    df.show(10, truncate = false)
    /* ============================================================================== */
    // TODO: 2. 读取MySQL表中数据
    // 第一、简洁版格式
    /*
      def jdbc(url: String, table: String, properties: Properties): DataFrame
     */
    val props =  new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.cj.jdbc.Driver")
    val empDF: DataFrame = spark.read.jdbc(
      "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
      "db_test.emp", //
      props //
    )
    println(s"Partition Number = ${empDF.rdd.getNumPartitions}")
    empDF.printSchema()
    empDF.show(10, truncate = false)
    // 第二、标准格式写
    /*
      WITH tmp AS (
        select * from emp e join dept d on e.deptno = d.deptno
      )
     */
    val table: String = "(select ename,deptname,sal from db_test.emp e join db_test.dept d on e.deptno = d.deptno) AS tmp"
    val joinDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", table)
      .load()
    joinDF.printSchema()
    joinDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
80 0
|
SQL 机器学习/深度学习 分布式计算
Spark5:SparkSQL
Spark5:SparkSQL
113 0
|
7月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
170 2
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
49 1
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
78 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
95 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
68 0
|
2月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
54 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
85 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
51 0