Spark 2 SQL: Reading Data Sources, Programming Example 1

Summary: A learning example of reading data sources with Spark 2 SQL (part 1).

As developers, the ultimate goal of learning Spark SQL is to get our own work done with it, so how do we actually do that? Following the official documentation, this article presents the sample code and adds some annotations and explanations along the way.


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.examples.sql
import java.util.Properties
import org.apache.spark.sql.SparkSession
object SQLDataSourceExample {
  case class Person(name: String, age: Long)
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL data sources example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    runBasicDataSourceExample(spark)
    runBasicParquetExample(spark)
    runParquetSchemaMergingExample(spark)
    runJsonDatasetExample(spark)
    runJdbcDatasetExample(spark)
    spark.stop()
  }
  private def runBasicDataSourceExample(spark: SparkSession): Unit = {
    // $example on:generic_load_save_functions$
    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    // $example off:generic_load_save_functions$
    // $example on:manual_load_options$
    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
    // $example off:manual_load_options$
    // $example on:manual_load_options_csv$
    val peopleDFCsv = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("examples/src/main/resources/people.csv")
    // $example off:manual_load_options_csv$
    // $example on:direct_sql$
    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    // $example off:direct_sql$
    // $example on:write_sorting_and_bucketing$
    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
    // $example off:write_sorting_and_bucketing$
    // $example on:write_partitioning$
    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
    // $example off:write_partitioning$
    // $example on:write_partition_and_bucket$
    peopleDF
      .write
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed")
    // $example off:write_partition_and_bucket$
    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
  }
  private def runBasicParquetExample(spark: SparkSession): Unit = {
    // $example on:basic_parquet_example$
    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._
    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")
    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")
    // Parquet files can also be used to create a temporary view and then used in SQL statements
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    // $example off:basic_parquet_example$
  }
  private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
    // $example on:schema_merging$
    // This is used to implicitly convert an RDD to a DataFrame.
    import spark.implicits._
    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")
    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")
    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()
    // The final schema consists of all 3 columns in the Parquet files together
    // with the partitioning column appeared in the partition directory paths
    // root
    //  |-- value: int (nullable = true)
    //  |-- square: int (nullable = true)
    //  |-- cube: int (nullable = true)
    //  |-- key: int (nullable = true)
    // $example off:schema_merging$
  }
  private def runJsonDatasetExample(spark: SparkSession): Unit = {
    // $example on:json_dataset$
    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
    // supported by importing this when creating a Dataset.
    import spark.implicits._
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files
    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)
    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+
    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a Dataset[String] storing one JSON object per string
    val otherPeopleDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
    // $example off:json_dataset$
  }
  private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    // $example on:jdbc_dataset$
    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // $example off:jdbc_dataset$
  }
}

With the code in front of us, let's see what it contains. Before diving in, think about how we have always written programs: whatever the language, we first import the required packages, then create the program entry point, and finally implement the features one by one. Spark SQL is no different, so let's walk through it.

Package name



First:


package org.apache.spark.examples.sql

This is the package declaration. If you are familiar with Java it should be easy to understand; for other languages, you can look up how packages work.


Importing packages

Next come the familiar imports, that is, the Spark-related packages.


import java.util.Properties
import org.apache.spark.sql.SparkSession


Singleton object


After the imports, we need a program entry point, and before creating it we need a singleton object, SQLDataSourceExample.


object SQLDataSourceExample

In another language, SQLDataSourceExample might be a class with static members. This is one of Scala's peculiarities: static members (methods or fields) do not exist in Scala. Scala never defines static members; instead, it defines a singleton object.

Further reading:

http://www.aboutyun.com/forum.php?mod=viewthread&tid=12402
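
As a minimal sketch (my own illustration, not taken from the example above), the contrast with a Java-style static method looks like this:

// Illustration only: a Scala object plays the role of Java's static members.
object Greeter {
  // Comparable to a Java "public static" method.
  def greet(name: String): String = s"Hello, $name"
}

// Called without creating an instance, just like a static call:
// Greeter.greet("Spark")  // "Hello, Spark"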


The main entry point


def main(args: Array[String])

Here we see that it is defined with the def keyword; args is the parameter name and Array[String] is its type.


Instantiating SparkSession


val spark = SparkSession
     .builder()
     .appName("Spark SQL data sources example")
     .config("spark.some.config.option", "some-value")
     .getOrCreate()

The code above instantiates the SparkSession. getOrCreate() returns an existing session if one is already active, otherwise it creates a new one; "spark.some.config.option" is just a placeholder configuration key from the official example.
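
For running the example locally, a sketch like the following is common; note that master("local[*]") is my addition for local testing and is not part of the original code (when submitting with spark-submit, the master is usually supplied on the command line instead):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL data sources example")
  .master("local[*]")  // run locally on all cores; omit when the master is set by spark-submit
  .getOrCreate()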


runBasicDataSourceExample(spark)
runBasicParquetExample(spark)
runParquetSchemaMergingExample(spark)
runJsonDatasetExample(spark)
runJdbcDatasetExample(spark)

These lines are where the entry point does its actual work: each piece of functionality lives in its own function and is simply called here.



spark.stop()

spark.stop() marks the end of the program: it stops the SparkSession once the work is done. With that, we have finished reading the entry point, in other words the driver code.
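
A common defensive pattern (my addition, not in the original example) is to guard the stop call with try/finally, so the session is released even if one of the examples throws:

try {
  runBasicDataSourceExample(spark)
  // ... the remaining examples ...
} finally {
  spark.stop()  // always release the SparkSession, even on failure
}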


Function implementations


Next, let's look at how each function is implemented.


private def runBasicDataSourceExample(spark: SparkSession): Unit = {
    // $example on:generic_load_save_functions$
    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    // $example off:generic_load_save_functions$
    // $example on:manual_load_options$
    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
    // $example off:manual_load_options$
    // $example on:manual_load_options_csv$
    val peopleDFCsv = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("examples/src/main/resources/people.csv")
    // $example off:manual_load_options_csv$
    // $example on:direct_sql$
    val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    // $example off:direct_sql$
    // $example on:write_sorting_and_bucketing$
    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
    // $example off:write_sorting_and_bucketing$
    // $example on:write_partitioning$
    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
    // $example off:write_partitioning$
    // $example on:write_partition_and_bucket$
    peopleDF
      .write
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed")
    // $example off:write_partition_and_bucket$
    spark.sql("DROP TABLE IF EXISTS people_bucketed")
    spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
  }

A private function is defined with the private keyword. Unit is the result type of runBasicDataSourceExample; a Unit result type means the function does not return a useful value, so Scala's Unit is close to Java's void. The most unfamiliar part is probably the colon, which can simply be read as the separator between a name and its type.


private def runBasicDataSourceExample(spark: SparkSession): Unit =
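
As a small illustration (my own, not from the example), compare a Unit-returning function with one that returns a value:

// Called only for its side effect (printing); the result type is Unit.
def logCount(n: Long): Unit = {
  println(s"row count = $n")
}

// Returns a value; the colon separates the name from its type in both places.
def double(n: Long): Long = n * 2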

Now let's go through the code piece by piece:


val usersDF = spark.read.load("examples/src/main/resources/users.parquet")

This reads the data. Since no format is specified, Spark uses its default data source, which is Parquet (configurable through spark.sql.sources.default).
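
In other words, the line above is equivalent to naming the format explicitly; a quick sketch using the same example file:

// The default data source is parquet, so these two reads are equivalent:
val usersDefault = spark.read.load("examples/src/main/resources/users.parquet")
val usersExplicit = spark.read.format("parquet").load("examples/src/main/resources/users.parquet")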


peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

This selects the name and age columns and writes them out in Parquet format. save("namesAndAges.parquet") is easy to read as a single file, but the path is actually a directory: Spark writes one or more part files (plus a _SUCCESS marker) underneath it.
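
The whole directory can then be read back in one call; a short sketch assuming the write above has already run:

// Read the entire output directory; Spark picks up every part file inside it.
val namesAndAges = spark.read.parquet("namesAndAges.parquet")
namesAndAges.printSchema()
// expected output:
// root
//  |-- name: string (nullable = true)
//  |-- age: long (nullable = true)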


val peopleDFCsv = spark.read.format("csv")
    .option("sep", ";")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("examples/src/main/resources/people.csv")

The code above reads a CSV file. option() sets CSV options; for example, header controls whether the first line is used as the column names and defaults to false. Here we set it to true, so the first line is treated as the header. inferSchema tells Spark to infer the column types from the data, at the cost of one extra pass over the file, and sep sets the field separator (a semicolon in the bundled example).

More options, taken from the official documentation:


sep (default ,): sets the single character as a separator for each field and value.
encoding (default UTF-8): decodes the CSV files by the given encoding type.
quote (default "): sets the single character used for escaping quoted values where the separator can be part of the value. If you would like to turn off quotations, you need to set not null but an empty string. This behaviour is different from com.databricks.spark.csv.
escape (default \): sets the single character used for escaping quotes inside an already quoted value.
comment (default empty string): sets the single character used for skipping lines beginning with this character. By default, it is disabled.
header (default false): uses the first line as names of columns.
inferSchema (default false): infers the input schema automatically from data. It requires one extra pass over the data.
ignoreLeadingWhiteSpace (default false): a flag indicating whether or not leading whitespaces from values being read should be skipped.
ignoreTrailingWhiteSpace (default false): a flag indicating whether or not trailing whitespaces from values being read should be skipped.
nullValue (default empty string): sets the string representation of a null value. Since 2.0.1, this applies to all supported types including the string type.
nanValue (default NaN): sets the string representation of a non-number value.
positiveInf (default Inf): sets the string representation of a positive infinity value.
negativeInf (default -Inf): sets the string representation of a negative infinity value.
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
maxColumns (default 20480): defines a hard limit of how many columns a record can have.
maxCharsPerColumn (default -1): defines the maximum number of characters allowed for any given value being read. By default, it is -1, meaning unlimited length.
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes.
PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When a length of parsed CSV tokens is shorter than an expected length of a schema, it sets null for extra fields.
DROPMALFORMED : ignores the whole corrupted records.
FAILFAST : throws an exception when it meets corrupted records.
columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.
multiLine (default false): parse one record, which may span multiple lines.


http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
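
Instead of inferSchema, which costs an extra pass over the data, a schema can also be supplied up front. A sketch, under the assumption that people.csv has the columns name, age and job separated by semicolons, as in the bundled example file:

import org.apache.spark.sql.types._

// Assumed layout of people.csv: name;age;job with a header line.
val peopleSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("job", StringType, nullable = true)))

val peopleCsvTyped = spark.read
  .schema(peopleSchema)  // no inference pass needed
  .option("sep", ";")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")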


peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Here 42 is the number of buckets and name is the bucketing column; this is a feature that only appeared in Spark 2.1. Note that bucketBy works together with saveAsTable, which registers the result as a table in the catalog, rather than with a plain path-based save.
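
Once saved, the bucketed table can be queried back through the table API or plain SQL; a short sketch assuming the saveAsTable call above succeeded:

// Query the bucketed table that saveAsTable registered in the catalog.
val bucketed = spark.table("people_bucketed")
bucketed.show()

// Or with SQL against the same table name:
spark.sql("SELECT name, age FROM people_bucketed WHERE age > 20").show()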


usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

This partitions the output on the file system by the given column, favorite_color: each distinct value of the column gets its own subdirectory under the output path.
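
On disk the result looks roughly like the layout sketched below (the actual values depend on users.parquet), and reading back with a filter on the partition column lets Spark prune the other directories:

// Rough on-disk layout (illustrative values):
// namesPartByColor.parquet/
//   favorite_color=red/part-....parquet
//   favorite_color=__HIVE_DEFAULT_PARTITION__/part-....parquet  (rows where the column is null)

// A filter on the partition column only touches the matching directories.
val redUsers = spark.read.parquet("namesPartByColor.parquet")
  .where("favorite_color = 'red'")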

peopleDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed")
// $example off:write_partition_and_bucket$
spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")


The code above saves the DataFrame as a partitioned, bucketed table and then drops both tables so the example can be re-run cleanly.

