5万字长文!搞定Spark方方面面(六)

简介: 5万字长文!搞定Spark方方面面
6.4 排序开窗函数

6.4.1 ROW_NUMBER顺序排序row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。

示例1

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   5|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|  10|
| a11|    3|  100|  11|
+----+-----+-----+----+
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

6.4.2 RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名

示例2

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()    
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
| a10|    3|   78|   4|
|  a2|    1|   78|   4|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|   9|
| a11|    3|  100|  11|
+----+-----+-----+----+
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

6.4.3 DENSE_RANK连续排序

dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名之后的排名是并列排名加1
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名

示例3

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   4|
|  a1|    1|   80|   5|
|  a5|    2|   92|   6|
|  a3|    1|   95|   7|
|  a6|    3|   99|   8|
|  a7|    3|   99|   8|
| a11|    3|  100|   9|
+----+-----+-----+----+
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   5|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

6.4.4 NTILE分组排名[了解]

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

示例4

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   1|
|  a4|    2|   74|   2|
|  a2|    1|   78|   2|
| a10|    3|   78|   3|
|  a1|    1|   80|   3|
|  a5|    2|   92|   4|
|  a3|    1|   95|   4|
|  a6|    3|   99|   5|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
+----+-----+-----+----+
spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

7、Spark-On-Hive

7.1 概述

官网

http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.

Hive查询流程及原理
执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务
Hive将SQL转成MapReduce执行速度慢
使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表内的数据
所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据
7.2 Hive开启MetaStore服务
1: 修改 hive/conf/hive-site.xml 新增如下配置
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
      <name>hive.metastore.warehouse.dir</name>
      <value>/user/hive/warehouse</value>
    </property>
    <property>
      <name>hive.metastore.local</name>
      <value>false</value>
    </property>
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://node01:9083</value>
    </property>
 </configuration>
2: 后台启动 Hive MetaStore服务
nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
7.3 SparkSQL整合Hive MetaStore
Spark 有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore
SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse,所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录
hive-site.xml 元数据仓库的位置等信息
core-site.xml 安全相关的配置
hdfs-site.xml HDFS 相关的配置
使用IDEA本地测试直接把以上配置文件放在resources目录即可
7.4 使用SparkSQL操作Hive表
package cn.itcast.sql
import org.apache.spark.sql.SparkSession
object HiveSupport {
  def main(args: Array[String]): Unit = {
    //创建sparkSession
    val spark = SparkSession
      .builder()
      .appName("HiveSupport")
      .master("local[*]")
      //.config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse")
      //.config("hive.metastore.uris", "thrift://node01:9083")
      .enableHiveSupport()//开启hive语法的支持
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    //查看有哪些表
    spark.sql("show tables").show()
    //创建表
    spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '")
    //加载数据,数据为当前SparkDemo项目目录下的person.txt(和src平级)
    spark.sql("LOAD DATA LOCAL INPATH 'SparkDemo/person.txt' INTO TABLE person")
    //查询数据
    spark.sql("select * from person ").show()
    spark.stop()
  }
}

五、Spark Streaming引入详解

1.1 新的场景需求

集群监控

一般的大型集群和平台, 都需要对其进行监控的需求。
要针对各种数据库, 包括 MySQL, HBase 等进行监控
要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等
要针对硬件的一些指标进行监控, 例如 CPU, 内存, 磁盘 等

640.png

还有很多很多

640.png

640.png

1.2 Spark Streaming介绍

官网

http://spark.apache.org/streaming/

概述

Spark Streaming是一个基于Spark Core之上的实时计算框架,
可以从很多数据源消费数据并对数据进行实时的处理,
具有高吞吐量和容错能力强等特点。

640.png

Spark Streaming的特点

1.易用
可以像编写离线批处理一样去编写流式程序,支持java/scala/python语言。
2.容错
SparkStreaming在没有额外代码和配置的情况下可以恢复丢失的工作。
3.易整合到Spark体系
流式处理与批处理和交互式查询相结合。

1.3 实时计算所处的位置

640.png

二、Spark Streaming原理

2.1 SparkStreaming原理

2.1.1 整体流程
Spark Streaming中,会有一个接收器组件Receiver,作为一个长期运行的task跑在一个Executor上。Receiver接收外部的数据流形成input DStream
DStream会被按照时间间隔划分成一批一批的RDD,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。时间间隔的大小可以由参数指定,一般设在500毫秒到几秒之间。
对DStream进行操作就是对RDD进行操作,计算处理的结果可以传给外部系统。
Spark Streaming的工作流程像下面的图所示一样,接收到实时数据后,给数据分批次,然后传给Spark Engine(引擎)处理最后生成该批次的结果。

5f983223bbd7c8b19eebe51019285a72.png

2.1.2 数据抽象
Spark Streaming的基础抽象是DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种Spark算子操作后的结果数据流

可以从以下多个角度深入理解DStream

1.DStream本质上就是一系列时间上连续的RDD

640.png

2.对DStream的数据的进行操作也是按照RDD为单位来进行的

640.png

3.容错性
底层RDD之间存在依赖关系,DStream直接也有依赖关系,RDD具有容错性,那么DStream也具有容错性
如图:每一个椭圆形表示一个RDD
椭圆形中的每个圆形代表一个RDD中的一个Partition分区
每一列的多个RDD表示一个DStream(图中有三列所以有三个DStream)
每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD

640.png

4.准实时性/近实时性
Spark Streaming将流式计算分解成多个Spark Job,对于每一时间段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。
对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~5秒钟之间
所以Spark Streaming能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合

总结

简单来说DStream就是对RDD的封装,你对DStream进行操作,就是对RDD进行操作
对于DataFrame/DataSet/DStream来说本质上都可以理解成RDD

640.png

2.2 DStream相关操作

DStream上的操作与RDD的类似,分为以下两种:

Transformations(转换)
Output Operations(输出)/Action
2.2.1 Transformations
常见Transformation—无状态转换:每个批次的处理不依赖于之前批次的数据
Transformation Meaning
map(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream
flatMap(func) 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func) 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
union(otherStream) 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
transform(func) 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

特殊的Transformations—有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。

有状态转换包括基于追踪状态变化的转换(updateStateByKey)和滑动窗口的转换

1.UpdateStateByKey(func)
2.Window Operations 窗口操作
2.2.2 Output/Action
Output Operations可以将DStream的数据输出到外部的数据库或文件系统
当某个Output Operations被调用时,spark streaming程序才会开始真正的计算过程(与RDD的Action类似)
Output Operation Meaning
print() 打印到控制台
saveAsTextFiles(prefix, [suffix]) 保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix,[suffix]) 保存流的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”.
saveAsHadoopFiles(prefix,[suffix]) 保存流的内容为hadoop文件,文件名为"prefix-TIME_IN_MS[.suffix]".
foreachRDD(func) 对Dstream里面的每个RDD执行func

2.3 总结

640.png

相关文章
|
8月前
|
运维 前端开发 安全
万字长文搞懂产品模式和项目模式
万字长文搞懂产品模式和项目模式
97 0
|
11月前
|
信息无障碍 C++
万字长文,深度分析 c++的前景
万字长文,深度分析 c++的前景
105 0
|
存储 Java 编译器
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!3
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 JSON 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!1
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 缓存 算法
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!2
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 前端开发 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!4
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 缓存 分布式计算
5万字长文!搞定Spark方方面面(三)
5万字长文!搞定Spark方方面面
131 0
5万字长文!搞定Spark方方面面(三)
|
分布式计算 资源调度 算法
5万字长文!搞定Spark方方面面(一)
5万字长文!搞定Spark方方面面
329 0
5万字长文!搞定Spark方方面面(一)
|
消息中间件 存储 分布式计算
5万字长文!搞定Spark方方面面(七)
5万字长文!搞定Spark方方面面
203 0
5万字长文!搞定Spark方方面面(七)
|
分布式计算 资源调度 并行计算
5万字长文!搞定Spark方方面面(二)
5万字长文!搞定Spark方方面面
258 0
5万字长文!搞定Spark方方面面(二)