SparkSQL(Spark-1.4.0)实战系列(二)——DataFrames进阶

简介: 本节主要内容如下DataFrame与RDD的互操作实战不同数据源构建DataFrame实战DataFrame与RDD的互操作实战1 采用反映机制进行Schema类型推导(RDD到DataFrame的转换) SparkSQL支持RDD到DataFrame的自动转换,实现方法是通过Case类定义表的Schema,Spark会通过反射机制读取case class的

本节主要内容如下

  1. DataFrame与RDD的互操作实战
  2. 不同数据源构建DataFrame实战

DataFrame与RDD的互操作实战

1 采用反映机制进行Schema类型推导(RDD到DataFrame的转换)
SparkSQL支持RDD到DataFrame的自动转换,实现方法是通过Case类定义表的Schema,Spark会通过反射机制读取case class的参数名并将其配置成表的列名。

//导入该语句后,RDD将会被隐式转换成DataFrame
import sqlContext.implicits._

//定义一个类为Person的Case Class作为Schema
case class Person(name: String, age: Int)

//读取文件并将数据Map成Person实例,然后转换为DataFrame,采用toDF()方法,本实例从HDFS上进行数据读取
val people = sc.textFile("/data/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

//将实例为peopler的DataFrame注册成表
people.registerTempTable("people")

//采用SQLContext中的sql方法执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

//输出返回结果
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

2 利用程序动态指定Schema
在某些应用场景下,我们可能并不能提前确定对应列的个数,因而case class无法进行定义,此时可以通过传入一个字符串来设置Schema信息。具体过程如下:

// 创建RDD
val people = sc.textFile("/data/people.txt")

//Schema字符串
val schemaString = "name age"

// 导入Row
import org.apache.spark.sql.Row;

//导入Spark SQL数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

//利用schemaString动态生成Schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将people RDD转换成Rows
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 创建DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

//注册成表
peopleDataFrame.registerTempTable("people")

//执行SQL语句.
val results = sqlContext.sql("SELECT name FROM people")

//打印输出
results.map(t => "Name: " + t(0)).collect().foreach(println)

通过不同数据源创建DataFrame

前面我们创建DataFrame时,读取的是HDFS中的txt类型数据,在SparkSQL中,它支持多种数据源,主要包括JSON、Parquet等。

//读取json格式数据
val jsonFile= sqlContext.read.json("/data/people.json")

//jsonFile注册成表
jsonFile.registerTempTable("peopleJson")
val teenagers = sqlContext.sql("SELECT name FROM peopleJson WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

这里写图片描述

//保存为parquet格式数据
jsonFile.select("name", "age").write.format("parquet").save("/data/namesAndAges.parquet")

这里写图片描述
parquet文件目录结构如下图
这里写图片描述

//读取parquet格式数据
val parquetFile = sqlContext.read.parquet(“/data/namesAndAges.parquet”)

//parquetFile注册成表
parquetFile.registerTempTable(“parquetPerson”)
val teenagers = sqlContext.sql(“SELECT name FROM parquetPerson WHERE age >= 13 AND age <= 19”)
teenagers.map(t => “Name: ” + t(0)).collect().foreach(println)

添加公众微信号,可以了解更多最新技术资讯
这里写图片描述

目录
相关文章
|
6月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
73 0
|
6月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
146 2
|
6天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
29 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
28天前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
37 1
|
28天前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
35 0
|
28天前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
66 0
|
28天前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
30 0
|
28天前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
37 0
|
28天前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
41 0
|
28天前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
35 0