This post continues from the previous one. We keep reading the code, starting with how the runBasicParquetExample function is implemented.
The runBasicParquetExample function
private def runBasicParquetExample(spark: SparkSession): Unit = {
  // $example on:basic_parquet_example$
  // Encoders for most common types are automatically provided by importing spark.implicits._
  import spark.implicits._

  val peopleDF = spark.read.json("examples/src/main/resources/people.json")

  // DataFrames can be saved as Parquet files, maintaining the schema information
  peopleDF.write.parquet("people.parquet")

  // Read in the parquet file created above
  // Parquet files are self-describing so the schema is preserved
  // The result of loading a Parquet file is also a DataFrame
  val parquetFileDF = spark.read.parquet("people.parquet")

  // Parquet files can also be used to create a temporary view and then used in SQL statements
  parquetFileDF.createOrReplaceTempView("parquetFile")
  val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
  namesDF.map(attributes => "Name: " + attributes(0)).show()
  // +------------+
  // |       value|
  // +------------+
  // |Name: Justin|
  // +------------+
  // $example off:basic_parquet_example$
}
The function starts with an import:
import spark.implicits._
Unlike many other languages, Scala allows imports inside objects and functions. This import brings the implicits into scope that convert RDDs (and local collections) into DataFrames and Datasets.
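The effect of the import is easiest to see with a small self-contained sketch. The SparkSession setup and the sample data here are my own illustration, not part of the original example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ImplicitsDemo").master("local[*]").getOrCreate()
// Without this import, toDF below would not compile
import spark.implicits._

// A local collection of tuples gains the toDF method through the implicits
val df = Seq(("Alice", 29), ("Bob", 31)).toDF("name", "age")
df.show()  // prints a two-row table with columns name and age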
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
This reads the JSON file into a DataFrame.
peopleDF.write.parquet("people.parquet")
This writes the DataFrame out as Parquet. Note that people.parquet is a directory rather than a single file; the actual data lives in part files inside it (with names containing 00000 and so on).
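If you want to verify this locally, listing the output directory shows the part files. This is just a quick sketch and assumes you run it from the same working directory as the example:

import java.io.File

// Print the names of the files Spark wrote under the people.parquet directory
val outDir = new File("people.parquet")
if (outDir.isDirectory) {
  outDir.listFiles().map(_.getName).sorted.foreach(println)
  // typically a _SUCCESS marker plus one or more part-*.parquet data files
}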
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
This runs a SQL query against the temporary view registered from the Parquet DataFrame.
namesDF.map(attributes => "Name: " + attributes(0)).show()
Here map turns each row into a string, prefixing the name with "Name: ":
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
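The same projection can also be written with untyped column expressions instead of a typed map. This alternative is only a sketch added for comparison, not part of the original example:

import org.apache.spark.sql.functions.{concat, lit}

// Build the "Name: ..." string with column expressions instead of map
namesDF.select(concat(lit("Name: "), namesDF("name")).as("value")).show()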
The runParquetSchemaMergingExample function
private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
  // $example on:schema_merging$
  // This is used to implicitly convert an RDD to a DataFrame.
  import spark.implicits._

  // Create a simple DataFrame, store into a partition directory
  val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
  squaresDF.write.parquet("data/test_table/key=1")

  // Create another DataFrame in a new partition directory,
  // adding a new column and dropping an existing column
  val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
  cubesDF.write.parquet("data/test_table/key=2")

  // Read the partitioned table
  val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  mergedDF.printSchema()

  // The final schema consists of all 3 columns in the Parquet files together
  // with the partitioning column appeared in the partition directory paths
  // root
  //  |-- value: int (nullable = true)
  //  |-- square: int (nullable = true)
  //  |-- cube: int (nullable = true)
  //  |-- key: int (nullable = true)
  // $example off:schema_merging$
}
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
This creates an RDD, converts it to a DataFrame with toDF, and writes it to the partition directory key=1.
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
This creates another DataFrame in a new partition directory (key=2), adding a new column (cube) and dropping an existing one (square).
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
This reads the partitioned table back into a DataFrame. The option("mergeSchema", "true") controls whether the schemas of all part files are merged; when the option is not given, the default comes from the spark.sql.parquet.mergeSchema configuration, and setting the option on a read overrides that configured value.
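If you want schema merging for every Parquet read rather than per call, the same behavior can be switched on through the session configuration. A minimal sketch using the config key mentioned above:

// Enable Parquet schema merging globally for this SparkSession
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

// The per-read option can now be omitted
val mergedDF2 = spark.read.parquet("data/test_table")
mergedDF2.printSchema()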
The runJsonDatasetExample function
private def runJsonDatasetExample(spark: SparkSession): Unit = {
  // $example on:json_dataset$
  // Primitive types (Int, String, etc) and Product types (case classes) encoders are
  // supported by importing this when creating a Dataset.
  import spark.implicits._

  // A JSON dataset is pointed to by path.
  // The path can be either a single text file or a directory storing text files
  val path = "examples/src/main/resources/people.json"
  val peopleDF = spark.read.json(path)

  // The inferred schema can be visualized using the printSchema() method
  peopleDF.printSchema()
  // root
  //  |-- age: long (nullable = true)
  //  |-- name: string (nullable = true)

  // Creates a temporary view using the DataFrame
  peopleDF.createOrReplaceTempView("people")

  // SQL statements can be run by using the sql methods provided by spark
  val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
  teenagerNamesDF.show()
  // +------+
  // |  name|
  // +------+
  // |Justin|
  // +------+

  // Alternatively, a DataFrame can be created for a JSON dataset represented by
  // a Dataset[String] storing one JSON object per string
  val otherPeopleDataset = spark.createDataset(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
  val otherPeople = spark.read.json(otherPeopleDataset)
  otherPeople.show()
  // +---------------+----+
  // |        address|name|
  // +---------------+----+
  // |[Columbus,Ohio]| Yin|
  // +---------------+----+
  // $example off:json_dataset$
}
Some of this code repeats what we have already seen, so it is not explained again.
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)
This reads the JSON file and prints the inferred schema.
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
peopleDF.createOrReplaceTempView("people") registers the DataFrame as a temporary view named people, which the SQL statement then queries.
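As a side note, a DataFrame can also be registered as a global temporary view, which is shared across SparkSessions of the same application. This sketch is my own addition and is not part of the original example:

// Global temporary views live in the global_temp database
peopleDF.createGlobalTempView("people_global")
spark.sql("SELECT name FROM global_temp.people_global WHERE age BETWEEN 13 AND 19").show()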
teenagerNamesDF.show()
simply displays the result, as shown below:
// +------+
// |  name|
// +------+
// |Justin|
// +------+
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
This creates a Dataset[String] in which each element is one JSON object.
val otherPeople = spark.read.json(otherPeopleDataset)
This line reads the Dataset created above and produces a DataFrame from it, which is another way of converting between a Dataset and a DataFrame.
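Because the JSON object is nested, the resulting DataFrame contains a struct column; nested fields can be selected with dotted column names. A small sketch, not in the original example:

// Pull individual fields out of the nested address struct
otherPeople.select("name", "address.city", "address.state").show()
// +----+--------+-----+
// |name|    city|state|
// +----+--------+-----+
// | Yin|Columbus| Ohio|
// +----+--------+-----+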
The runJdbcDatasetExample function
private def runJdbcDatasetExample(spark: SparkSession): Unit = {
  // $example on:jdbc_dataset$
  // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
  // Loading data from a JDBC source
  val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .load()

  val connectionProperties = new Properties()
  connectionProperties.put("user", "username")
  connectionProperties.put("password", "password")
  val jdbcDF2 = spark.read
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
  // Specifying the custom data types of the read schema
  connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
  val jdbcDF3 = spark.read
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

  // Saving data to a JDBC source
  jdbcDF.write
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .save()
  jdbcDF2.write
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

  // Specifying create table column data types on write
  jdbcDF.write
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
  // $example off:jdbc_dataset$
}
}
This is the JDBC Dataset example. So how is data read over JDBC? It is configured through the options below:
.option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password")
The first line is the JDBC URL, i.e. the address of the database server; the second is the table name; the third is the user name; and the fourth is the password. These should be easy to follow.
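Besides these basics, the JDBC reader accepts options for reading in parallel. The sketch below is only an illustration: the column id and the bounds are placeholders and must match a real numeric column in your table:

// Hypothetical parallel read: "id" is assumed to be a numeric column of schema.tablename
val partitionedJdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "id")   // column used to split the scan
  .option("lowerBound", "1")
  .option("upperBound", "100000")
  .option("numPartitions", "8")      // number of concurrent JDBC connections
  .load()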
val connectionProperties = new Properties()
What is Properties used for? According to the official documentation, it carries the JDBC connection parameters as a list of arbitrary string tag/value pairs. Hence the following lines:
connectionProperties.put("user", "username") connectionProperties.put("password", "password")
Here the user name and password are put into the Properties object.
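If the JDBC driver class cannot be inferred from the URL, it can be supplied the same way; the PostgreSQL class name below is the usual one, adjust it for your database:

// Optionally pin the JDBC driver class explicitly
connectionProperties.put("driver", "org.postgresql.Driver")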
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
This call supplies the connection URL, the table name, and the connectionProperties object.
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
This specifies custom data types for columns of the read schema (here id and name).
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
Each of these writes saves the DataFrame back to the database through JDBC: the first uses format("jdbc") with options followed by save(), the second uses the jdbc(...) shortcut, and the third additionally specifies the column types for the table created on write.
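By default the JDBC writer fails if the target table already exists; a save mode can be set explicitly. A minimal sketch reusing the placeholder table name from above:

import org.apache.spark.sql.SaveMode

// Append rows to an existing table instead of failing when it already exists
jdbcDF.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)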