spark2 sql读取数据源编程学习样例2:函数实现详解

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: spark2 sql读取数据源编程学习样例2:函数实现详解

这里接着上篇,继续阅读代码,下面我们看看runBasicParquetExample函数的功能实现

runBasicParquetExample函数



private def runBasicParquetExample(spark: SparkSession): Unit = {
   // $example on:basic_parquet_example$
   // Encoders for most common types are automatically provided by importing spark.implicits._
   import spark.implicits._
   val peopleDF = spark.read.json("examples/src/main/resources/people.json")
   // DataFrames can be saved as Parquet files, maintaining the schema information
   peopleDF.write.parquet("people.parquet")
   // Read in the parquet file created above
   // Parquet files are self-describing so the schema is preserved
   // The result of loading a Parquet file is also a DataFrame
   val parquetFileDF = spark.read.parquet("people.parquet")
   // Parquet files can also be used to create a temporary view and then used in SQL statements
   parquetFileDF.createOrReplaceTempView("parquetFile")
   val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
   namesDF.map(attributes => "Name: " + attributes(0)).show()
   // +------------+
   // |       value|
   // +------------+
   // |Name: Justin|
   // +------------+
   // $example off:basic_parquet_example$
 }

这里面有一个包的导入

import spark.implicits._

Scala中与其它语言的区别是在对象,函数中可以导入包。这个包的作用是转换RDD为DataFrame。

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

上面自然是读取json文件。

peopleDF.write.parquet("people.parquet")


这里同样是保存文件,不过people.parquet是文件夹。文件夹里面是数据,其中有*00000*为数据文件。

val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")

这里调用sql语句。

namesDF.map(attributes => "Name: " + attributes(0)).show()

这里通过map映射,增加Name:

// +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    // $example off:basic_parquet_example$




runParquetSchemaMergingExample函数


private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
    // $example on:schema_merging$
    // This is used to implicitly convert an RDD to a DataFrame.
    import spark.implicits._
    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")
    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")
    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()
    // The final schema consists of all 3 columns in the Parquet files together
    // with the partitioning column appeared in the partition directory paths
    // root
    //  |-- value: int (nullable = true)
    //  |-- square: int (nullable = true)
    //  |-- cube: int (nullable = true)
    //  |-- key: int (nullable = true)
    // $example off:schema_merging$
  }


val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

上面是创建一个RDD,然后通过toDF转换为DataFrame。然后保存到分区目录下。

val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
  cubesDF.write.parquet("data/test_table/key=2")

创建另外一个DataFrame,并且添加一个新列,删除现有列

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
   mergedDF.printSchema()

上面自然是读取数据保存为DataFrame,option("mergeSchema", "true"), 默认值由spark.sql.parquet.mergeSchema指定。设置所有的分区文件是否合并Schema。设置后将覆盖spark.sql.parquet.mergeSchema指定值。


runJsonDatasetExample函数



private def runJsonDatasetExample(spark: SparkSession): Unit = {
    // $example on:json_dataset$
    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
    // supported by importing this when creating a Dataset.
    import spark.implicits._
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files
    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)
    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+
    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a Dataset[String] storing one JSON object per string
    val otherPeopleDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
    // $example off:json_dataset$
  }

上面有些代码重复了,就不在解释了。

val path = "examples/src/main/resources/people.json"    val peopleDF = spark.read.json(path)
    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)

上面是读取json,并显示schema。

// Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()

peopleDF.createOrReplaceTempView("people")是DataFrame注册为people表

teenagerNamesDF.show()

自然是显示数据。

如下

// +------+
  // |  name|
  // +------+
  // |Justin|
  // +------+


val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

这里创建一个json格式的dataset

val otherPeople = spark.read.json(otherPeopleDataset)

这行代码,是读取上面创建的dataset,然后创建DataFrame。从上面我们看出这也是dataset和DataFrame转换的一种方式。


runJdbcDatasetExample函数


private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    // $example on:jdbc_dataset$
    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // $example off:jdbc_dataset$
  }
}

这个是运行Jdbc Dataset的例子。那么如何从jdbc读取数据,是通过下面各个option

.option("url", "jdbc:postgresql:dbserver")
     .option("dbtable", "schema.tablename")
     .option("user", "username")
     .option("password", "password")

第一行server也就是服务器地址

第二行是表名

第三行是用户名

第四行为密码,相信大家也能看明白。

val connectionProperties = new Properties()

Properties这个是用来做什么的那?

我们来看官网

c4510f216b1cc6479d9f8487268a97c4.jpg

它是

JDBC database 连接的一个参数,是一个字符串tag/value的列表。于是有了下面内容

connectionProperties.put("user", "username")
   connectionProperties.put("password", "password")

我们看到上面放入了用户名和密码

val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

这里设置了连接url,表名,还有connectionProperties

connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")

上面是指定读取Schema的自定义数据类型。

jdbcDF.write
   .format("jdbc")
   .option("url", "jdbc:postgresql:dbserver")
   .option("dbtable", "schema.tablename")
   .option("user", "username")
   .option("password", "password")
   .save()
jdbcDF2.write
   .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
 // Specifying create table column data types on write
 jdbcDF.write
   .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
   .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

上面分别都是将数据通过jdbc保存到数据库


相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
3月前
|
SQL 存储 关系型数据库
【MySQL基础篇】全面学习总结SQL语法、DataGrip安装教程
本文详细介绍了MySQL中的SQL语法,包括数据定义(DDL)、数据操作(DML)、数据查询(DQL)和数据控制(DCL)四个主要部分。内容涵盖了创建、修改和删除数据库、表以及表字段的操作,以及通过图形化工具DataGrip进行数据库管理和查询。此外,还讲解了数据的增、删、改、查操作,以及查询语句的条件、聚合函数、分组、排序和分页等知识点。
【MySQL基础篇】全面学习总结SQL语法、DataGrip安装教程
|
2月前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
162 0
|
4月前
|
SQL 安全 前端开发
Web学习_SQL注入_联合查询注入
联合查询注入是一种强大的SQL注入攻击方式,攻击者可以通过 `UNION`语句合并多个查询的结果,从而获取敏感信息。防御SQL注入需要多层次的措施,包括使用预处理语句和参数化查询、输入验证和过滤、最小权限原则、隐藏错误信息以及使用Web应用防火墙。通过这些措施,可以有效地提高Web应用程序的安全性,防止SQL注入攻击。
113 2
|
5月前
|
SQL Oracle 关系型数据库
SQL优化-使用联合索引和函数索引
在一次例行巡检中,发现一条使用 `to_char` 函数将日期转换为字符串的 SQL 语句 CPU 利用率很高。为了优化该语句,首先分析了 where 条件中各列的选择性,并创建了不同类型的索引,包括普通索引、函数索引和虚拟列索引。通过对比不同索引的执行计划,最终确定了使用复合索引(包含函数表达式)能够显著降低查询成本,提高执行效率。
|
5月前
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
5月前
|
SQL 数据库 索引
SQL中COUNT函数结合条件使用的技巧与方法
在SQL查询中,COUNT函数是一个非常常用的聚合函数,用于计算表中满足特定条件的记录数
1137 5
|
4月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
5月前
|
SQL 存储 数据库
SQL学习一:ACID四个特性,CURD基本操作,常用关键字,常用聚合函数,五个约束,综合题
这篇文章是关于SQL基础知识的全面介绍,包括ACID特性、CURD操作、常用关键字、聚合函数、约束以及索引的创建和使用,并通过综合题目来巩固学习。
121 1
|
5月前
|
SQL 关系型数据库 MySQL
SQL日期函数
SQL日期函数
|
5月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
150 0