Spark SQL 笔记

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 官方参考文档:http://spark.apache.org/docs/2.1.0/sql-programming-guide.

官方参考文档:

http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#creating-dataframes

DataFrame

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.

对于熟悉python的同学,Spark的DataFrame和python的DF很像。对于structured data files同学比较熟知的有xml、jason、parquet等。

关于parquet,请参考:

http://blog.csdn.net/yu616568/article/details/50993491

具体的df的操作请参考官网:

http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#creating-dataframes

python操作演示:

>>> df = sqlContext.read.json("file:///home/zkpk/spark-1.5.2-bin-2.5.2/examples/src/main/resources/people.json")
>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

>>> df.select(df['name'], df['age'] + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala在eclipse中的实现

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameOperation {
  def main(args: Array[String]): Unit = {
    //create  datarame
    //首先创建程序入口
    val conf=new SparkConf().setAppName("DataFrameOperation")
    val sc=new SparkContext(conf);
    //create sqlcontext
    val sqlContext=new SQLContext(sc);
    val df=sqlContext.read.json("hdfs://hadoop1:9000/examples/src/main/resources/people.json")
    df.show();  

    //print schema
    df.printSchema()
    //name age
    df.select("name").show();
    //
    df.select(df("name"),df("age")+1).show()
    //where
    df.filter(df("age") > 21).show()
    //groupby
    df.groupBy("age").count().show();   
  }
}

关于代码的提交:

  • 将上面的代码的代码文件(文件 即可不用导出工程) export成jar
  • 提交请参考 Run on a YARN cluster 在上一篇的spark教程里

Interoperating with RDDs(和RDD交互)

Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.

通过reflection的方式将RDD转换成dataframe

注意:RDD是读取非结构化数据
java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        //生成一个RDD
        JavaRDD<Person> PersonRDD = sc.textFile("hdfs://hadoop1:9000/examples/src/main/resources/people.txt")
        .map(new Function<String, Person>() {

            public Person call(String line) throws Exception {
                String[] strs = line.split(",");
                String name=strs[0];
                int age=Integer.parseInt(strs[1].trim());
                Person person=new Person(age,name);     
                return person;
            }

        });
        //生成一个df
        DataFrame personDF = sqlContext.createDataFrame(PersonRDD, Person.class);
        //将df注册成临时表,之后就可以像操作表一样
        personDF.registerTempTable("person");

        DataFrame resultperson = sqlContext.sql("select name,age from person where age > 13 and age <= 19");
        //当想使用RDD的算子是,将df转换为RDD就可以使用类似foreach的操作了
        resultperson.javaRDD().foreach(new VoidFunction<Row>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            public void call(Row row) throws Exception {
            //把每一条数据都看成是一个row  row(0)=name  row(1)=age 
                System.out.println("name"+row.getString(0));
                System.out.println("age"+row.getInt(1));
            }
        });

        resultperson.javaRDD().saveAsTextFile("hdfs://hadoop1:9000/reflectionresult");

    }

}

python版本(reflection.py):

# sc is an existing SparkContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
appName= "reflection"
master = "local"
conf1 = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf1)

sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("file:///home/zkpk/spark-1.5.2-bin-2.5.2/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)

代码提交时,使用spark-submit reflection.py

用编程(programmatically)的方式将RDD转换成DF

>>> lines = sc.textFile("file:///home/zkpk/spark-1.5.2-bin-2.5.2/examples/src/main/resources/people.txt")
>>> parts = lines.map(lambda l: l.split(","))
>>> people = parts.map(lambda p: (p[0], p[1].strip()))
>>> parts.collect()
[[u'Michael', u' 29'], [u'Andy', u' 30'], [u'Justin', u' 19']]
>>> people.collect()
[(u'Michael', u'29'), (u'Andy', u'30'), (u'Justin', u'19')]
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'StructField' is not defined
>>> from pyspark.sql.types import *
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> fields.collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'list' object has no attribute 'collect'
>>> schema = StructType(fields)
>>> schemaPeople = sqlContext.createDataFrame(people, schema)
>>> schemaPeople.registerTempTable("people")
>>> results = sqlContext.sql("SELECT name FROM people")
>>> results.collect()
[Row(name=u'Michael'), Row(name=u'Andy'), Row(name=u'Justin')]
>>> names = results.map(lambda p: "Name: " + p.name)
>>> names.collect()
[u'Name: Michael', u'Name: Andy', u'Name: Justin']
>>> for name in names.collect():
...   print(name)
... 
Name: Michael
Name: Andy
Name: Justin
>>> 

不懂StructField和StructType的还请参考

http://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html?highlight=structfield#pyspark.sql.types.StructField

DataFrame VS RDD

这里写图片描述

当生成DF的时候会返回schema类型(类似于postgresql的表结构),而RDD不会,仅仅返回的是一个Person,不知道里面具体的数据类型

数据的load以及save

参考:

http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#data-sources

spark sql默认支持的是parquet文件格式,所以不制定load、save类型,默认都是parquet。关于parquet 参考

http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format

Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:

查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL

计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite

数据模型: Avro, Thrift, Protocol Buffers, POJOs

保存时是追加还是覆盖,请参考SaveModes

parquet 文件操作

通用文件操作:

df1 = sqlContext.read.load("file:///home/zkpk/spark-1.5.2-bin-2.5.2/examples/src/main/resources/users.parquet")

>>> df1.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
df1.select("name", "favorite_color").show()
+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+

df1.select("name", "favorite_color").write.save("file:///home/zkpk/ecaoyng/input/namesAndFavColors.parquet")

自动推断分区(Partition Discovery)

看完下面的例子就知道什么是自动推断分区了

#在hdfs上新建一个目录country=US
hadoop fs -mkdir -p /ecaoyng/country=US
#将parquet文件上传到新建的目录
hadoop fs -put users.parquet /ecaoyng/country=US

#之前读到的df1的内容
>>> df1.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
#在新建目录之后读取到的parquet文件,可以看出多了country这一列
>>> df2= sqlContext.read.parquet("hdfs:///ecaoyng/country=US/users.parquet")
>>> df2.show()
+------+--------------+----------------+-------+
|  name|favorite_color|favorite_numbers|country|
+------+--------------+----------------+-------+
|Alyssa|          null|  [3, 9, 15, 20]|     US|
|   Ben|           red|              []|     US|
+------+--------------+----------------+-------+

schema的合并


>>> df3 = sqlContext.createDataFrame(sc.parallelize(range(1, 6)).map(lambda i : Row(single=i, double=i*2))
... )
>>> df3.write.parquet("hdfs:///ecaoyng/test_table/key=1")
>>> df4=sqlContext.createDataFrame(sc.parallelize(range(6,11)).map(lambda i : Row(single=i, triple=i*3)))
>>> df4.write.parquet("hdfs:///ecaoyng/test_table/key=2")
>>> df5 = sqlContext.read.option("mergeSchema", "true").parquet("hdfs:///ecaoyng/test_table")
>>> df5.printSchema()
root
 |-- double: long (nullable = true)
 |-- single: long (nullable = true)
 |-- triple: long (nullable = true)
 |-- key: integer (nullable = true)

>>> df5.show()
+------+------+------+---+
|double|single|triple|key|
+------+------+------+---+
|  null|     6|    18|  2|
|  null|     7|    21|  2|
|  null|     8|    24|  2|
|  null|     9|    27|  2|
|  null|    10|    30|  2|
|     2|     1|  null|  1|
|     4|     2|  null|  1|
|     6|     3|  null|  1|
|     8|     4|  null|  1|
|    10|     5|  null|  1|
+------+------+------+---+

>>> 

数据源之JDBC之mysql

以mysql为例,首先启动mysql. 遇到错误信息如下

nother MySQL daemon already running with the same unix socket.
Starting mysqld:                                           [FAILED]

解决方案,重命名/var/lib/mysql/mysql.sock 即可,重新启动mysql
其他的请参考如下链接

https://www.cnblogs.com/wwxbi/p/6978774.html
http://spark.apache.org/docs/1.6.0/sql-programming-guide.html#jdbc-to-other-databases

注意,如果没有驱动包会报错:

java.sql.SQLException: No suitable driver
        at java.sql.DriverManager.getDriver(DriverManager.java:278)

解决方法是: 下载驱动包

在spark-env.sh中设置
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/soft/hive/lib/mysql-connector-java-5.1.10.jar
那么在提交jdbc的脚本的时候就不可以设置--driver-class-path路径

如果在spark-env.sh脚本里没有设置MySQL驱动包的spark_classpath
那么需要在提交任务的时候在脚本里天如下内容:
--driver-class-path /usr/local/soft/hive/lib/mysql-connector-java-5.1.10.jar

数据源之Hive

在spark开发中spark开发占了很大一部分,而基于hive的开发,又占了其中很大的比重。下面我们就来进行hive和spark集成

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
28天前
|
SQL 存储 关系型数据库
SQL自学笔记(3):SQL里的DCL,DQL都代表什么?
本文介绍了SQL的基础语言类型(DDL、DML、DCL、DQL),并详细说明了如何创建用户和表格,最后推荐了几款适合初学者的免费SQL实践平台。
125 3
SQL自学笔记(3):SQL里的DCL,DQL都代表什么?
|
28天前
|
SQL 数据挖掘 数据库
SQL自学笔记(2):如何用SQL做简单的检索
本文深入介绍了SQL的基本语法,包括数据查询、过滤、排序、分组及表连接等操作,并通过实际案例展示了SQL在用户研究中的应用,如用户行为分析、用户细分、用户留存分析及满意度调查数据分析。
30 0
SQL自学笔记(2):如何用SQL做简单的检索
|
28天前
|
SQL 数据挖掘 关系型数据库
SQL自学笔记(1):什么是SQL?有什么用?
本文为用户研究新手介绍SQL(结构化查询语言),解释了SQL的基本概念、入门方法及在用户研究中的应用通过实际案例说明,如用户行为分析、用户细分和满意度调查数据分析,展示了SQL在用户研究中的重要作用。
69 0
SQL自学笔记(1):什么是SQL?有什么用?
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
80 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
99 0
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
72 0
|
3月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
88 0
|
3月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
52 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
154 2
ClickHouse与大数据生态集成:Spark & Flink 实战