Spark 2 SQL programming example: SQL operations
This post walks through the various SQL operations in Spark SQL.
When writing a Spark SQL program, a few questions come up right away: which packages should you use, how do you embed SQL statements, how do you create a table (view), how do you display its contents, and how do you select specific columns to display? The example below answers all of these. As usual, the full code comes first:
Code
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.sql

import org.apache.spark.sql.Row
// $example on:init_session$
import org.apache.spark.sql.SparkSession
// $example off:init_session$
// $example on:programmatic_schema$
// $example on:data_types$
import org.apache.spark.sql.types._
// $example off:data_types$
// $example off:programmatic_schema$

object SparkSQLExample {

  // $example on:create_ds$
  case class Person(name: String, age: Long)
  // $example off:create_ds$

  def main(args: Array[String]) {
    // $example on:init_session$
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
    // $example off:init_session$

    runBasicDataFrameExample(spark)
    runDatasetCreationExample(spark)
    runInferSchemaExample(spark)
    runProgrammaticSchemaExample(spark)

    spark.stop()
  }

  private def runBasicDataFrameExample(spark: SparkSession): Unit = {
    // $example on:create_df$
    val df = spark.read.json("examples/src/main/resources/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    // $example off:create_df$

    // $example on:untyped_ops$
    // This import is needed to use the $-notation
    import spark.implicits._
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+

    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // Count people by age
    df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
    // $example off:untyped_ops$

    // $example on:run_sql$
    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")

    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    // $example off:run_sql$

    // $example on:global_temp_view$
    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    // $example off:global_temp_view$
  }

  private def runDatasetCreationExample(spark: SparkSession): Unit = {
    import spark.implicits._
    // $example on:create_ds$
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+

    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    // $example off:create_ds$
  }

  private def runInferSchemaExample(spark: SparkSession): Unit = {
    // $example on:schema_inferring$
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._

    // Create an RDD of Person objects from a text file, convert it to a Dataframe
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // The columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    // Primitive types and case classes can be also defined as
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
    // Array(Map("name" -> "Justin", "age" -> 19))
    // $example off:schema_inferring$
  }

  private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    // $example on:programmatic_schema$
    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))

    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+
    // $example off:programmatic_schema$
  }
}
Now let's walk through the code.
Package name
package org.apache.spark.examples.sql
As before, the example lives in its own package, org.apache.spark.examples.sql.
Imports
import org.apache.spark.sql.Row
// $example on:init_session$
import org.apache.spark.sql.SparkSession
// $example off:init_session$
// $example on:programmatic_schema$
// $example on:data_types$
import org.apache.spark.sql.types._
// $example off:data_types$
// $example off:programmatic_schema$
Three imports appear here; of these, import org.apache.spark.sql.Row and import org.apache.spark.sql.types._ were not needed in the earlier data-source examples.
case class Person(name: String, age: Long)
This defines a case class, which differs in a few ways from an ordinary class; a quick sketch of the difference follows the link below.
For details, see:
What is a case class in Scala? http://www.aboutyun.com/forum.php?mod=viewthread&tid=23464
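To get a feel for what a case class adds over an ordinary class (and why Spark can derive column names from its fields), here is a minimal standalone sketch, unrelated to Spark itself:

// Ordinary class: equality is by reference, no pattern matching support
class PlainPerson(val name: String, val age: Long)

// Case class: the compiler generates apply, equals, hashCode, toString,
// copy, and an extractor for pattern matching
case class Person(name: String, age: Long)

object CaseClassDemo extends App {
  val p = Person("Andy", 32)           // no `new` needed (companion apply)
  println(p)                           // Person(Andy,32)
  println(p == Person("Andy", 32))     // true: structural equality
  println(p.copy(age = 33))            // Person(Andy,33)

  p match {                            // pattern matching on the fields
    case Person(name, age) => println(s"$name is $age")
  }
}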
程序入口
def main(args: Array[String]) {
  // $example on:init_session$
  val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

  // For implicit conversions like converting RDDs to DataFrames
  import spark.implicits._
  // $example off:init_session$

  runBasicDataFrameExample(spark)
  runDatasetCreationExample(spark)
  runInferSchemaExample(spark)
  runProgrammaticSchemaExample(spark)

  spark.stop()
}
This is the same entry point as in the data-source examples, so it is not repeated here (a local-mode variation is sketched after the link); for details see
Spark 2 SQL data source programming example 1: program entry point, functionality, and other details
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484
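One common variation, if you want to run the example directly from an IDE rather than via spark-submit, is to set the master explicitly. This is a sketch of that variation, not part of the original example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")                  // run in-process with all local cores
  .appName("Spark SQL basic example")
  .getOrCreate()

// For implicit conversions such as RDD -> DataFrame and the $-notation
import spark.implicits._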
The runBasicDataFrameExample function
private def runBasicDataFrameExample(spark: SparkSession): Unit = {
  // $example on:create_df$
  val df = spark.read.json("examples/src/main/resources/people.json")

  // Displays the content of the DataFrame to stdout
  df.show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+
  // $example off:create_df$

  // $example on:untyped_ops$
  // This import is needed to use the $-notation
  import spark.implicits._
  // Print the schema in a tree format
  df.printSchema()
  // root
  // |-- age: long (nullable = true)
  // |-- name: string (nullable = true)

  // Select only the "name" column
  df.select("name").show()
  // +-------+
  // |   name|
  // +-------+
  // |Michael|
  // |   Andy|
  // | Justin|
  // +-------+

  // Select everybody, but increment the age by 1
  df.select($"name", $"age" + 1).show()
  // +-------+---------+
  // |   name|(age + 1)|
  // +-------+---------+
  // |Michael|     null|
  // |   Andy|       31|
  // | Justin|       20|
  // +-------+---------+

  // Select people older than 21
  df.filter($"age" > 21).show()
  // +---+----+
  // |age|name|
  // +---+----+
  // | 30|Andy|
  // +---+----+

  // Count people by age
  df.groupBy("age").count().show()
  // +----+-----+
  // | age|count|
  // +----+-----+
  // |  19|    1|
  // |null|    1|
  // |  30|    1|
  // +----+-----+
  // $example off:untyped_ops$

  // $example on:run_sql$
  // Register the DataFrame as a SQL temporary view
  df.createOrReplaceTempView("people")

  val sqlDF = spark.sql("SELECT * FROM people")
  sqlDF.show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+
  // $example off:run_sql$

  // $example on:global_temp_view$
  // Register the DataFrame as a global temporary view
  df.createGlobalTempView("people")

  // Global temporary view is tied to a system preserved database `global_temp`
  spark.sql("SELECT * FROM global_temp.people").show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+

  // Global temporary view is cross-session
  spark.newSession().sql("SELECT * FROM global_temp.people").show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+
  // $example off:global_temp_view$
}
Some of this repeats earlier material, so not every line is covered. The highlights:
df.select("name").show()
displays only the specified column, name:

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
df.select($"name", $"age" + 1).show()
Columns can also be computed on: here every age is incremented by 1 and displayed:

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
This shows that the DataFrame API supports column expressions much like a traditional SQL database does.
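To push the database analogy a little further, the same column arithmetic can carry an alias, or be written as an expression string. A sketch assuming the same df and import spark.implicits._ as above (age_plus_one is just an illustrative name):

import org.apache.spark.sql.functions.expr

// Same computation, but with a readable column alias instead of "(age + 1)"
df.select($"name", ($"age" + 1).alias("age_plus_one")).show()

// The same thing written as an expression string
df.select($"name", expr("age + 1").alias("age_plus_one")).show()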
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
This query returns the records where age is greater than 21.
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
This groups the rows by age and counts each group.
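groupBy/count is only the simplest aggregation; agg combines several at once. A sketch over the same df, using standard functions from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.{avg, count, max}

// Aggregate over the whole DataFrame (no grouping): row count, average and max age.
// avg/max ignore nulls, so Michael's null age does not break anything.
df.agg(count("name").alias("people"),
       avg("age").alias("avg_age"),
       max("age").alias("max_age")).show()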
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
This registers the DataFrame as a temporary SQL view, then queries and displays it.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Notice the difference: one is the local view people, the other is global_temp.people, because a global temporary view is tied to the system-preserved database global_temp.
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
A global temporary view is cross-session, meaning it can be visible in more than one SparkSession. This is worth verifying for yourself; a sketch for doing so follows.
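If you want to test the cross-session claim, something along these lines (assuming the two views registered above are still in place) should show the contrast between the two kinds of view:

val other = spark.newSession()

// The global view survives into the new session, via the global_temp database
other.sql("SELECT count(*) FROM global_temp.people").show()

// The plain temporary view "people" does NOT: this should fail with
// an AnalysisException ("Table or view not found")
try {
  other.sql("SELECT * FROM people").show()
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println("As expected, the local temp view is invisible: " + e.getMessage)
}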
The runDatasetCreationExample function
private def runDatasetCreationExample(spark: SparkSession): Unit = {
  import spark.implicits._
  // $example on:create_ds$
  // Encoders are created for case classes
  val caseClassDS = Seq(Person("Andy", 32)).toDS()
  caseClassDS.show()
  // +----+---+
  // |name|age|
  // +----+---+
  // |Andy| 32|
  // +----+---+

  // Encoders for most common types are automatically provided by importing spark.implicits._
  val primitiveDS = Seq(1, 2, 3).toDS()
  primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

  // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
  val path = "examples/src/main/resources/people.json"
  val peopleDS = spark.read.json(path).as[Person]
  peopleDS.show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+
  // $example off:create_ds$
}
Overall this shows how collections are converted to Datasets, and how a DataFrame is converted to a Dataset.
val caseClassDS = Seq(Person("Andy", 32)).toDS()
This wraps a Person instance in a sequence and converts that sequence to a Dataset with toDS().
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
A collection of primitives can also be converted to a Dataset; map then adds 1 to each element, and collect returns the results as an array.
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
The last part is quite interesting:
spark.read.json(path)
by itself this yields a DataFrame, but
as[Person]
converts it into a Dataset, where Person is the case class defined earlier. A sketch of what the typed Dataset buys you follows.
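With a typed Dataset[Person], field access is checked at compile time instead of relying on string column names. A sketch continuing from peopleDS (note the null-age caveat in the comments):

// Typed access: p.name and p.age are checked by the compiler, unlike $"name".
// Michael's age is null in people.json and Person.age is a Long, so drop
// null ages before deserializing rows into Person instances.
peopleDS
  .filter($"age".isNotNull)
  .map(p => s"${p.name} is ${p.age}")
  .show()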
The runInferSchemaExample function
private def runInferSchemaExample(spark: SparkSession): Unit = {
  // $example on:schema_inferring$
  // For implicit conversions from RDDs to DataFrames
  import spark.implicits._

  // Create an RDD of Person objects from a text file, convert it to a Dataframe
  val peopleDF = spark.sparkContext
    .textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
    .toDF()
  // Register the DataFrame as a temporary view
  peopleDF.createOrReplaceTempView("people")

  // SQL statements can be run by using the sql methods provided by Spark
  val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

  // The columns of a row in the result can be accessed by field index
  teenagersDF.map(teenager => "Name: " + teenager(0)).show()
  // +------------+
  // |       value|
  // +------------+
  // |Name: Justin|
  // +------------+

  // or by field name
  teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
  // +------------+
  // |       value|
  // +------------+
  // |Name: Justin|
  // +------------+

  // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
  implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
  // Primitive types and case classes can be also defined as
  // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

  // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
  teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
  // Array(Map("name" -> "Justin", "age" -> 19))
  // $example off:schema_inferring$
}
This shows converting an RDD to a DataFrame by inferring the schema via reflection, plus several operations on DataFrame rows.
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
This code reads people.txt into an RDD and converts it to a DataFrame. Most of this repeats earlier material; a few things are worth noting.
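For reference, people.txt in the Spark examples tree is a plain comma-separated file (one "name, age" pair per line), which is why the code splits on "," and trims the age field:

Michael, 29
Andy, 30
Justin, 19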
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
This defines mapEncoder, a Kryo-based encoder (serializer) for Map[String, Any]; it is needed because Spark provides no predefined encoder for Dataset[Map[K, V]].
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
The getAs method is worth highlighting: it is a method on Row (a DataFrame row) that here returns the value of the name field as a String.
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
Here teenager.getValuesMap[Any](List("name", "age")) uses Row's getValuesMap method to fetch the values of several columns at once into a Map.
For more operations on DataFrame rows, see the Row API documentation:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
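A quick sketch of the Row accessors used most often, run against the first row of teenagersDF (all of these are documented at the link above):

import org.apache.spark.sql.Row

val row: Row = teenagersDF.first()

row.getAs[String]("name")                    // typed access by field name
row.getString(row.fieldIndex("name"))        // access by resolved index
row.isNullAt(row.fieldIndex("age"))          // test for null before reading
row.getValuesMap[Any](List("name", "age"))   // several columns as a Map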
The runProgrammaticSchemaExample function
private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
  import spark.implicits._
  // $example on:programmatic_schema$
  // Create an RDD
  val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

  // The schema is encoded in a string
  val schemaString = "name age"

  // Generate the schema based on the string of schema
  val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)

  // Convert records of the RDD (people) to Rows
  val rowRDD = peopleRDD
    .map(_.split(","))
    .map(attributes => Row(attributes(0), attributes(1).trim))

  // Apply the schema to the RDD
  val peopleDF = spark.createDataFrame(rowRDD, schema)

  // Creates a temporary view using the DataFrame
  peopleDF.createOrReplaceTempView("people")

  // SQL can be run over a temporary view created using DataFrames
  val results = spark.sql("SELECT name FROM people")

  // The results of SQL queries are DataFrames and support all the normal RDD operations
  // The columns of a row in the result can be accessed by field index or by field name
  results.map(attributes => "Name: " + attributes(0)).show()
  // +-------------+
  // |        value|
  // +-------------+
  // |Name: Michael|
  // |   Name: Andy|
  // | Name: Justin|
  // +-------------+
  // $example off:programmatic_schema$
}
This function shows how to convert an RDD to a DataFrame by specifying the schema programmatically.
// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
This builds a schema string schemaString, splits it into field names, maps each to a StructField and wraps them in a StructType to produce schema; rowRDD is then derived from peopleRDD by splitting each line into a Row.
val peopleDF = spark.createDataFrame(rowRDD, schema)
Finally, this applies the schema to the RDD of Rows and produces the DataFrame. The rest repeats earlier material, so it is not belabored here.
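One limitation of the example is that every field ends up as StringType. A sketched variation (names like typedSchema are mine) builds the schema with proper types; the Row values then have to match the declared types:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Give age a real integer type instead of treating every field as a string
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// The Row values must match the declared types, hence the toInt
val typedRowRDD = peopleRDD
  .map(_.split(","))
  .map(attrs => Row(attrs(0), attrs(1).trim.toInt))

val typedDF = spark.createDataFrame(typedRowRDD, typedSchema)
typedDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)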