6.4 排序开窗函数
6.4.1 ROW_NUMBER顺序排序row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。
示例1
spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 5| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 10| | a11| 3| 100| 11| +----+-----+-----+----+ spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
6.4.2 RANK跳跃排序
rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。 这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数 简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名
示例2
spark.sql("select name, class, score, rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a10| 3| 78| 4| | a2| 1| 78| 4| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 9| | a11| 3| 100| 11| +----+-----+-----+----+ spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
6.4.3 DENSE_RANK连续排序
dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。 这个函数并列排名之后的排名是并列排名加1 简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名
示例3
spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 4| | a1| 1| 80| 5| | a5| 2| 92| 6| | a3| 1| 95| 7| | a6| 3| 99| 8| | a7| 3| 99| 8| | a11| 3| 100| 9| +----+-----+-----+----+ spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 5| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
6.4.4 NTILE分组排名[了解]
ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。
示例4
spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 1| | a4| 2| 74| 2| | a2| 1| 78| 2| | a10| 3| 78| 3| | a1| 1| 80| 3| | a5| 2| 92| 4| | a3| 1| 95| 4| | a6| 3| 99| 5| | a7| 3| 99| 5| | a11| 3| 100| 6| +----+-----+-----+----+ spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
7、Spark-On-Hive
7.1 概述
官网
http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.
Hive查询流程及原理 执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务 Hive将SQL转成MapReduce执行速度慢 使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表内的数据 所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据
7.2 Hive开启MetaStore服务
1: 修改 hive/conf/hive-site.xml 新增如下配置 <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> <property> <name>hive.metastore.local</name> <value>false</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://node01:9083</value> </property> </configuration> 2: 后台启动 Hive MetaStore服务 nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
7.3 SparkSQL整合Hive MetaStore
Spark 有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse,所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录 hive-site.xml 元数据仓库的位置等信息 core-site.xml 安全相关的配置 hdfs-site.xml HDFS 相关的配置 使用IDEA本地测试直接把以上配置文件放在resources目录即可
7.4 使用SparkSQL操作Hive表
package cn.itcast.sql import org.apache.spark.sql.SparkSession object HiveSupport { def main(args: Array[String]): Unit = { //创建sparkSession val spark = SparkSession .builder() .appName("HiveSupport") .master("local[*]") //.config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse") //.config("hive.metastore.uris", "thrift://node01:9083") .enableHiveSupport()//开启hive语法的支持 .getOrCreate() spark.sparkContext.setLogLevel("WARN") //查看有哪些表 spark.sql("show tables").show() //创建表 spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '") //加载数据,数据为当前SparkDemo项目目录下的person.txt(和src平级) spark.sql("LOAD DATA LOCAL INPATH 'SparkDemo/person.txt' INTO TABLE person") //查询数据 spark.sql("select * from person ").show() spark.stop() } }
五、Spark Streaming引入详解
1.1 新的场景需求
集群监控
一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等 要针对硬件的一些指标进行监控, 例如 CPU, 内存, 磁盘 等
还有很多很多
1.2 Spark Streaming介绍
官网
http://spark.apache.org/streaming/
概述
Spark Streaming是一个基于Spark Core之上的实时计算框架, 可以从很多数据源消费数据并对数据进行实时的处理, 具有高吞吐量和容错能力强等特点。
Spark Streaming的特点
1.易用 可以像编写离线批处理一样去编写流式程序,支持java/scala/python语言。 2.容错 SparkStreaming在没有额外代码和配置的情况下可以恢复丢失的工作。 3.易整合到Spark体系 流式处理与批处理和交互式查询相结合。
1.3 实时计算所处的位置
二、Spark Streaming原理
2.1 SparkStreaming原理
2.1.1 整体流程
Spark Streaming中,会有一个接收器组件Receiver,作为一个长期运行的task跑在一个Executor上。Receiver接收外部的数据流形成input DStream DStream会被按照时间间隔划分成一批一批的RDD,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。时间间隔的大小可以由参数指定,一般设在500毫秒到几秒之间。 对DStream进行操作就是对RDD进行操作,计算处理的结果可以传给外部系统。 Spark Streaming的工作流程像下面的图所示一样,接收到实时数据后,给数据分批次,然后传给Spark Engine(引擎)处理最后生成该批次的结果。
2.1.2 数据抽象
Spark Streaming的基础抽象是DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种Spark算子操作后的结果数据流
可以从以下多个角度深入理解DStream
1.DStream本质上就是一系列时间上连续的RDD
2.对DStream的数据的进行操作也是按照RDD为单位来进行的
3.容错性 底层RDD之间存在依赖关系,DStream直接也有依赖关系,RDD具有容错性,那么DStream也具有容错性 如图:每一个椭圆形表示一个RDD 椭圆形中的每个圆形代表一个RDD中的一个Partition分区 每一列的多个RDD表示一个DStream(图中有三列所以有三个DStream) 每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD
4.准实时性/近实时性 Spark Streaming将流式计算分解成多个Spark Job,对于每一时间段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。 对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~5秒钟之间 所以Spark Streaming能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合
总结
简单来说DStream就是对RDD的封装,你对DStream进行操作,就是对RDD进行操作 对于DataFrame/DataSet/DStream来说本质上都可以理解成RDD
2.2 DStream相关操作
DStream上的操作与RDD的类似,分为以下两种:
Transformations(转换) Output Operations(输出)/Action
2.2.1 Transformations
常见Transformation—无状态转换:每个批次的处理不依赖于之前批次的数据
Transformation | Meaning |
map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream |
flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
reduceByKey(func, [numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
join(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream |
transform(func) | 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
特殊的Transformations—有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。
有状态转换包括基于追踪状态变化的转换(updateStateByKey)和滑动窗口的转换
1.UpdateStateByKey(func) 2.Window Operations 窗口操作
2.2.2 Output/Action
Output Operations可以将DStream的数据输出到外部的数据库或文件系统 当某个Output Operations被调用时,spark streaming程序才会开始真正的计算过程(与RDD的Action类似)
Output Operation | Meaning |
print() | 打印到控制台 |
saveAsTextFiles(prefix, [suffix]) | 保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix,[suffix]) | 保存流的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”. |
saveAsHadoopFiles(prefix,[suffix]) | 保存流的内容为hadoop文件,文件名为"prefix-TIME_IN_MS[.suffix]". |
foreachRDD(func) | 对Dstream里面的每个RDD执行func |
2.3 总结