【spark2.x】如何通过SparkSQL读取csv文件

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 【spark2.x】如何通过SparkSQL读取csv文件
package cn.itcast.spark.source
import java.util.Properties
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
object _03SparkSQLSourceTest {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    import spark.implicits._
    // TODO: 1. CSV 格式数据文本文件数据 -> 依据 CSV文件首行是否是列名称,决定读取数据方式不一样的
    /*
      CSV 格式数据:
        每行数据各个字段使用逗号隔开
        也可以指的是,每行数据各个字段使用 单一 分割符 隔开数据
     */
    // 方式一:首行是列名称,数据文件u.dat
    val dataframe: DataFrame = spark.read
      .format("csv")
      .option("sep", "\\t")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("datas/ml-100k/u.dat")
    dataframe.printSchema()
    dataframe.show(10, truncate = false)
    // 方式二:首行不是列名,需要自定义Schema信息,数据文件u.data
    // 自定义schema信息
    val schema: StructType = new StructType()
      .add("user_id", IntegerType, nullable = true)
      .add("iter_id", IntegerType, nullable = true)
      .add("rating", DoubleType, nullable = true)
      .add("timestamp", LongType, nullable = true)
    val df: DataFrame = spark.read
      .format("csv")
      .schema(schema)
      .option("sep", "\\t")
      .load("datas/ml-100k/u.data")
    df.printSchema()
    df.show(10, truncate = false)
    /* ============================================================================== */
    // TODO: 2. 读取MySQL表中数据
    // 第一、简洁版格式
    /*
      def jdbc(url: String, table: String, properties: Properties): DataFrame
     */
    val props =  new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.cj.jdbc.Driver")
    val empDF: DataFrame = spark.read.jdbc(
      "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
      "db_test.emp", //
      props //
    )
    println(s"Partition Number = ${empDF.rdd.getNumPartitions}")
    empDF.printSchema()
    empDF.show(10, truncate = false)
    // 第二、标准格式写
    /*
      WITH tmp AS (
        select * from emp e join dept d on e.deptno = d.deptno
      )
     */
    val table: String = "(select ename,deptname,sal from db_test.emp e join db_test.dept d on e.deptno = d.deptno) AS tmp"
    val joinDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", table)
      .load()
    joinDF.printSchema()
    joinDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
185 0
|
SQL 机器学习/深度学习 分布式计算
Spark5:SparkSQL
Spark5:SparkSQL
212 0
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
350 2
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
180 1
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
340 0
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
265 0
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
307 0
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
233 0
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
388 0
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
250 0